zoukankan      html  css  js  c++  java
  • ConcurrentHashMap

    ConcurrentHashMap实现原理

    众所周知,哈希表是中非常高效,复杂度为O(1)的数据结构,在Java开发中,我们最常见到最频繁使用的就是HashMapHashTable,但是在线程竞争激烈的并发场景中使用都不够合理。

    HashMap:先说HashMapHashMap是线程不安全的,在并发环境下,可能会形成环状链表(扩容时可能造成死循环,具体原因自行百度google或查看源码分析),导致get操作时,cpu空转,所以,在并发环境中使用HashMap是非常危险的。

    HashTable HashTableHashMap的实现原理几乎一样,差别无非是
    1.HashTable不允许keyvaluenull
    2.HashTable是线程安全的。但是HashTable线程安全的策略实现代价却太大了,简单粗暴,get/put所有相关操作都是synchronized的,这相当于给整个哈希表加了一把大锁,多线程访问时候,只要有一个线程访问或操作该对象,那其他线程只能阻塞,相当于将所有的操作串行化,在竞争激烈的并发场景中性能就会非常差。


    HashTable性能差主要是由于所有操作需要竞争同一把锁,而如果容器中有多把锁,每一把锁锁一段数据,这样在多线程访问时不同段的数据时,就不会存在锁竞争了,这样便可以有效地提高并发效率。这就是ConcurrentHashMap所采用的"分段锁"思想。

    ConcurrentHashMap源码分析  

    ConcurrentHashMap采用了非常精妙的"分段锁"策略,ConcurrentHashMap的主干是个Segment数组。

    final Segment<K,V>[] segments;

    Segment继承了ReentrantLock,所以它就是一种可重入锁(ReentrantLock)。在ConcurrentHashMap,一个Segment就是一个子哈希表,Segment里维护了一个HashEntry数组,并发环境下,对于不同Segment的数据进行操作是不用考虑锁竞争的。(就按默认的ConcurrentLeve16来讲,理论上就允许16个线程并发执行,有木有很酷)

    所以,对于同一个Segment的操作才需考虑线程同步,不同的Segment则无需考虑。

    Segment类似于HashMap,一个Segment维护着一个HashEntry数组

    transient volatile HashEntry<K,V>[] table;

    HashEntry是目前我们提到的最小的逻辑处理单元了。一个ConcurrentHashMap维护一个Segment数组,一个Segment维护一个HashEntry数组。

    static final class HashEntry<K,V> {

            final int hash;

            final K key;

            volatile V value;

            volatile HashEntry<K,V> next;

            //其他省略

    }    

    我们说Segment类似哈希表,那么一些属性就跟我们之前提到的HashMap差不离,比如负载因子loadFactor,比如阈值threshold等等,看下Segment的构造方法


    Segment(float lf, int threshold, HashEntry<K,V>[] tab) {

                this.loadFactor = lf;//负载因子

                this.threshold = threshold;//阈值

                this.table = tab;//主干数组即HashEntry数组

            }

    我们来看下ConcurrentHashMap的构造方法

    public ConcurrentHashMap(int initialCapacity,

                              float loadFactor, int concurrencyLevel) {

         if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)

             throw new IllegalArgumentException();

         //MAX_SEGMENTS 1<<16=65536,也就是最大并发数为65536

         if (concurrencyLevel > MAX_SEGMENTS)

             concurrencyLevel = MAX_SEGMENTS;

         //2sshif次方等于ssize,例:ssize=16,sshift=4;ssize=32,sshif=5

        int sshift = 0;

        //ssize segments数组长度,根据concurrentLevel计算得出

        int ssize = 1;

        while (ssize < concurrencyLevel) {

            ++sshift;

            ssize <<= 1;

        }

        //segmentShiftsegmentMask这两个变量在定位segment时会用到,后面会详细讲

        this.segmentShift = 32 - sshift;

        this.segmentMask = ssize - 1;

        if (initialCapacity > MAXIMUM_CAPACITY)

            initialCapacity = MAXIMUM_CAPACITY;

        //计算cap的大小,即SegmentHashEntry的数组长度,cap也一定为2n次方.

        int c = initialCapacity / ssize;

        if (c * ssize < initialCapacity)

            ++c;

        int cap = MIN_SEGMENT_TABLE_CAPACITY;

        while (cap < c)

            cap <<= 1;

        //创建segments数组并初始化第一个Segment,其余的Segment延迟初始化

        Segment<K,V> s0 =

            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),

                             (HashEntry<K,V>[])new HashEntry[cap]);

        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];

        UNSAFE.putOrderedObject(ss, SBASE, s0);

        this.segments = ss;

    }

    初始化方法有三个参数,如果用户不指定则会使用默认值,initialCapacity16loadFactor0.75(负载因子,扩容时需要参考),concurrentLevel16

    从上面的代码可以看出来,Segment数组的大小ssize是由concurrentLevel来决定的,但是却不一定等于concurrentLevelssize一定是大于或等于concurrentLevel的最小的2的次幂。比如:默认情况下concurrentLevel16,则ssize16;若concurrentLevel14ssize16;若concurrentLevel17,则ssize32。为什么Segment的数组大小一定是2的次幂?其实主要是便于通过按位与的散列算法来定位Segmentindex。至于更详细的原因,有兴趣的话可以参考我的另一篇文章《HashMap实现原理及源码分析》,其中对于数组长度为什么一定要是2的次幂有较为详细的分析。

    接下来,我们来看看put方法

     public V put(K key, V value) {

            Segment<K,V> s;

            //concurrentHashMap不允许key/value为空

            if (value == null)

                throw new NullPointerException();

            //hash函数对keyhashCode重新散列,避免差劲的不合理的hashcode,保证散列均匀

            int hash = hash(key);

            //返回的hash值无符号右移segmentShift位与段掩码进行位运算,定位segment

            int j = (hash >>> segmentShift) & segmentMask;

            if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck

                 (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment

                s = ensureSegment(j);

            return s.put(key, hash, value, false);

        }

    从源码看出,put的主要逻辑也就两步:1.定位segment并确保定位的Segment已初始化 2.调用Segmentput方法。

    关于segmentShiftsegmentMask

      segmentShiftsegmentMask这两个全局变量的主要作用是用来定位Segmentint j =(hash >>> segmentShift) & segmentMask

      segmentMask:段掩码,假如segments数组长度为16,则段掩码为16-1=15segments长度为32,段掩码为32-1=31。这样得到的所有bit位都为1,可以更好地保证散列的均匀性

      segmentShift2sshift次方等于ssizesegmentShift=32-sshift。若segments长度为16segmentShift=32-4=28;segments长度为32segmentShift=32-5=27。而计算得出的hash值最大为32位,无符号右移segmentShift,则意味着只保留高几位(其余位是没用的),然后与段掩码segmentMask位运算来定位Segment


    get/put方法

    get方法

     public V get(Object key) {

            Segment<K,V> s;

            HashEntry<K,V>[] tab;

            int h = hash(key);

            long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;

            //先定位Segment,再定位HashEntry

            if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&

                (tab = s.table) != null) {

                for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile

                         (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);

                     e != null; e = e.next) {

                    K k;

                    if ((k = e.key) == key || (e.hash == h && key.equals(k)))

                        return e.value;

                }

            }

            return null;

        }

    get方法无需加锁,由于其中涉及到的共享变量都使用volatile修饰,volatile可以保证内存可见性,所以不会读取到过期数据。

    来看下concurrentHashMap代理到Segment上的put方法,Segment中的put方法是要加锁的。只不过是锁粒度细了而已。

    final V put(K key, int hash, V value, boolean onlyIfAbsent) {

                HashEntry<K,V> node = tryLock() ? null :

                    scanAndLockForPut(key, hash, value);//tryLock不成功时会遍历定位到的HashEnry位置的链表(遍历主要是为了使CPU缓存链表),若找不到,则创建HashEntrytryLock一定次数后(MAX_SCAN_RETRIES变量决定),则lock。若遍历过程中,由于其他线程的操作导致链表头结点变化,则需要重新遍历。

                V oldValue;

                try {

                    HashEntry<K,V>[] tab = table;

                    int index = (tab.length - 1) & hash;//定位HashEntry,可以看到,这个hash值在定位Segment时和在Segment中定位HashEntry都会用到,只不过定位Segment时只用到高几位。

                    HashEntry<K,V> first = entryAt(tab, index);

                    for (HashEntry<K,V> e = first;;) {

                        if (e != null) {

                            K k;

                            if ((k = e.key) == key ||

                                (e.hash == hash && key.equals(k))) {

                                oldValue = e.value;

                                if (!onlyIfAbsent) {

                                    e.value = value;

                                    ++modCount;

                                }

                                break;

                            }

                            e = e.next;

                        }

                        else {

                            if (node != null)

                                node.setNext(first);

                            else

                                node = new HashEntry<K,V>(hash, key, value, first);

                            int c = count + 1;

    //c超出阈值threshold,需要扩容并rehash。扩容后的容量是当前容量的2倍。这样可以最大程度避免之前散列好的entry重新散列,具体在另一篇文章中有详细分析,不赘述。扩容并rehash的这个过程是比较消耗资源的。

                            if (c > threshold && tab.length < MAXIMUM_CAPACITY)

                                rehash(node);

                            else

                                setEntryAt(tab, index, node);

                            ++modCount;

                            count = c;

                            oldValue = null;

                            break;

                        }

                    }

                } finally {

                    unlock();

                }

                return oldValue;

            }

    总结

    ConcurrentHashMap作为一种线程安全且高效的哈希表的解决方案,尤其其中的"分段锁"的方案,相比HashTable的全表锁在性能上的提升非常之大。本文对ConcurrentHashMap的实现原理进行了详细分析,并解读了部分源码,希望能帮助到有需要的童鞋。

    https://www.cnblogs.com/chengxiao/p/6842045.html

    并发编程实践中,ConcurrentHashMap是一个经常被使用的数据结构,相比于Hashtable以及Collections.synchronizedMap()ConcurrentHashMap在线程安全的基础上提供了更好的写并发能力,但同时降低了对读一致性的要求(这点好像CAP理论啊 O(_)O)。ConcurrentHashMap的设计与实现非常精巧,大量的利用了volatilefinalCASlock-free技术来减少锁竞争对于性能的影响,无论对于Java并发编程的学习还是Java内存模型的理解,ConcurrentHashMap的设计以及源码都值得非常仔细的阅读与揣摩。

    这篇日志记录了自己对ConcurrentHashMap的一些总结,由于JDK678中实现都不同,需要分开阐述在不同版本中的ConcurrentHashMap

    之前已经在ConcurrentHashMap原理分析中解释了ConcurrentHashMap的原理,主要是从代码的角度来阐述是源码是如何写的,本文仍然从源码出发,挑选个人觉得重要的点(会用红色标注)再次进行回顾,以及阐述ConcurrentHashMap的一些注意点。

    1. JDK6JDK7中的实现

    1.1 设计思路

    ConcurrentHashMap采用了分段锁的设计,只有在同一个分段内才存在竞态关系,不同的分段锁之间没有锁竞争。相比于对整个Map加锁的设计,分段锁大大的提高了高并发环境下的处理能力。但同时,由于不是对整个Map加锁,导致一些需要扫描整个Map的方法(如size(), containsValue())需要使用特殊的实现,另外一些方法(如clear())甚至放弃了对一致性的要求(ConcurrentHashMap是弱一致性的,具体请查看ConcurrentHashMap能完全替代HashTable吗?)。

    ConcurrentHashMap中的分段锁称为Segment,它即类似于HashMapJDK7JDK8HashMap的实现)的结构,即内部拥有一个Entry数组,数组中的每个元素又是一个链表;同时又是一个ReentrantLockSegment继承了ReentrantLock)。ConcurrentHashMap中的HashEntry相对于HashMap中的Entry有一定的差异性:HashEntry中的value以及next都被volatile修饰,这样在多线程读写过程中能够保持它们的可见性,代码如下:

    static final class HashEntry<K,V> {

            final int hash;

            final K key;

            volatile V value;

            volatile HashEntry<K,V> next;

    1.2 并发度(Concurrency Level

    并发度可以理解为程序运行时能够同时更新ConccurentHashMap且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap中的分段锁个数,即Segment[]的数组长度。ConcurrentHashMap默认的并发度为16,但用户也可以在构造函数中设置并发度。当用户设置并发度时,ConcurrentHashMap会使用大于等于该值的最小2幂指数作为实际并发度(假如用户设置并发度为17,实际并发度则为32)。运行时通过将key的高n位(n = 32 segmentShift)和并发度减1segmentMask)做位与运算定位到所在的SegmentsegmentShiftsegmentMask都是在构造过程中根据concurrency level被相应的计算出来。

    如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。(文档的说法是根据你并发的线程数量决定,太多会导性能降低)

    1.3 创建分段锁

    JDK6不同,JDK7中除了第一个Segment之外,剩余的Segments采用的是延迟初始化的机制:每次put之前都需要检查key对应的Segment是否为null,如果是则调用ensureSegment()以确保对应的Segment被创建。

    ensureSegment可能在并发环境下被调用,但与想象中不同,ensureSegment并未使用锁来控制竞争,而是使用了Unsafe对象的getObjectVolatile()提供的原子读语义结合CAS来确保Segment创建的原子性。代码段如下:

    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))

                    == null) { // recheck

                    Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);

                    while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))

                           == null) {

                        if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))

                            break;

                    }

    }

    1.4 put/putIfAbsent/putAll

    JDK6一样,ConcurrentHashMapput方法被代理到了对应的Segment(定位Segment的原理之前已经描述过)中。与JDK6不同的是,JDK7版本的ConcurrentHashMap在获得Segment锁的过程中,做了一定的优化 - 在真正申请锁之前,put方法会通过tryLock()方法尝试获得锁,在尝试获得锁的过程中会对对应hashcode的链表进行遍历,如果遍历完毕仍然找不到与key相同的HashEntry节点,则为后续的put操作提前创建一个HashEntry。当tryLock一定次数后仍无法获得锁,则通过lock申请锁。

    需要注意的是,由于在并发环境下,其他线程的putrehash或者remove操作可能会导致链表头结点的变化,因此在过程中需要进行检查,如果头结点发生变化则重新对表进行遍历。而如果其他线程引起了链表中的某个节点被删除,即使该变化因为是非原子写操作(删除节点后链接后续节点调用的是Unsafe.putOrderedObject(),该方法不提供原子写语义)可能导致当前线程无法观察到,但因为不影响遍历的正确性所以忽略不计。

    之所以在获取锁的过程中对整个链表进行遍历,主要目的是希望遍历的链表被CPU cache所缓存,为后续实际put过程中的链表遍历操作提升性能。

    在获得锁之后,Segment对链表进行遍历,如果某个HashEntry节点具有相同的key,则更新该HashEntryvalue值,否则新建一个HashEntry节点,将它设置为链表的新head节点并将原头节点设为新head的下一个节点。新建过程中如果节点总数(含新建的HashEntry)超过threshold,则调用rehash()方法对Segment进行扩容,最后将新建HashEntry写入到数组中。

    put方法中,链接新节点的下一个节点(HashEntry.setNext())以及将链表写入到数组中(setEntryAt())都是通过UnsafeputOrderedObject()方法来实现,这里并未使用具有原子写语义的putObjectVolatile()的原因是:JMM会保证获得锁到释放锁之间所有对象的状态更新都会在锁被释放之后更新到主存,从而保证这些变更对其他线程是可见的。

    1.5 rehash

    相对于HashMapresizeConcurrentHashMaprehash原理类似,但是Doug Learehash做了一定的优化,避免让所有的节点都进行复制操作:由于扩容是基于2的幂指来操作,假设扩容前某HashEntry对应到Segment中数组的indexi,数组的容量为capacity,那么扩容后该HashEntry对应到新数组中的index只可能为i或者i+capacity,因此大多数HashEntry节点在扩容前后index可以保持不变。基于此,rehash方法中会定位第一个后续所有节点在扩容后index都保持不变的节点,然后将这个节点之前的所有节点重排即可。这部分代码如下:

    private void rehash(HashEntry<K,V> node) {

               HashEntry<K,V>[] oldTable = table;

               int oldCapacity = oldTable.length;

               int newCapacity = oldCapacity << 1;

               threshold = (int)(newCapacity * loadFactor);

               HashEntry<K,V>[] newTable =

                   (HashEntry<K,V>[]) new HashEntry[newCapacity];

               int sizeMask = newCapacity - 1;

               for (int i = 0; i < oldCapacity ; i++) {

                   HashEntry<K,V> e = oldTable[i];

                   if (e != null) {

                       HashEntry<K,V> next = e.next;

                       int idx = e.hash & sizeMask;

                       if (next == null)   //  Single node on list

                           newTable[idx] = e;

                       else { // Reuse consecutive sequence at same slot

                           HashEntry<K,V> lastRun = e;

                           int lastIdx = idx;

                           for (HashEntry<K,V> last = next;

                                last != null;

                                last = last.next) {

                               int k = last.hash & sizeMask;

                               if (k != lastIdx) {

                                   lastIdx = k;

                                   lastRun = last;

                               }

                           }

                           newTable[lastIdx] = lastRun;

                           // Clone remaining nodes

                           for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {

                               V v = p.value;

                               int h = p.hash;

                               int k = h & sizeMask;

                               HashEntry<K,V> n = newTable[k];

                               newTable[k] = new HashEntry<K,V>(h, p.key, v, n);

                           }

                       }

                   }

               }

               int nodeIndex = node.hash & sizeMask; // add the new node

               node.setNext(newTable[nodeIndex]);

               newTable[nodeIndex] = node;

               table = newTable;

           }

    1.6 remove

    put类似,remove在真正获得锁之前,也会对链表进行遍历以提高缓存命中率。

    1.7 getcontainsKey

    getcontainsKey两个方法几乎完全一致:他们都没有使用锁,而是通过Unsafe对象的getObjectVolatile()方法提供的原子读语义,来获得Segment以及对应的链表,然后对链表遍历判断是否存在key相同的节点以及获得该节点的value。但由于遍历过程中其他线程可能对链表结构做了调整,因此getcontainsKey返回的可能是过时的数据,这一点是ConcurrentHashMap在弱一致性上的体现。如果要求强一致性,那么必须使用Collections.synchronizedMap()方法。

    1.8 sizecontainsValue

    这些方法都是基于整个ConcurrentHashMap来进行操作的,他们的原理也基本类似:首先不加锁循环执行以下操作:循环所有的Segment(通过UnsafegetObjectVolatile()以保证原子读语义),获得对应的值以及所有Segmentmodcount之和。如果连续两次所有Segmentmodcount和相等,则过程中没有发生其他线程修改ConcurrentHashMap的情况,返回获得的值。

    当循环次数超过预定义的值时,这时需要对所有的Segment依次进行加锁,获取返回值后再依次解锁。值得注意的是,加锁过程中要强制创建所有的Segment,否则容易出现其他线程创建Segment并进行putremove等操作。代码如下:

    for(int j =0; j < segments.length; ++j)

    ensureSegment(j).lock();// force creation

    一般来说,应该避免在多线程环境下使用sizecontainsValue方法。

    1modcountput, replace, remove以及clear等方法中都会被修改。

    2:对于containsValue方法来说,如果在循环过程中发现匹配valueHashEntry,则直接返回true

    最后,与HashMap不同的是,ConcurrentHashMap并不允许key或者valuenull,按照Doug Lea的说法,这么设计的原因是在ConcurrentHashMap中,一旦value出现null,则代表HashEntrykey/value没有映射完成就被其他线程所见,需要特殊处理。在JDK6中,get方法的实现中就有一段对HashEntry.value == null的防御性判断。但Doug Lea也承认实际运行过程中,这种情况似乎不可能发生(参考:http://cs.oswego.edu/pipermail/concurrency-interest/2011-March/007799.html)。

    2. JDK8中的实现

    ConcurrentHashMapJDK8中进行了巨大改动,很需要通过源码来再次学习下Doug Lea的实现方法。

    它摒弃了Segment(锁段)的概念,而是启用了一种全新的方式实现,利用CAS算法。它沿用了与它同时期的HashMap版本的思想,底层依然由“数组”+链表+红黑树的方式思想(JDK7JDK8HashMap的实现),但是为了做到并发,又增加了很多辅助的类,例如TreeBinTraverser等对象内部类。

    2.1 重要的属性

    首先来看几个重要的属性,与HashMap相同的就不再介绍了,这里重点解释一下sizeCtl这个属性。可以说它是ConcurrentHashMap中出镜率很高的一个属性,因为它是一个控制标识符,在不同的地方有不同用途,而且它的取值不同,也代表不同的含义。

    负数代表正在进行初始化或扩容操作

    -1代表正在初始化

    -N 表示有N-1个线程正在进行扩容操作

    正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,这一点类似于扩容阈值的概念。还后面可以看到,它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。

    /**

         * 盛装Node元素的数组 它的大小是2的整数次幂

         * Size is always a power of two. Accessed directly by iterators.

         */

        transient volatile Node<K,V>[] table;

            /**

         * Table initialization and resizing control.  When negative, the

         * table is being initialized or resized: -1 for initialization,

         * else -(1 + the number of active resizing threads).  Otherwise,

         * when table is null, holds the initial table size to use upon

         * creation, or 0 for default. After initialization, holds the

         * next element count value upon which to resize the table.

         hash表初始化或扩容时的一个控制位标识量。

         负数代表正在进行初始化或扩容操作

         -1代表正在初始化

         -N 表示有N-1个线程正在进行扩容操作

         正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小

         */

        private transient volatile int sizeCtl;

        // 以下两个是用来控制扩容的时候 单线程进入的变量

         /**

         * The number of bits used for generation stamp in sizeCtl.

         * Must be at least 6 for 32bit arrays.

         */

        private static int RESIZE_STAMP_BITS = 16;

            /**

         * The bit shift for recording size stamp in sizeCtl.

         */

        private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

        /*

         * Encodings for Node hash fields. See above for explanation.

         */

        static final int MOVED     = -1; // hash值是-1,表示这是一个forwardNode节点

        static final int TREEBIN   = -2; // hash值是-2  表示这时一个TreeBin节点

    2.2 重要的类

    2.2.1 Node

    Node是最核心的内部类,它包装了key-value键值对,所有插入ConcurrentHashMap的数据都包装在这里面。它与HashMap中的定义很相似,但是但是有一些差别它对valuenext属性设置了volatile同步锁(JDK7Segment相同),它不允许调用setValue方法直接改变Nodevalue域,它增加了find方法辅助map.get()方法。

    2.2.2 TreeNode

    树节点类,另外一个核心的数据结构。当链表长度过长的时候,会转换为TreeNode。但是与HashMap不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。而且TreeNodeConcurrentHashMap集成自Node类,而并非HashMap中的集成自LinkedHashMap.Entry<K,V>类,也就是说TreeNode带有next指针,这样做的目的是方便基于TreeBin的访问。

    2.2.3 TreeBin

    这个类并不负责包装用户的keyvalue信息,而是包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。

    这里仅贴出它的构造方法。可以看到在构造TreeBin节点时,仅仅指定了它的hash值为TREEBIN常量,这也就是个标识为。同时也看到我们熟悉的红黑树构造方法

    2.2.4 ForwardingNode

    一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张表。而且这个节点的key value next指针全部为null,它的hash值为-1. 这里面定义的find的方法是从nextTable里进行查询节点,而不是以自身为头节点进行查找。

    /**

         * A node inserted at head of bins during transfer operations.

         */

        static final class ForwardingNode<K,V> extends Node<K,V> {

            final Node<K,V>[] nextTable;

            ForwardingNode(Node<K,V>[] tab) {

                super(MOVED, null, null, null);

                this.nextTable = tab;

            }

            Node<K,V> find(int h, Object k) {

                // loop to avoid arbitrarily deep recursion on forwarding nodes

                outer: for (Node<K,V>[] tab = nextTable;;) {

                    Node<K,V> e; int n;

                    if (k == null || tab == null || (n = tab.length) == 0 ||

                        (e = tabAt(tab, (n - 1) & h)) == null)

                        return null;

                    for (;;) {

                        int eh; K ek;

                        if ((eh = e.hash) == h &&

                            ((ek = e.key) == k || (ek != null && k.equals(ek))))

                            return e;

                        if (eh < 0) {

                            if (e instanceof ForwardingNode) {

                                tab = ((ForwardingNode<K,V>)e).nextTable;

                                continue outer;

                            }

                            else

                                return e.find(h, k);

                        }

                        if ((e = e.next) == null)

                            return null;

                    }

                }

            }

        }

    2.3 UnsafeCAS

    ConcurrentHashMap中,随处可以看到U, 大量使用了U.compareAndSwapXXX的方法,这个方法是利用一个CAS算法实现无锁化的修改值的操作,他可以大大降低锁代理的性能消耗。这个算法的基本思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,如果相等,则接受你指定的修改的值,否则拒绝你的操作。因为当前线程中的值已经不是最新的值,你的修改很可能会覆盖掉其他线程修改的结果。这一点与乐观锁,SVN的思想是比较类似的。

    2.3.1 unsafe静态块

    unsafe代码块控制了一些属性的修改工作,比如最常用的SIZECTL 。在这一版本的concurrentHashMap中,大量应用来的CAS方法进行变量、属性的修改工作。利用CAS进行无锁操作,可以大大提高性能。

    private static final sun.misc.Unsafe U;

       private static final long SIZECTL;

       private static final long TRANSFERINDEX;

       private static final long BASECOUNT;

       private static final long CELLSBUSY;

       private static final long CELLVALUE;

       private static final long ABASE;

       private static final int ASHIFT;

       static {

           try {

               U = sun.misc.Unsafe.getUnsafe();

               Class<?> k = ConcurrentHashMap.class;

               SIZECTL = U.objectFieldOffset

                   (k.getDeclaredField("sizeCtl"));

               TRANSFERINDEX = U.objectFieldOffset

                   (k.getDeclaredField("transferIndex"));

               BASECOUNT = U.objectFieldOffset

                   (k.getDeclaredField("baseCount"));

               CELLSBUSY = U.objectFieldOffset

                   (k.getDeclaredField("cellsBusy"));

               Class<?> ck = CounterCell.class;

               CELLVALUE = U.objectFieldOffset

                   (ck.getDeclaredField("value"));

               Class<?> ak = Node[].class;

               ABASE = U.arrayBaseOffset(ak);

               int scale = U.arrayIndexScale(ak);

               if ((scale & (scale - 1)) != 0)

                   throw new Error("data type scale not a power of two");

               ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);

           } catch (Exception e) {

               throw new Error(e);

           }

       }

    2.3.2 三个核心方法

    ConcurrentHashMap定义了三个原子操作,用于对指定位置的节点进行操作。正是这些原子操作保证了ConcurrentHashMap的线程安全。

    //获得在i位置上的Node节点

        static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {

            return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);

        }

            //利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少

            //CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改

            //因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果  有点类似于SVN

        static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,

                                            Node<K,V> c, Node<K,V> v) {

            return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);

        }

            //利用volatile方法设置节点位置的值

        static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {

            U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);

        }

    2.4 初始化方法initTable

    对于ConcurrentHashMap来说,调用它的构造方法仅仅是设置了一些参数而已。而整个table的初始化是在向ConcurrentHashMap中插入元素的时候发生的。如调用putcomputeIfAbsentcomputemerge等方法的时候,调用时机是检查table==null

    初始化方法主要应用了关键属性sizeCtl 如果这个值〈0,表示其他线程正在进行初始化,就放弃这个操作。在这也可以看出ConcurrentHashMap的初始化只能由一个线程完成。如果获得了初始化权限,就用CAS方法将sizeCtl置为-1,防止其他线程进入。初始化数组后,将sizeCtl的值改为0.75*n

    /**

         * Initializes table, using the size recorded in sizeCtl.

         */

        private final Node<K,V>[] initTable() {

            Node<K,V>[] tab; int sc;

            while ((tab = table) == null || tab.length == 0) {

                    //sizeCtl表示有其他线程正在进行初始化操作,把线程挂起。对于table的初始化工作,只能有一个线程在进行。

                if ((sc = sizeCtl) < 0)

                    Thread.yield(); // lost initialization race; just spin

                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//利用CAS方法把sizectl的值置为-1 表示本线程正在进行初始化

                    try {

                        if ((tab = table) == null || tab.length == 0) {

                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;

                            @SuppressWarnings("unchecked")

                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];

                            table = tab = nt;

                            sc = n - (n >>> 2);//相当于0.75*n 设置一个扩容的阈值

                        }

                    } finally {

                        sizeCtl = sc;

                    }

                    break;

                }

            }

            return tab;

        }

    2.5 扩容方法 transfer

    ConcurrentHashMap容量不足的时候,需要对table进行扩容。这个方法的基本思想跟HashMap是很像的,但是由于它是支持并发扩容的,所以要复杂的多。原因是它支持多线程进行扩容操作,而并没有加锁。我想这样做的目的不仅仅是为了满足concurrent的要求,而是希望利用并发处理去减少扩容带来的时间影响。因为在扩容的时候,总是会涉及到从一个“数组”到另一个“数组”拷贝的操作,如果这个操作能够并发进行,那真真是极好的了。

    整个扩容操作分为两个部分

    第一部分是构建一个nextTable,它的容量是原来的两倍,这个操作是单线程完成的。这个单线程的保证是通过RESIZE_STAMP_SHIFT这个常量经过一次运算来保证的,这个地方在后面会有提到;

    第二个部分就是将原来table中的元素复制到nextTable中,这里允许多线程进行操作。

    先来看一下单线程是如何完成的:

    它的大体思想就是遍历、复制的过程。首先根据运算得到需要遍历的次数i,然后利用tabAt方法获得i位置的元素:

    如果这个位置为空,就在原table中的i位置放入forwardNode节点,这个也是触发并发扩容的关键点;

    如果这个位置是Node节点(fh>=0),如果它是一个链表的头节点,就构造一个反序链表,把他们分别放在nextTableii+n的位置上

    如果这个位置是TreeBin节点(fh<0),也做一个反序处理,并且判断是否需要untreefi,把处理的结果分别放在nextTableii+n的位置上

    遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,完成扩容。

    再看一下多线程是如何完成的:

    在代码的69行有一个判断,如果遍历到的节点是forward节点,就向后继续遍历,再加上给节点上锁的机制,就完成了多线程的控制。多线程遍历节点,处理了一个节点,就把对应点的值setforward,另一个线程看到forward,就向后遍历。这样交叉就完成了复制工作。而且还很好的解决了线程安全的问题。 这个方法的设计实在是让我膜拜。

    /**

        * 一个过渡的table表  只有在扩容的时候才会使用

        */

       private transient volatile Node<K,V>[] nextTable;

    /**

        * Moves and/or copies the nodes in each bin to new table. See

        * above for explanation.

        */

       private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {

           int n = tab.length, stride;

           if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)

               stride = MIN_TRANSFER_STRIDE; // subdivide range

           if (nextTab == null) {            // initiating

               try {

                   @SuppressWarnings("unchecked")

                   Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//构造一个nextTable对象 它的容量是原来的两倍

                   nextTab = nt;

               } catch (Throwable ex) {      // try to cope with OOME

                   sizeCtl = Integer.MAX_VALUE;

                   return;

               }

               nextTable = nextTab;

               transferIndex = n;

           }

           int nextn = nextTab.length;

           ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//构造一个连节点指针 用于标志位

           boolean advance = true;//并发扩容的关键属性 如果等于true 说明这个节点已经处理过

           boolean finishing = false; // to ensure sweep before committing nextTab

           for (int i = 0, bound = 0;;) {

               Node<K,V> f; int fh;

               //这个while循环体的作用就是在控制i--  通过i--可以依次遍历原hash表中的节点

               while (advance) {

                   int nextIndex, nextBound;

                   if (--i >= bound || finishing)

                       advance = false;

                   else if ((nextIndex = transferIndex) <= 0) {

                       i = -1;

                       advance = false;

                   }

                   else if (U.compareAndSwapInt

                            (this, TRANSFERINDEX, nextIndex,

                             nextBound = (nextIndex > stride ?

                                          nextIndex - stride : 0))) {

                       bound = nextBound;

                       i = nextIndex - 1;

                       advance = false;

                   }

               }

               if (i < 0 || i >= n || i + n >= nextn) {

                   int sc;

                   if (finishing) {

                       //如果所有的节点都已经完成复制工作  就把nextTable赋值给table 清空临时对象nextTable

                       nextTable = null;

                       table = nextTab;

                       sizeCtl = (n << 1) - (n >>> 1);//扩容阈值设为原容量1.5相当于现在容量的0.75

                       return;

                   }

    //利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作

                   if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {

                       if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)

                           return;

                       finishing = advance = true;

                       i = n; // recheck before commit

                   }

               }

               //如果遍历到的节点为空 则放入ForwardingNode指针

               else if ((f = tabAt(tab, i)) == null)

                   advance = casTabAt(tab, i, null, fwd);

    //如果遍历到ForwardingNode节点  说明这个点已经被处理过了 直接跳过  这里是控制并发扩容的核心

               else if ((fh = f.hash) == MOVED)

                   advance = true; // already processed

               else {

                       //节点上锁

                   synchronized (f) {

                       if (tabAt(tab, i) == f) {

                           Node<K,V> ln, hn;

                           //如果fh>=0 证明这是一个Node节点

                           if (fh >= 0) {

                               int runBit = fh & n;

      //以下的部分在完成的工作是构造两个链表  一个是原链表  另一个是原链表的反序排列

                               Node<K,V> lastRun = f;

                               for (Node<K,V> p = f.next; p != null; p = p.next) {

                                   int b = p.hash & n;

                                   if (b != runBit) {

                                       runBit = b;

                                       lastRun = p;

                                   }

                               }

                               if (runBit == 0) {

                                   ln = lastRun;

                                   hn = null;

                               }

                               else {

                                   hn = lastRun;

                                   ln = null;

                               }

                               for (Node<K,V> p = f; p != lastRun; p = p.next) {

                                   int ph = p.hash; K pk = p.key; V pv = p.val;

                                   if ((ph & n) == 0)

                                       ln = new Node<K,V>(ph, pk, pv, ln);

                                   else

                                       hn = new Node<K,V>(ph, pk, pv, hn);

                               }

                               //nextTablei位置上插入一个链表

                               setTabAt(nextTab, i, ln);

                               //nextTablei+n的位置上插入另一个链表

                               setTabAt(nextTab, i + n, hn);

                               //tablei位置上插入forwardNode节点  表示已经处理过该节点

                               setTabAt(tab, i, fwd);

                               //设置advancetrue 返回到上面的while循环中 就可以执行i--操作

                               advance = true;

                           }

                           //TreeBin对象进行处理  与上面的过程类似

                           else if (f instanceof TreeBin) {

                               TreeBin<K,V> t = (TreeBin<K,V>)f;

                               TreeNode<K,V> lo = null, loTail = null;

                               TreeNode<K,V> hi = null, hiTail = null;

                               int lc = 0, hc = 0;

                               //构造正序和反序两个链表

                               for (Node<K,V> e = t.first; e != null; e = e.next) {

                                   int h = e.hash;

                                   TreeNode<K,V> p = new TreeNode<K,V>

                                       (h, e.key, e.val, null, null);

                                   if ((h & n) == 0) {

                                       if ((p.prev = loTail) == null)

                                           lo = p;

                                       else

                                           loTail.next = p;

                                       loTail = p;

                                       ++lc;

                                   }

                                   else {

                                       if ((p.prev = hiTail) == null)

                                           hi = p;

                                       else

                                           hiTail.next = p;

                                       hiTail = p;

                                       ++hc;

                                   }

                               }

                               //如果扩容后已经不再需要tree的结构 反向转换为链表结构

                               ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :

                                   (hc != 0) ? new TreeBin<K,V>(lo) : t;

                               hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :

                                   (lc != 0) ? new TreeBin<K,V>(hi) : t;

                                //nextTablei位置上插入一个链表    

                               setTabAt(nextTab, i, ln);

                               //nextTablei+n的位置上插入另一个链表

                               setTabAt(nextTab, i + n, hn);

                                //tablei位置上插入forwardNode节点  表示已经处理过该节点

                               setTabAt(tab, i, fwd);

                               //设置advancetrue 返回到上面的while循环中 就可以执行i--操作

                               advance = true;

                           }

                       }

                   }

               }

           }

       }

    2.6 Put方法

    前面的所有的介绍其实都为这个方法做铺垫。ConcurrentHashMap最常用的就是putget两个方法。现在来介绍put方法,这个put方法依然沿用HashMapput方法的思想,根据hash值计算这个新插入的点在table中的位置i,如果i位置是空的,直接放进去,否则进行判断,如果i位置是树节点,按照树的方式插入新的节点,否则把i插入到链表的末尾。ConcurrentHashMap中依然沿用这个思想,有一个最重要的不同点就是ConcurrentHashMap不允许keyvaluenull值。另外由于涉及到多线程,put方法就要复杂一点。在多线程中可能有以下两个情况

    如果一个或多个线程正在对ConcurrentHashMap进行扩容操作,当前线程也要进入扩容的操作中。这个扩容的操作之所以能被检测到,是因为transfer方法中在空结点上插入forward节点,如果检测到需要插入的位置被forward节点占有,就帮助进行扩容;

    如果检测到要插入的节点是非空且不是forward节点,就对这个节点加锁,这样就保证了线程安全。尽管这个有一些影响效率,但是还是会比hashTablesynchronized要好得多。

    整体流程就是首先定义不允许keyvaluenull的情况放入  对于每一个放入的值,首先利用spread方法对keyhashcode进行一次hash计算,由此来确定这个值在table中的位置。

    如果这个位置是空的,那么直接放入,而且不需要加锁操作。

    如果这个位置存在结点,说明发生了hash碰撞,首先判断这个节点的类型。如果是链表节点(fh>0,则得到的结点就是hash值相同的节点组成的链表的头节点。需要依次向后遍历确定这个新加入的值所在位置。如果遇到hash值与key值都与新加入节点是一致的情况,则只需要更新value值即可。否则依次向后遍历,直到链表尾插入这个结点。如果加入这个节点以后链表长度大于8,就把这个链表转换成红黑树。如果这个节点的类型已经是树节点的话,直接调用树节点的插入方法进行插入新的值。

    public V put(K key, V value) {

            return putVal(key, value, false);

        }

        /** Implementation for put and putIfAbsent */

        final V putVal(K key, V value, boolean onlyIfAbsent) {

                //不允许 keyvaluenull

            if (key == null || value == null) throw new NullPointerException();

            //计算hash

            int hash = spread(key.hashCode());

            int binCount = 0;

            //死循环 何时插入成功 何时跳出

            for (Node<K,V>[] tab = table;;) {

                Node<K,V> f; int n, i, fh;

                //如果table为空的话,初始化table

                if (tab == null || (n = tab.length) == 0)

                    tab = initTable();

                //根据hash值计算出在table里面的位置

                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {

                    //如果这个位置没有值 ,直接放进去,不需要加锁

                    if (casTabAt(tab, i, null,

                                 new Node<K,V>(hash, key, value, null)))

                        break;                   // no lock when adding to empty bin

                }

                //当遇到表连接点时,需要进行整合表的操作

                else if ((fh = f.hash) == MOVED)

                    tab = helpTransfer(tab, f);

                else {

                    V oldVal = null;

                    //结点上锁  这里的结点可以理解为hash值相同组成的链表的头结点

                    synchronized (f) {

                        if (tabAt(tab, i) == f) {

                            //fh0 说明这个节点是一个链表的节点 不是树的节点

                            if (fh >= 0) {

                                binCount = 1;

                                //在这里遍历链表所有的结点

                                for (Node<K,V> e = f;; ++binCount) {

                                    K ek;

                                    //如果hash值和key值相同  则修改对应结点的value

                                    if (e.hash == hash &&

                                        ((ek = e.key) == key ||

                                         (ek != null && key.equals(ek)))) {

                                        oldVal = e.val;

                                        if (!onlyIfAbsent)

                                            e.val = value;

                                        break;

                                    }

                                    Node<K,V> pred = e;

          //如果遍历到了最后一个结点,那么就证明新的节点需要插入 就把它插入在链表尾部

                                    if ((e = e.next) == null) {

                                        pred.next = new Node<K,V>(hash, key,

                                                                  value, null);

                                        break;

                                    }

                                }

                            }

                            //如果这个节点是树节点,就按照树的方式插入值

                            else if (f instanceof TreeBin) {

                                Node<K,V> p;

                                binCount = 2;

                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,

                                                               value)) != null) {

                                    oldVal = p.val;

                                    if (!onlyIfAbsent)

                                        p.val = value;

                                }

                            }

                        }

                    }

                    if (binCount != 0) {

                        //如果链表长度已经达到临界值8 就需要把链表转换为树结构

                        if (binCount >= TREEIFY_THRESHOLD)

                            treeifyBin(tab, i);

                        if (oldVal != null)

                            return oldVal;

                        break;

                    }

                }

            }

            //将当前ConcurrentHashMap的元素数量+1

            addCount(1L, binCount);

            return null;

        }

    我们可以发现JDK8中的实现也是锁分离的思想,只是锁住的是一个Node,而不是JDK7中的Segment,而锁住Node之前的操作是无锁的并且也是线程安全的,建立在之前提到的3个原子操作上。

    2.6.1 helpTransfer方法

    这是一个协助扩容的方法。这个方法被调用的时候,当前ConcurrentHashMap一定已经有了nextTable对象,首先拿到这个nextTable对象,调用transfer方法。回看上面的transfer方法可以看到,当本线程进入扩容方法的时候会直接进入复制阶段。

    2.6.2 treeifyBin方法

    这个方法用于将过长的链表转换为TreeBin对象。但是他并不是直接转换,而是进行一次容量判断,如果容量没有达到转换的要求,直接进行扩容操作并返回;如果满足条件才链表的结构抓换为TreeBin ,这与HashMap不同的是,它并没有把TreeNode直接放入红黑树,而是利用了TreeBin这个小容器来封装所有的TreeNode.

    2.7 get方法

    get方法比较简单,给定一个key来确定value的时候,必须满足两个条件  key相同  hash值相同,对于节点可能在链表或树上的情况,需要分别去查找。

    public V get(Object key) {

            Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;

            //计算hash

            int h = spread(key.hashCode());

            //根据hash值确定节点位置

            if ((tab = table) != null && (n = tab.length) > 0 &&

                (e = tabAt(tab, (n - 1) & h)) != null) {

                //如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点  

                if ((eh = e.hash) == h) {

                    if ((ek = e.key) == key || (ek != null && key.equals(ek)))

                        return e.val;

                }

                //如果eh<0 说明这个节点在树上 直接寻找

                else if (eh < 0)

                    return (p = e.find(h, key)) != null ? p.val : null;

                 //否则遍历链表 找到对应的值并返回

                while ((e = e.next) != null) {

                    if (e.hash == h &&

                        ((ek = e.key) == key || (ek != null && key.equals(ek))))

                        return e.val;

                }

            }

            return null;

        }

    2.8 Size相关的方法

    对于ConcurrentHashMap来说,这个table里到底装了多少东西其实是个不确定的数量,因为不可能在调用size()方法的时候像GC的“stop the world”一样让其他线程都停下来让你去统计,因此只能说这个数量是个估计值。对于这个估计值,ConcurrentHashMap也是大费周章才计算出来的。

    2.8.1 辅助定义

    为了统计元素个数,ConcurrentHashMap定义了一些变量和一个内部类

    /**

         * A padded cell for distributing counts.  Adapted from LongAdder

         * and Striped64.  See their internal docs for explanation.

         */

        @sun.misc.Contended static final class CounterCell {

            volatile long value;

            CounterCell(long x) { value = x; }

        }

      /******************************************/

        /**

         * 实际上保存的是hashmap中的元素个数  利用CAS锁进行更新

         但它并不用返回当前hashmap的元素个数

          */

        private transient volatile long baseCount;

        /**

         * Spinlock (locked via CAS) used when resizing and/or creating CounterCells.

         */

        private transient volatile int cellsBusy;

        /**

         * Table of counter cells. When non-null, size is a power of 2.

         */

        private transient volatile CounterCell[] counterCells;

    2.8.2 mappingCountSize方法

    mappingCountsize方法的类似  从Java工程师给出的注释来看,应该使用mappingCount代替size方法 两个方法都没有直接返回basecount 而是统计一次这个值,而这个值其实也是一个大概的数值,因此可能在统计的时候有其他线程正在执行插入或删除操作。

    public int size() {

            long n = sumCount();

            return ((n < 0L) ? 0 :

                    (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :

                    (int)n);

        }

         /**

         * Returns the number of mappings. This method should be used

         * instead of {@link #size} because a ConcurrentHashMap may

         * contain more mappings than can be represented as an int. The

         * value returned is an estimate; the actual count may differ if

         * there are concurrent insertions or removals.

         *

         * @return the number of mappings

         * @since 1.8

         */

        public long mappingCount() {

            long n = sumCount();

            return (n < 0L) ? 0L : n; // ignore transient negative values

        }

          final long sumCount() {

            CounterCell[] as = counterCells; CounterCell a;

            long sum = baseCount;

            if (as != null) {

                for (int i = 0; i < as.length; ++i) {

                    if ((a = as[i]) != null)

                        sum += a.value;//所有counter的值求和

                }

            }

            return sum;

        }

    2.8.3 addCount方法

    put方法结尾处调用了addCount方法,把当前ConcurrentHashMap的元素个数+1这个方法一共做了两件事,更新baseCount的值,检测是否进行扩容。

    private final void addCount(long x, int check) {

            CounterCell[] as; long b, s;

            //利用CAS方法更新baseCount的值

            if ((as = counterCells) != null ||

                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {

                CounterCell a; long v; int m;

                boolean uncontended = true;

                if (as == null || (m = as.length - 1) < 0 ||

                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||

                    !(uncontended =

                      U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {

                    fullAddCount(x, uncontended);

                    return;

                }

                if (check <= 1)

                    return;

                s = sumCount();

            }

            //如果check值大于等于0 则需要检验是否需要进行扩容操作

            if (check >= 0) {

                Node<K,V>[] tab, nt; int n, sc;

                while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&

                       (n = tab.length) < MAXIMUM_CAPACITY) {

                    int rs = resizeStamp(n);

                    //

                    if (sc < 0) {

                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||

                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||

                            transferIndex <= 0)

                            break;

                         //如果已经有其他线程在执行扩容操作

                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))

                            transfer(tab, nt);

                    }

                    //当前线程是唯一的或是第一个发起扩容的线程  此时nextTable=null

                    else if (U.compareAndSwapInt(this, SIZECTL, sc,

                                                 (rs << RESIZE_STAMP_SHIFT) + 2))

                        transfer(tab, null);

                    s = sumCount();

                }

            }

        }


    总结

    JDK6,7中的ConcurrentHashmap主要使用Segment来实现减小锁粒度,把HashMap分割成若干个Segment,在put的时候需要锁住Segmentget时候不加锁,使用volatile来保证可见性,当要统计全局时(比如size),首先会尝试多次计算modcount来确定,这几次尝试中,是否有其他线程进行了修改操作,如果没有,则直接返回size。如果有,则需要依次锁住所有的Segment来计算。

    jdk7ConcurrentHashmap中,当长度过长碰撞会很频繁,链表的增改删查操作都会消耗很长的时间,影响性能,所以jdk8 中完全重写了concurrentHashmap,代码量从原来的1000多行变成了 6000多 行,实现上也和原来的分段式存储有很大的区别。

    主要设计上的变化有以下几点:

    不采用segment而采用node,锁住node来实现减小锁粒度。

    设计了MOVED状态 当resize的中过程中 线程2还在put数据,线程2会帮助resize

    使用3CAS操作来确保node的一些操作的原子性,这种方式代替了锁。

    sizeCtl的不同值来代表不同含义,起到了控制的作用。

    至于为什么JDK8中使用synchronized而不是ReentrantLock,我猜是因为JDK8中对synchronized有了足够的优化吧。

    Reference

    1. http://www.jianshu.com/p/4806633fcc55

    2. https://www.zhihu.com/question/22438589

    3. http://blog.csdn.net/u010723709/article/details/48007881

    http://www.importnew.com/22007.html

            ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, 500,
                    60L, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(1000), new ThreadFactory() {
                private LongAdder longAdder = new LongAdder(); //0
    
                @Override
                public Thread newThread(Runnable runnable) {
                    longAdder.increment(); //变成1
                    return new Thread(runnable, "initOrderAuthorization-" + longAdder.toString());
                }
            });
    本文首发于一世流云的专栏:https://segmentfault.com/blog...

    一、LongAdder简介

    JDK1.8时,java.util.concurrent.atomic包中提供了一个新的原子类:LongAdder
    根据Oracle官方文档的介绍,LongAdder在高并发的场景下会比它的前辈————AtomicLong 具有更好的性能,代价是消耗更多的内存空间:
    clipboard.png

    那么,问题来了:

    为什么要引入LongAdder? AtomicLong在高并发的场景下有什么问题吗? 如果低并发环境下,LongAdderAtomicLong性能差不多,那LongAdder是否就可以替代AtomicLong了?

    为什么要引入LongAdder?

    我们知道,AtomicLong是利用了底层的CAS操作来提供并发性的,比如addAndGet方法:

    clipboard.png

    上述方法调用了Unsafe类的getAndAddLong方法,该方法是个native方法,它的逻辑是采用自旋的方式不断更新目标值,直到更新成功。

    在并发量较低的环境下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发环境下,N个线程同时进行自旋操作,会出现大量失败并不断自旋的情况,此时AtomicLong的自旋会成为瓶颈。

    这就是LongAdder引入的初衷——解决高并发环境下AtomicLong的自旋瓶颈问题。

    LongAdder快在哪里?

    既然说到LongAdder可以显著提升高并发环境下的性能,那么它是如何做到的?这里先简单的说下LongAdder的思路,第二部分会详述LongAdder的原理。

    我们知道,AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。

    LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

    这种做法有没有似曾相识的感觉?没错,ConcurrentHashMap中的“分段锁”其实就是类似的思路。

    LongAdder能否替代AtomicLong?

    回答这个问题之前,我们先来看下LongAdder提供的API:
    clipboard.png

    可以看到,LongAdder提供的API和AtomicLong比较接近,两者都能以原子的方式对long型变量进行增减。

    但是AtomicLong提供的功能其实更丰富,尤其是addAndGet、decrementAndGet、compareAndSet这些方法。

    addAndGet、decrementAndGet除了单纯的做自增自减外,还可以立即获取增减后的值,而LongAdder则需要做同步控制才能精确获取增减后的值。如果业务需求需要精确的控制计数,做计数比较,AtomicLong也更合适。

    另外,从空间方面考虑,LongAdder其实是一种“空间换时间”的思想,从这一点来讲AtomicLong更适合。当然,如果你一定要跟我杠现代主机的内存对于这点消耗根本不算什么,那我也办法。

    总之,低并发、一般的业务场景下AtomicLong是足够了。如果并发量很多,存在大量写多读少的情况,那LongAdder可能更合适。适合的才是最好的,如果真出现了需要考虑到底用AtomicLong好还是LongAdder的业务场景,那么这样的讨论是没有意义的,因为这种情况下要么进行性能测试,以准确评估在当前业务场景下两者的性能,要么换个思路寻求其它解决方案。

    最后,给出国外一位博主对LongAdder和AtomicLong的性能评测,以供参考:http://blog.palominolabs.com/...

    二、LongAdder原理

    之前说了,AtomicLong是多个线程针对单个热点值value进行原子操作。而LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作。

    比如有三个ThreadA、ThreadB、ThreadC,每个线程对value增加10。

    对于AtomicLong,最终结果的计算始终是下面这个形式:

    但是对于LongAdder来说,内部有一个base变量,一个Cell[]数组。
    base变量:非竞态条件下,直接累加到该变量上
    Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]
    最终结果的计算是下面这个形式:

    LongAdder的内部结构

    LongAdder只有一个空构造器,其本身也没有什么特殊的地方,所有复杂的逻辑都在它的父类Striped64中。
    clipboard.png

    来看下Striped64的内部结构,这个类实现一些核心操作,处理64位数据。
    Striped64只有一个空构造器,初始化时,通过Unsafe获取到类字段的偏移量,以便后续CAS操作:
    clipboard.png

    上面有个比较特殊的字段是threadLocalRandomProbe,可以把它看成是线程的hash值。这个后面我们会讲到。

    定义了一个内部Cell类,这就是我们之前所说的槽,每个Cell对象存有一个value值,可以通过Unsafe来CAS操作它的值:
    clipboard.png

    其它的字段:
    可以看到Cell[]就是之前提到的槽数组,base就是非并发条件下的基数累计值。
    clipboard.png

    LongAdder的核心方法

    还是通过例子来看:
    假设现在有一个LongAdder对象la,四个线程A、B、C、D同时对la进行累加操作。

    LongAdder la = new LongAdder();
    la.add(10);

    ThreadA调用add方法(假设此时没有并发):
    clipboard.png

    初始时Cell[]为null,base为0。所以ThreadA会调用casBase方法(定义在Striped64中),因为没有并发,CAS操作成功将base变为10:
    clipboard.png

    可以看到,如果线程A、B、C、D线性执行,那casBase永远不会失败,也就永远不会进入到base方法的if块中,所有的值都会累积到base中。
    那么,如果任意线程有并发冲突,导致caseBase失败呢?

    失败就会进入if方法体:
    clipboard.png

    这个方法体会先再次判断Cell[]槽数组有没初始化过,如果初始化过了,以后所有的CAS操作都只针对槽中的Cell;否则,进入longAccumulate方法。

    整个add方法的逻辑如下图:
    clipboard.png

    可以看到,只有从未出现过并发冲突的时候,base基数才会使用到,一旦出现了并发冲突,之后所有的操作都只针对Cell[]数组中的单元Cell。
    如果Cell[]数组未初始化,会调用父类的longAccumelate去初始化Cell[],如果Cell[]已经初始化但是冲突发生在Cell单元内,则也调用父类的longAccumelate,此时可能就需要对Cell[]扩容了。

    这也是LongAdder设计的精妙之处:尽量减少热点冲突,不到最后万不得已,尽量将CAS操作延迟。

    Striped64的核心方法

    我们来看下Striped64的核心方法longAccumulate到底做了什么:
    clipboard.png

    上述代码首先给当前线程分配一个hash值,然后进入一个自旋,这个自旋分为三个分支:

    • CASE1:Cell[]数组已经初始化
    • CASE2:Cell[]数组未初始化
    • CASE3:Cell[]数组正在初始化中

    CASE2:Cell[]数组未初始化

    我们之前讨论了,初始时Cell[]数组还没有初始化,所以会进入分支②:
    clipboard.png

    首先会将cellsBusy置为1-加锁状态
    clipboard.png

    然后,初始化Cell[]数组(初始大小为2),根据当前线程的hash值计算映射的索引,并创建对应的Cell对象,Cell单元中的初始值x就是本次要累加的值。

    CASE3:Cell[]数组正在初始化中

    如果在初始化过程中,另一个线程ThreadB也进入了longAccumulate方法,就会进入分支③:
    clipboard.png

    可以看到,分支③直接操作base基数,将值累加到base上。

    CASE1:Cell[]数组已经初始化

    如果初始化完成后,其它线程也进入了longAccumulate方法,就会进入分支①:
    clipboard.png

    整个longAccumulate的流程图如下:
    clipboard.png

    LongAdder的sum方法

    最后,我们来看下LongAdder的sum方法:
    clipboard.png

    sum求和的公式就是我们开头说的:

    需要注意的是,这个方法只能得到某个时刻的近似值,这也就是LongAdder并不能完全替代LongAtomic的原因之一。

    三、LongAdder的其它兄弟

    JDK1.8时,java.util.concurrent.atomic包中,除了新引入LongAdder外,还有引入了它的三个兄弟类:LongAccumulatorDoubleAdderDoubleAccumulator

    clipboard.png

    LongAccumulator

    LongAccumulator是LongAdder的增强版。LongAdder只能针对数值的进行加减运算,而LongAccumulator提供了自定义的函数操作。其构造函数如下:
    clipboard.png

    通过LongBinaryOperator,可以自定义对入参的任意操作,并返回结果(LongBinaryOperator接收2个long作为参数,并返回1个long)

    LongAccumulator内部原理和LongAdder几乎完全一样,都是利用了父类Striped64的longAccumulate方法。这里就不再赘述了,读者可以自己阅读源码。

    DoubleAdder和DoubleAccumulator

    从名字也可以看出,DoubleAdder和DoubleAccumulator用于操作double原始类型。

    与LongAdder的唯一区别就是,其内部会通过一些方法,将原始的double类型,转换为long类型,其余和LongAdder完全一样:
    clipboard.png

  • 相关阅读:
    设计模式之六大设计原则学习笔记
    java多线程学习笔记
    mac上安装mongodb数据库教程
    在mac下使用终端命令通过ssh协议连接远程linux系统,代替windows的putty
    从request对象中获取请求json格式的参数
    @Conditional注释
    lambda 根据实体类的拼音排序
    PHP一行代码获取时间戳
    PHP导出生成Excel文件
    Mysql merge引擎介绍
  • 原文地址:https://www.cnblogs.com/softidea/p/8591694.html
Copyright © 2011-2022 走看看