ConcurrentHashMap在我的面试生涯中,10次有8次是会被问到的,记得刚毕业那会,被问到ConcurrentHashMap源码的无助与苦涩,无奈只能网上找了一些教程,背一背,才算是蒙混过关,当然其实这种法子是不推崇的,而且不能真正理解源码真谛,遇到高手还是很容易就问出来的,那么我们这篇就一起来了解下ConcurrentHashMap如何成为一个理(meng)解(hun)源(guo)码(guan)的高手,建议对Map不是很了解的可以看看我前面写的一篇HashMap源码解析。其实网上找了很多资料,尤其是并发扩容这一块,没找到什么有用的资料,没办法,只能自己慢慢啃了蛮长时间的,所以这块也会详细写出来。
ConcurrentHashMap的数据结构和HashMap基本一致,这里就不重新画了,copy一下之前画的图,稍微改动一下:
那么问题来了,既然两个差不多,为啥又要搞个ConcurrentHashMap呢,这就要说到两者之间的区别到底在哪里:
既生HashMap何生ConcurrentHashMap:
HashMap和ConcurrentHashMap的区别说起来很多,但在我看来,最本质的区别就是HashMap是线程不安全的,而ConcurrentHashMap是线程安全的,这时候可能有人要说话了,那HashTable不也是线程安全的吗,如果只是为了线程安全,HashTable不就行了吗,还非要搞一个ConcurrentHashMap,看看jdk1.8这玩意代码都多少行了。
既生HashTable何生ConcurrentHashMap:
HashTable这里我就不赘述了,大体上就是HashMap加了个synchronized,区别有但不是很大,但ConcurrentHashMap从1.7的segment分段锁,到1.8的node节点锁,那代码复杂度那么高,doug lea图啥呢,这里我可以先给出个不标准答案: 性能,jdk代码风格死扣的话,你会发现这帮人就是变态,他可以多写十行你看不懂的代码,就为了提升几毫秒的,更别说ConcurrentHashMap这么多的了。
废话不多说,还是先看代码吧:
初始化不聊了,put时会先去初始化,我们直接看put方法:
put:
public V put(K key, V value) { return putVal(key, value, false); }
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); //非空校验 int hash = spread(key.hashCode()); //再hash int binCount = 0; for (Node<K,V>[] tab = table;;) { //死循环 Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) //表示当前map为空,则需初始化操作 tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //表示当前下标对应没有值 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) //直接cas设值(其实和hashMap差不多),cas失败则下次循环继续 break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) //ForwardingNode节点会进入 tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { //看到没有,这里用的是synchronized,大神也觉得synchronized的性能至少不比ReentrantLock差吧 if (tabAt(tab, i) == f) { //这里校验是为了防止加锁前被其他线程修改了 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { //节点存在,直接修改 oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { //节点不存在,加载链表尾部 pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { //红黑树操作 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //节点数 if (binCount >= TREEIFY_THRESHOLD) // >=8,链表转红黑树 treeifyBin(tab, i); if (oldVal != null) //节点存在,直接返回旧值 return oldVal; break; } } } addCount(1L, binCount); return null; }
put直接调用了putVal方法,putVal方法在非空校验,再hash计算(为了更均匀分布)之后,开始自旋,做正事了,这里面写了几个if else :
1.判断map是否为空,为空初始化
2.定位到下标对应的节点,为空则说明当前节点链表未生成,new 一个cas塞进去
3.判断是否是ForwardingNode节点,是则需要协助扩容
4.都不是则正常塞值,判断是链表还是红黑树,走不同新增流程
接下来判断是否需要转红黑树,最后返回,节点存在则返回旧值,否则返回null,addCount方法我们后面会详细介绍,其实就是count加一,并判断是否需要扩容。
我们首先来看一下初始化的逻辑:
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) //说明此时正在初始化,让出cpu资源 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //标识此时正在初始化 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; //未设值则赋默认值16 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); //默认0.75倍扩容 } } finally { sizeCtl = sc; //扩容大小赋值 } break; } } return tab; }
其实看着注释对照代码还是很简单的,就是初始化一下node数组大小,及扩容因子等。
再看协助扩容:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { //这里就是判断当前是否已经在扩容中 int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { //sizectl+1 transfer(tab, nextTab); //扩容 break; } } return nextTab; } return table; }
并发扩容:
这里主要做了一个校验是否需要协助扩容,每多一个线程协助,sizectl+1,接下来主要看看扩容方法,这也是ConcurrentHashMap的点睛之笔:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating 说明是首个线程扩容 try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //扩容两倍新数组 nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { //这个for循环主要做并发扩容时,多线程迁移节点控制 Node<K,V> f; int fh; while (advance) { //这个自旋主要是用于控制未被分配处理的桶(链表)的个数 int nextIndex, nextBound; if (--i >= bound || finishing) //每次处理都减一,条件成立说明直接去扩容(只是控制下标)或已经结束,当前线程退出扩容 advance = false; else if ((nextIndex = transferIndex) <= 0) { //成立说明已经被分配完 i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { //cas修改下一个桶的位置,成功则更新下一个桶的位置 bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { //判断是否结束 int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) //查不到cas查询,这个对性能的要求已经到令人发指的地步了吧 advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) // advance = true; // already processed else { synchronized (f) { //锁下面才是真正的节点迁移,可以发现这里锁的是一个node节点 if (tabAt(tab, i) == f) { //链表迁移 Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) //这里的计算和hashMap查不多,ln是低位节点 ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //这里cas分别在高低位链表中,并将当前旧tab更新为forward节点 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { //红黑树迁移 TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
1.首个线程扩容,new 2倍大小Node数组
2.自旋并发扩容:
这里代码写的比较复杂,我也是花了挺长时间才理清楚这块逻辑,我先解释一下并发扩容的机制:
首先一个线程过来,他是需要去看他要在哪个位置(会去读transferIndex的值)开始迁移,一个线程负责一块区域,多个线程这里采用的是类似跳表的机制,这里有一个扩容因子stride,最小为16,每次跳的大小就是这个扩容因子控制的。然后要去cas尝试是否可以占用这个节点(由于没有加锁,所以有可能其他线程先来占了),失败继续跳下一个节点,直到成功,每个线程负责的区域也就是下标为初始占用的地方(bound)到bound-stride的大小,如果当前线程把自己负责的区域扩容完,发现原本该下一个线程负责的节点没有被占用,则还是会继续往下扩容,否则就继续往下跳直到结束,后面的线程也是如此。
了解完这些我们再来看代码,首先看while (advance)里面:
1.首先判断--i>=bound,这个是用来控制数组下标的,每次循环-1,当大于时,说明当前线程还在自己负责的扩容范围,继续扩容,finishing表示扩容结束。
2.当(nextIndex = transferIndex) <= 0条件成立时,说明所有下标都有线程负责扩容了,新来的线程就不需要操心了,同时也是将当前需扩容下标赋值给nextIndex
3.cas修改下一个桶的位置(即下一个线程开始扩容的下标),成功则说明自己是第一个占领的,接下来扩容就好了,失败则表示已经被占了,则继续尝试占领下一个山头。
再看下一个if, if (i < 0 || i >= n || i + n >= nextn) 其实就是为了判断当前扩容是否结束,其实里面就是根据sizectl大小cas计算,然后标记finishing=true,下一轮则直接return.
再往下看:校验(f = tabAt(tab, i)) == null,这里如果下标为空,则cas设置一个forward节点占位
再往下看:else if ((fh = f.hash) == MOVED),这个表示当前节点是已扩容节点,置为true,回到开头的地方去循环
最后便是真正的节点迁移,这里看注释其实就差不多了,和hashMap单节点迁移是一样的,拆分高低位迁移。
其实这里最核心也最难理解的不是迁移,而是如何控制多线程去并发的迁移还能互不影响,这里使用了大批量的cas,只在最关键的地方使用了synchronized,可以说在性能方面做到了极致。
其实理解了上面这些,再往下看就简单了很多,协助扩容说完了,我们再看后面其实就是走下一轮循环链表新增节点或红黑树新增节点。接下来put方法还有一个核心就是addCount,我们先看代码:
private final void addCount(long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { //这个if就是更新baseCount值,可以不关注 CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } if (check >= 0) { //主要看这里 Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { //大小超出扩容参数 && 数组不为空 && 长度未超出最大值,表示需要扩容 int rs = resizeStamp(n); if (sc < 0) { //表示已经有线程在扩容/初始化 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) //表示其他线程正在初始化 break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 协助扩容 transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //表示之前无其他线程扩容、初始化 transfer(tab, null); //直接扩容 s = sumCount(); } } }
你会发现除了将baseCount+1,就是判断是否还要扩容,协助扩容,又联系到上面去了。
put方法大体上说完了,哇,也太复杂了,那么接下来再看看get方法是啥样的呢:
get:
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //首先判断hash计算当前下标下存在值 if ((eh = e.hash) == h) { //这里首先看第一个节点是否就是要找的值 if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) //表示正在扩容或红黑树 return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { //遍历查询 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
当hash小于0时,有可能为正在扩容(-1)或者红黑树(-2),这里主要介绍一下正在扩容时的查询,直接看ForwardingNode.find:
Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node<K,V>[] tab = nextTable;;) { //首先将nextTable赋值给tab Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) 说明当前节点为空 return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) //首节点等于要查询key return e; if (eh < 0) { if (e instanceof ForwardingNode) { //这种情况暂时还没想到有什么情况会遇到 tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else //表示正在扩容的节点为红黑树 return e.find(h, k); } if ((e = e.next) == null) //查不到返回null return null; } } }
相比上面扩容,这些代码确实很简单了,全程没用锁,但也保证了并发及扩容时可以查到,这时候就有一个问题来了,那我如何能保证这些数据我都能实时查询到呢,难道就不会出现数据在cpu缓存没刷出来导致我查询时有问题吗,当然,doug lea大神肯定是会考虑到这些的,他用简简单单的一个volatile关键字完成了这一切,volatile关键字保证了数据的可见性,也就保证了我前面问题的并发安全,后续我也会补一章volatile的文章。
其实这里我曾经思考过一个问题,当正在扩容时,我去新的节点获取,不是会出现有的节点没有迁移完成而导致原有该查到的节点但却查不到吗,后来我又把迁移的代码看了一下:
for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd);
这段代码中,for循环会把所有的高节点和低位节点迁移完成,然后cas设入新node数组,最后才会标记ForwardNode节点,这样就保证ForwardNode节点对应的新迁移的两个节点都是已经迁移完成的。
总结:
源码部分算是写完了,这时候我可以回答最开始的那个问题了,既生HashTable何生ConcurrentHashMap? =》 HashTable的性能属实太差了,我想put一个节点,就导致我所有的节点全程都不能使用,而chm不一样,不仅使用node节点来做分段锁(1.8),而且除了在实际数据新增或者迁移时使用了synchronized关键字,其他都是使用乐观锁cas来操作,极大地提升了高并发情况map存取的性能,同时也保证了线程安全。可以这么说吧,绝大部分情况下需要用到并发安全的map就选ConcurrentHashMap吧,doug lea把能为你做的都做好了!!!