1. 前言
为什么要使用 ConcurrentHashMap
主要基于两个原因:
- 在并发编程中使用 HashMap 可能造成死循环(jdk1.7,jdk1.8 中会造成数据丢失)
- HashTable 效率非常低下
2. ConcurrentHashMap 结构
jdk 1.7 和 jdk 1.8 中,ConcurrentHashMap 的结构有着很大的变化,我们依次来讲解。
2.1 jdk 1.7 中结构
在 jdk 1.7 中,ConcurrentHashMap 是由 Segment 数据结构和 HashEntry 数组结构构成。采取分段锁来保证安全性。
Segment 是 ReentrantLock 重入锁,在 ConcurrentHashMap 中扮演锁的角色;HashEntry 则用于存储键值对数据。
一个 ConcurrentHashMap 里包含一个 Segment 数组,一个 Segment 里包含一个 HashEntry 数组,Segment 的结构和 HashMap 类似,是一个数组和链表结构。
2.2 jdk 1.8 中结构
JDK1.8 的实现已经摒弃了 Segment 的概念,而是直接用 Node 数组+链表+红黑树的数据结构来实现,并发控制使用 Synchronized 和 CAS 来操作,整个看起来就像是优化过且线程安全的 HashMap,虽然在 JDK1.8 中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本。
3. 实现
3.1 JDK 1.7 中的实现
3.1.1 初始化
ConcurrentHashMap 的初始化是通过位运算 “与” 运算来初始化 Segment 的大小的(ssize 表示),通过concurrentLevel 计算得出。
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
ssize 用位于运算来计算(ssize <<=1),所以 Segment 的大小取值都是以2的N次方,Segment 的大小 ssize 默认为16.
每一个 Segment 元素下的 HashEntry 的初始化也是按照位于运算来计算,用 cap 来表示
int cap = 1;
while (cap < c)
cap <<= 1;
HashEntry 大小的计算也是2的N次方(cap <<=1), cap 的初始值为1,所以 HashEntry 最小的容量为2.
3.1.2 get 操作
Segment 的 get 操作实现非常简单和高效,先经过一次再散列,然后使用这个散列值通过散列运算定位到 Segment,再通过散列算法定位到元素。
public V get(Object key){
int hash = hash(key.hashCode());
return segmentFor(hash).get(key,hash);
}
get 操作的高效之处在于整个 get 过程都不需要加锁,除非读到空的值才会加锁重读。原因就是将使用的共享变量定义成 volatile
类型。
transient volatile int count;
volatile V value;
3.1.3 put 操作
对于 ConcurrentHashMap 的数据插入,这里要进行两次 Hash 去定位数据的存储位置
static class Segment<K,V> extends ReentrantLock implements Serializable {
//省略
}
当执行put操作时,会经历两个步骤:
- 判断是否需要扩容
- 定位到添加元素的位置,将其放入 HashEntry 数组中
插入过程会进行第一次 key 的 hash 来定位 Segment 的位置,如果该 Segment 还没有初始化,即通过 CAS 操作进行赋值,然后进行第二次 hash 操作,找到相应的 HashEntry 的位置,这里会利用继承过来的锁的特性,在将数据插入指定的 HashEntry 位置时(尾插法),会通过继承 ReentrantLock 的 tryLock()
方法尝试去获取锁,如果获取成功就直接插入相应的位置,如果已经有线程获取该Segment的锁,那当前线程会以自旋的方式去继续的调用 tryLock()
方法去获取锁,超过指定次数就挂起,等待唤醒。
3.1.4 size 操作
计算 ConcurrentHashMap 的元素大小是并发操作的,就是在你计算 size 的时候,他还在并发的插入数据,这就可能会导致你计算出来的 size 和你实际的 size 有相差。
ConcurrentHashMap 采取的解决方法是先尝试 2 次通过不锁住 Segment 的方式来统计各个 Segment 大小,统计过程中如果 count 发生变化,则再采用加锁的方式来统计所有 Segment 的大小。
try {
for (; ; ) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K, V> seg = segmentAt(segments, j);
if (seg != null) {
/* 在put、remove、clean方法里操作
* 元素都会将变量modCount进行加一,
* 统计也是依靠这个变量的前后变化来进行的 */
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0) overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
3.2 JDK 1.8 中的实现
3.2.1 基本属性及概念
看一下基本属性:
//node数组最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认初始值,必须是2的幂数
private static final int DEFAULT_CAPACITY = 16;
//数组可能最大值,需要与toArray()相关方法关联
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并发级别,遗留下来的,为兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//负载因子
private static final float LOAD_FACTOR = 0.75f;
//链表转红黑树阀值,> 8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
//树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
//2^15-1,help resize的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//forwarding nodes的hash值
static final int MOVED = -1;
//树根节点的hash值
static final int TREEBIN = -2;
//ReservationNode的hash值
static final int RESERVED = -3;
//可用处理器数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的数组
transient volatile Node<K,V>[] table;
/*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
*当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容
*当为0时:代表当时的table还没有被初始化
*当为正数时:表示初始化或者下一次进行扩容的大小
*/
private transient volatile int sizeCtl;
重要概念:
- table: 默认为null,初始化发生在第一次插入操作,默认大小为16的数组,用来存储Node节点数据,扩容时大小总是2的幂次方。
- nextTable: 默认为null,扩容时新生成的数组,其大小为原数组的两倍
- Node :保存 key,value 及 key 的 hash 值的数据结构。
class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
//省略部分代码
}
其中 value 和 next 都用 volatile
修饰,保证并发的可见性。
- ForwardingNode: 一个特殊的 Node 节点,hash 值为 -1,其中存储 nextTable 的引用。
final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
只有table发生扩容的时候,ForwardingNode 才会发挥作用,作为一个占位符放在table中表示当前节点为 null 或则已经被移动。
- TreeNode类和TreeBin类: TreeNode类表示的是红黑树上的每个节点。当一个链表上的节点数量超过了指定的值,会将这个链表变为红黑树,当然每个节点就转换为TreeNode。不像HashMap,ConcurrentHashMap在桶里面直接存储的不是TreeNode,而是一个TreeBin,在TreeBin内部维护一个红黑树,也就是说TreeNode在TreeBin内部使用的。
3.2.2 初始化
实例化 ConcurrentHashMap 时带参数时,会根据参数调整 table 的大小,假设参数为 100,最终会调整成 256,确保 table 的大小总是2的幂次方.
table 初始化
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功,当前线程只需要让出cpu时间片
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
3.2.3 put 操作
假设 table 已经初始化完成,put 操作采用 CAS + synchronized 实现并发插入或更新操作。
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
...省略部分代码
}
addCount(1L, binCount);
return null;
}
hash算法
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
table 中定位索引位置,n 是 table 的大小
int index = (n - 1) & hash
获取 table 中对应索引的元素f
Unsafe.getObjectVolatile 可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。
如果 f 为 null,说明 table 中这个位置第一次插入元素,利用Unsafe.compareAndSwapObject 方法插入 Node 节点。
如果 CAS 成功,说明 Node 节点已经插入,随后 addCount(1L, binCount) 方法会检查当前容量是否需要进行扩容。
如果 CAS 失败,说明有其它线程提前插入了节点,自旋重新尝试在这个位置插入节点。
如果f的 hash 值为 -1,说明当前 f 是 ForwardingNode 节点,意味有其它线程正在扩容,则一起进行扩容操作。
其余情况把新的 Node 节点按链表或红黑树的方式插入到合适的位置,这个过程采用同步内置锁实现并发,代码如下:
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
在节点 f 上进行同步,节点插入之前,再次利用tabAt(tab, i) == f
判断,防止被其它线程修改。
如果 f.hash >= 0,说明 f 是链表结构的头结点,遍历链表,如果找到对应的 node 节点,则修改 value,否则在链表尾部加入节点。
如果 f 是 TreeBin 类型节点,说明 f 是红黑树根节点,则在树结构上遍历元素,更新或增加节点。
如果链表中节点数 binCount >= TREEIFY_THRESHOLD(默认是8),则把链表转化为红黑树结构。
table扩容
当 table 容量不足的时候,即 table 的元素数量达到容量阈值 sizeCtl
,需要对 table 进行扩容。
整个扩容分为两部分:
构建一个 nextTable,大小为 table 的两倍。
把 table 的数据复制到 nextTable 中。
这两个过程在单线程下实现很简单,但是 ConcurrentHashMap 是支持并发插入的,扩容操作自然也会有并发的出现,这种情况下,第二步可以支持节点的并发复制,这样性能自然提升不少,但实现的复杂度也上升了一个台阶。
先看第一步,构建nextTable,毫无疑问,这个过程只能只有单个线程进行 nextTable 的初始化,具体实现如下:
private final void addCount(long x, int check) {
... 省略部分代码
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
通过 Unsafe.compareAndSwapInt 修改 sizeCtl 值,保证只有一个线程能够初始化 nextTable,扩容后的数组长度为原来的两倍,但是容量是原来的 1.5。
节点从 table 移动到 nextTable,大体思想是遍历、复制的过程。
首先根据运算得到需要遍历的次数i,然后利用 tabAt 方法获得 i 位置的元素 f,初始化一个 forwardNode 实例 fwd。
如果 f == null,则在 table 中的 i 位置放入 fwd,这个过程是采用 Unsafe.compareAndSwapObjectf 方法实现的,很巧妙的实现了节点的并发移动。
如果 f 是链表的头节点,就构造一个反序链表,把他们分别放在 nextTable 的 i 和 i+n 的位置上,移动完成,采用 Unsafe.putObjectVolatile 方法给 table 原位置赋值 fwd。
如果 f 是 TreeBin 节点,也做一个反序处理,并判断是否需要 untreeify,把处理的结果分别放在 nextTable 的 i 和 i+n 的位置上,移动完成,同样采用 Unsafe.putObjectVolatile 方法给 table 原位置赋值 fwd。
遍历过所有的节点以后就完成了复制工作,把 table 指向 nextTable,并更新 sizeCtl 为新数组大小的 0.75 倍 ,扩容完成。
红黑树构造
注意:如果链表结构中元素超过 TREEIFY_THRESHOLD 阈值,默认为 8
个,则把链表转化为红黑树,提高遍历查询效率。
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
接下来我们看看如何构造树结构,代码如下:
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
可以看出,生成树节点的代码块是同步的,进入同步代码块之后,再次验证 table 中 index 位置元素是否被修改过。
- 根据 table 中 index 位置 Node 链表,重新生成一个 hd 为头结点的 TreeNode 链表。
- 根据 hd 头结点,生成 TreeBin 树结构,并把树结构的root节点写到 table 的 index 位置的内存中,具体实现如下:
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}
主要根据 Node 节点的 hash 值大小构建二叉树。
3.2.4 get 操作
get操作和put操作相比,显得简单了许多。
public V get(Object key) {
Node<K,V>[] tab;
Node<K,V> e, p;
int n, eh;
K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
- 判断table是否为空,如果为空,直接返回null。
- 计算key的hash值,并获取指定table中指定位置的Node节点,通过遍历链表或则树结构找到对应的节点,返回value值。
3.2.4 size 操作
JDK1.8 size 是通过对 baseCount 和 counterCell 进行 CAS 计算,最终通过 baseCount 和 遍历 CounterCell 数组得出 size。
具体参考:ConcurrentHashMap 的size方法原理分析
4. JDK 1.8 中为什么要摒弃分段锁
很多人不明白为什么Doug Lea在JDK1.8为什么要做这么大变动,使用重级锁synchronized,性能反而更高,原因如下:
- jdk1.8中锁的粒度更细了。jdk1.7中ConcurrentHashMap 的concurrentLevel(并发数)基本上是固定的。jdk1.8中的concurrentLevel是和数组大小保持一致的,每次扩容,并发度扩大一倍.
- 红黑树的引入,对链表的优化使得 hash 冲突时的 put 和 get 效率更高
- 获得JVM的支持 ,ReentrantLock 毕竟是 API 这个级别的,后续的性能优化空间很小。 synchronized 则是 JVM 直接支持的, JVM 能够在运行时作出相应的优化措施:锁粗化、锁消除、锁自旋等等。这就使得 synchronized 能够随着 JDK 版本的升级而不改动代码的前提下获得性能上的提升。
5. 小结&参考资料
小结
可以看出 JDK1.8 版本的 ConcurrentHashMap 的数据结构已经接近 HashMap,相对而言,ConcurrentHashMap 只是增加了同步的操作来控制并发,从 JDK1.7 版本的 ReentrantLock+Segment+HashEntry
,到 JDK1.8 版本中synchronized+CAS+HashEntry+红黑树
,优化确实很大。
Comments | NOTHING