zoukankan      html  css  js  c++  java
  • 拿捏了!ConcurrentHashMap!

    概述

    本文将对JDK8中 ConcurrentHashMap 源码进行一定程度的解读。解读主要分为六个部分:主要属性与相关内部类介绍、构造函数、put过程、扩容过程、size过程、get过程、与JDK7实现的简单对比。希望对读者学习ConcurrentHashMap有一定的帮助。
    阅读本文前,可能需要读者对HashMap和红黑树等有基本的了解。

    主要属性和主要的内部类

    主要属性

    常量

    ConcurrentHashMap中常量一共分为以下几个部分:

    • 容量相关:MAXIMUM_CAPACITY、DEFAULT_CAPACITY、MAX_ARRAY_SIZE
    • 兼容JDK 7而保留的部分常量:DEFAULT_CONCURRENCY_LEVEL、LOAD_FACTOR
    • 红黑树升级和退化相关的常量:TREEIFY_THRESHOLD、UNTREEIFY_THRESHOLD、MIN_TREEIFY_CAPACITY
    • 扩容相关:MIN_TRANSFER_STRIDE、RESIZE_STAMP_BITS、MAX_RESIZERS、RESIZE_STAMP_SHIFT
    • 节点状态常量:MOVED、TREEBIN、RESERVED
        /* ---------------- Constants -------------- */
    
        /**
         * HashMap的最大容量
         */
        private static final int MAXIMUM_CAPACITY = 1 << 30;
    
        /**
         * 默认容量
         */
        private static final int DEFAULT_CAPACITY = 16;
    
        /**
         * 数组最大长度
         */
        static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    
        /**
         * 默认最大并发等级
         */
        private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    
        /**
         * 负载因子
         */
        private static final float LOAD_FACTOR = 0.75f;
    
        /**
         * 链表升级成红黑树的阈值
         */
        static final int TREEIFY_THRESHOLD = 8;
    
        /**
         * 红黑树退化成链表的阈值
         */
        static final int UNTREEIFY_THRESHOLD = 6;
    
        /**
         * 链表升级成树需要满足的最小容量,若不满足,则会先扩容
         */
        static final int MIN_TREEIFY_CAPACITY = 64;
    
        //最小转移步长
        private static final int MIN_TRANSFER_STRIDE = 16;
    
        //这个常量是用来计算HashMap不同容量有不同的resizeStamp用的
        private static int RESIZE_STAMP_BITS = 16;
    
    
        //最大参与扩容的线程数 相当大的一个数 基本上是不会触及该上线的
        private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
    
        //要对resizeStamp进行位移运算的一个敞亮
        private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
    
        //特殊的节点哈希值
        static final int MOVED     = -1; // hash for forwarding nodes
        static final int TREEBIN   = -2; // hash for roots of trees
        static final int RESERVED  = -3; // hash for transient reservations
        static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
    
        //获取CPU的数量
        static final int NCPU = Runtime.getRuntime().availableProcessors();
    
    

    其中,因为JDK8中ConcurrentHashMap的实现方式和JDK7的不同,因此DEFAULT_CONCURRENCY_LEVEL已经没有实际作用了。并且在JDK8中,LOAD_FACTOR也已经固定成了0.75f。
    另外,MOVED,TREEBIN,RESERVED是用来表示特殊节点的哈希值。该类特殊节点均不含实际元素,且其哈希值被设置为负数和普通节点区分。
    剩下的涉及扩容的常量我们在相关的章节中再介绍。

    成员变量

    通常成员变量都是会负责记录当前类的状态的,ConcurrentHashMap也是如此。因此了解清除成员变量的作用,对我们后续分析ConcurrentHashMap的操作流程很有感帮助。

        /* ---------------- Fields -------------- */
    
        /**
         * 底层数组
         */
        transient volatile Node<K,V>[] table;
        //扩容时 使用的另一个数组
        private transient volatile Node<K,V>[] nextTable;
    
        //统计size的一部分
        private transient volatile long baseCount;
    
        /**
         * sizeCtl与table的resize和init有关
         * sizeCtl = -1时,表示table正在init
         * sizeCtl < 0 且不等于-1时,表示正在resize
         * sizeCtl > 0 时,表示下次需要resize的阈值,即capacity * loadfactory
         */
        private transient volatile int sizeCtl;
    
        //记录下一次要transfer对应的Index
        private transient volatile int transferIndex;
    
    
        //表示是否有线程正在修改CounterCells
        private transient volatile int cellsBusy;
    
        //用来统计size
        private transient volatile CounterCell[] counterCells;
    
        // views
        private transient KeySetView<K,V> keySet;
        private transient ValuesView<K,V> values;
        private transient EntrySetView<K,V> entrySet;
    
    
    

    同样的,成员变量也可以按作用分成几类:

    • 用作底层数据结构的实现:Node<K,V>[] table
    • 用作统计元素的大小:baseCount、CounterCell[] counterCells、cellsBusy
    • 用作记录ConcurrentHashMap的状态:sizeCtl
    • 用作扩容记录:Node<K,V>[] nextTable、transferIndex
    • 用作转成其它类型的视图:KeySetView<K,V> keySet、ValuesView<K,V> values、EntrySetView<K,V> entrySet

    table数组很好理解。因为HashMap的实现是基于数组的,在冲突时通过链地址解决。因此所有的数据都以数组为入口。
    另一个数组nextTable是在扩容时备用的。 如果了解Redis的数据结构的读者,应该对这个不陌生,redis渐进式rehash就是通过两个哈希表加一个index实现的。而JDK8中在resize时,也采取了类似的方式(下文我们会介绍到:按步长逐步transfer)。

    另外,比较重要的一个属性就是sizeCtl。
    如果看过Doug Lea老爷子在JUC下的其他类,经常会有一个特殊的变量表示当前对象的状态。并且已CAS的方式去修改这个变量,实现自旋锁的功能(例如:AQS中的state)。
    这里的sizeCtl就是一个富有特殊意义的变量。
    当sizeCtl大于0时,表示扩容的阈值(没错,就是HashMap中threshold变量的作用),而且上文我们也了解到在JDK8中由于loadfactor已经被固定为0.75f。因此在正常状态下(非扩容状态),sizeCtl = oldCap >> 1 - (oldCap << 2)。 而sizeCtl == -1是一个特殊的状态标志,表示ConcurrentHashMap正在初始化底层数组。
    当sizeCtl为其他负数时,表示ConcurrentHashMap正在进程扩容,其中,高16位可以反应出扩容前数组的大小,而后16位可以反应出此时参与扩容的线程数。

    内部类

    ConcurrentHashMap拥有大量的内部类,但其中大部分都是用来遍历或是在Fork/Join框架中平行遍历时使用的。这部分类内部类我们不在过多介绍。主要看CountCell和几个Node的类。

    CounterCell

    首先,CounterCell是用来统计ConcurrentHashMap用的,其内部有个value,用来表示元素个数。size()函数就是通过累加countCells数组中所有CounterCell的value值,再加上BaseCount得到的。相当于ConcurrentHashMap把size这个属性拆散保存在了个多个地方。

    Node

    同HashMap一样,为了提高链表的遍历速度,ConcurrentHashMap也引用了红黑树。而Node就表示链表中的节点,并且他还是其他节点的父类。

    TreeNode

    TreeNode表示红黑树中的节点,按照红黑树的标准,它还拥有父节点和左右子节点的属性,此外还需要标识是否为红节点。

    TreeBin

    TreeBin是一个特殊的节点,用来指向红黑树的根节点,并不存储真实的元素,因此它的节点的哈希值是一个固定的特殊值-2。

    ForwardingNode

    ForwardingNode和TreeBin一样,并不存储实际元素,而是指向nextTable,哈希值也是一个特殊的固定值(-1)。它在扩容中会使用,表示这个桶上的元素已经迁移到新的数组中去了。

    ReservationNode

    同样是一个特殊值,在putIfAbsent时使用。因为put时需要对桶上的元素上对象锁(ConcurrentHashMap并非是完全无锁的,只是尽可能少的去使用锁),这时就会添加一个临时占位用的节点ReservationNode。

    构造函数

    因为构造函数是公有的API,所以必须要和JDK7中保持一致。虽然其中的部分含义可能发生了一些变化。
    我们看一下参数最全的构造函数。

        public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel) {
            if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
                throw new IllegalArgumentException();
            if (initialCapacity < concurrencyLevel)   // Use at least as many bins
                initialCapacity = concurrencyLevel;   // as estimated threads
            long size = (long)(1.0 + (long)initialCapacity / loadFactor);
            int cap = (size >= (long)MAXIMUM_CAPACITY) ?
                MAXIMUM_CAPACITY : tableSizeFor((int)size);
            this.sizeCtl = cap;
        }
    

    上述方法中的三个参数分别是初始容量initialCapacity,负载因子loadfactor和并行等级concurrenyLevel。
    首先,loadfactor负载因子在JDK8的ConcurrentHashMap运行时都已经固定为0.75f,因此这里的参数只能在创建时,帮助确定初始的数组容量。
    同样的,由于不在使用JDK7的Segement实现方式,因此这里的concurrencyLevel不在用来确定Segement的数量。对于JDK8中的ConcurrentHashMap而言,锁的粒度是对数组的每个桶(理论上可以对每个桶进行并发操作),因此concurrencyLevel的含义也就是用来确定底层数据的初始容量。
    这也正是size = (long)(1.0 + (long)initialCapacity / loadFactor);这行代码的意义(这里的initialCapacity是取参数中initialCapacity和concurrenyLevel中的最大值)。

    另外需要注意的一点是,size并不是最终我们数组的容量,ConcurrentHashMap会通过tableSizeFor()方法找出大于等于size的最小2的幂次方数作为容量。(这和HashMap是一样的,需要保证容量为2的幂次,因为之后的散列操作都是基于这一前提)。
    最后,在得出了初始容量后,ConcurrentHashMap仅是将容量通过sizeCtl来保存,而并没有直接初始化数组。数组的初始化会被延迟到第一次put数据时(这样设计可能是出于节省内存的目的)。

    put过程

    有了前文的铺垫,我们就可以开始了解ConcurrentHashMap的put过程了。

    先在这里做个声明,本文不会对红黑树的部分展开详细分析,之后用链表升级成红黑树,红黑树退化成链表,在红黑树中查找直接概括某些过程。

    put()的具体实现都是由putVal()这个函数实现的。因此这里我们对putVal()函数展开分析。

        final V putVal(K key, V value, boolean onlyIfAbsent) {
            if (key == null || value == null) throw new NullPointerException();
            int hash = spread(key.hashCode());
            int binCount = 0;
            //for循环,一直尝试,直到put成功
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                //tab未初始化,先初始化tab
                if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//对应的bucket上还没有元素
                    //采用CAS尝试PUT元素,如果此时没有其它线程操作,这里将会PUT成功
                    if (casTabAt(tab, i, null,
                                 new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
                else if ((fh = f.hash) == MOVED)//如果tab正在扩容
                    tab = helpTransfer(tab, f);
                else {//bucket上已经存在元素
                    V oldVal = null;
                    //只针对头节点同步,不影响其他bucket上的元素,提高效率
                    synchronized (f) {
                        //同步块内在做一次检查
                        if (tabAt(tab, i) == f) {//说明头节点未发生改变,如果发生改变,则直接退出同步块,并再次尝试
                            if (fh >= 0) { //哈希值大于0 说明是tab[i]上放的是链表 因为对于红黑树而言 tab[i]上放的是TreeBin一个虚拟的节点 其哈希值固定为-2
                                binCount = 1;
                                for (Node<K,V> e = f;; ++binCount) {
                                    //查询链表,如果存在相同key,则更新,否则插入新节点
                                    K ek;
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    if ((e = e.next) == null) {
                                        pred.next = new Node<K,V>(hash, key,
                                                                  value, null);
                                        break;
                                    }
                                }
                            }
                            else if (f instanceof TreeBin) {//如果是红黑树,则以红黑树的方式插入
                                Node<K,V> p;
                                binCount = 2;
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                    //判断链表是否需要转成树
                    //值得注意的一点是,这段代码并未在同步块中,应该也是出于效率考虑
                    if (binCount != 0) {
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            addCount(1L, binCount);
            return null;
        }
    
    

    整体了解putVal()的流程

    先整体的了解下putVal(),不对for()循环中代码具体分析。
    第一步是校验要put的Key/Value不能为null。因此ConcurrentHashMap和HashMap不同,不支持空的键值。
    第二步是spread(key.hashCode())是对键的哈希值做一个扰动,这里通过h^(h>>>16)的算法实现的,这样做的目的有两个,一是避免了设计不好的hashCode函数造成碰撞的概率加大,二是确保了扰动后的哈希值均为正数(因为负数哈希值都是一些特殊的节点)。
    第三步是for()循环,这里通过CAS+自选保证线程安全,暂时先不具体分析。
    第四步addCount()应该是表示成功往ConcurrentHashMap添加了元素后,让更新元素的数量(当然,我们可以猜想对于替换节点的情况,应该是不会执行这一步的)。这个方法的具体分析我们放在扩容的步骤中。

    分析for()循环中的代码

    for()循环中的代码 同样分成了四个部分:

    第一步:如果底层数组还没有初始化,通过initTable()初始化数组

    initTable()方法如下:

    private final Node<K,V>[] initTable() {
            Node<K,V>[] tab; int sc;
            //同样 采用不断重试的方式,而非直接使用锁
            while ((tab = table) == null || tab.length == 0) {
                //sizeCtl < 0 表示table正在被初始化或是reszie
                if ((sc = sizeCtl) < 0)
                    //当前线程先等待
                    Thread.yield(); // lost initialization race; just spin
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //使用CAS操作更新sizeCtl,标记为正在初始化中
                    //由于采用了CAS操作,因此该块的方法可以认为是线程安全的
                    try {
                        if ((tab = table) == null || tab.length == 0) {//初始化
                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = tab = nt;
                            //表示下次需要扩容的值 (1 - 1/4) = 0.75f
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                    break;
                }
            }
            return tab;
        }
    

    initTable()不算复杂,首先为了避免多个线程同时进行初始化,需要通过sizeCtl进行控制。
    当线程发现sizeCtl<0时,就知道此时已经有其他线程在初始化了,那么它会主动让出CPU时间,等待初始化完成。
    如果sizeCtl并不是小于0,说明暂时没有其他线程在初始化,这时候要先通过CAS更新sizeCtl的值为-1(相当于抢占了自旋锁),然后开始初始化底层数组并设置为table,然后计算下次扩容的阈值存放在sizeCtl(具体值为n - (n >>>2),即容量的0.75*n)。

    第二步:如果已经初始化但是对应桶上元素为null,那么尝试CAS更新

    首先,这里确定桶的算法是通过之前spread()得到的哈希值h和数组容量n进行一次h & (n -1)
    这个方式和HashMap是相同的,因为n是2的幂,换成二进制,就是高位为1之后的低位全为0的数,那么这个数减1就成了全为1的一个数。以这样的方式代替取余的运算不仅计算更快,也能更好的利用哈希值散列。
    如果,这一步CAS失败,说明此时有其他线程也在操作该桶,那么当前线程在下次for()循环时会进入下列的第三和第四步中。

    第三步:说明已经初始化且桶上有元素,那么判断元素是否为ForwardNode

    如果线程发现自己要操作的桶上的节点是ForwardNode(可以通过其特殊的哈希值判断),那么就说明此时ConcurrentHashMap扩容,线程可能会加入帮助扩容。具体的我们放在扩容的部分介绍。

    第四步:说明桶上元素是正常元素,那么就要比对这个桶所有元素,进行更新或插入

    这里说明该桶上存放的是正常的元素(TreeBin虽然是一个特殊节点,但也是正常状态下存在的节点),为了线程安全,这里需要对桶上的元素进行上锁synchronized(f)。然后在遍历桶上所有的元素,选择更新或者插入。
    第一,需要注意的是,上锁后的第一件事就是进行double-check的判断,看上锁过程中头节点是否发生了变化。这很重要,如果头节点发生了变化,那么对之前的头节点f上锁是无法保证线程安全的。
    第二,对于桶上是链表的情况(f.hash > 0),ConcurrentHashMap会遍历链表,比较链表的各个节点,如果之前存在相同的key,那么替换该节点的value值(保存节点的旧值用于返回)。如果不存在相同的key,那么创建新的节点插入链表(注意,ConcurrentHashMap用的是尾插发,即插入链表尾部)。
    第三,针对是TreeBin的节点,说明桶上关联的是红黑树,则通过红黑树的方式进行插入或更新。

    扩容过程

    扩容过程过程可能要比put过程要稍微复杂一些。首先我们从上文提到的addCount()函数开始分析。

    addCount()更新元素的容器个数

    当ConcurrentHashMap添加了元素之后,需要通过addCount()更新元素的个数。
    并且如果发现元素的个数达到了扩容阈值(sizeCtl),那么将进行resize()操作。

        private final void addCount(long x, int check) {
            CounterCell[] as; long b, s;
    
            //更新size
            if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
                CounterCell a; long v; int m;
                boolean uncontended = true;
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended =
                      U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                    fullAddCount(x, uncontended);
                    return;
                }
                if (check <= 1)
                    return;
                s = sumCount();
            }
            //resize
            if (check >= 0) {
                Node<K,V>[] tab, nt; int n, sc;
                //不断CAS重试
                while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                       (n = tab.length) < MAXIMUM_CAPACITY) {//需要resize
                    //为每个size生成一个独特的stamp 这个stamp的第16为必为1 后15位针对每个n都是一个特定的值 表示n最高位的1前面有几个零
                    int rs = resizeStamp(n);
                    //sc会在库容时变成 rs << RESIZE_STAMP_SHIFT + 2;上面说了rs的第16位为1 因此在左移16位后 该位的1会到达符号位 因此在扩容是sc会成为一个负数
                    //而后16位用来记录参与扩容的线程数
                    //此时sc < 0 说明正在扩
                    if (sc < 0) {
                        /**
                         * 分别对五个条件进行说明
                         * sc >>> RESIZE_STAMP_SHIFT != rs 取sc的高16位 如果!=rs 则说明HashMap底层数据的n已经发生了变化
                         * sc == rs + 1  此处可能有问题  我先按自己的理解 觉得应该是  sc == rs << RESIZE_STAMP_SHIFT + 1; 因为开始transfer时 sc = rs << RESIZE_STAMP_SHIFT + 2(一条线程在扩容,且之后有新线程参与扩容sc均会加1,而一条线程完成后sc - 1)说明是参与transfer的线程已经完成了transfer
                         * 同理sc == rs + MAX_RESIZERS 这个应该也改为 sc = rs << RESIZE_STAMP_SHIFT + MAX_RESIZERS 表示参与迁移的线程已经到达最大数量 本线程可以不用参与
                         * (nt = nextTable) == null 首先nextTable是在扩容中间状态才使用的数组(这一点和redis的渐进式扩容方式很像) 当nextTable 重新为null时 说明transfer 已经finish
                         * transferIndex <= 0 也是同理
                         * 遇上以上这些情况 说明此线程都不需要参与transfer的工作
                         * PS: 翻了下JDK16的代码 这部分已经改掉了 rs = resizeStamp(n) << RESIZE_STAMP_SHIFT 证明我们的猜想应该是正确的
                         */
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                            break;
                        //否则该线程需要一起transfer
                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                            transfer(tab, nt);
                    }
                    //说明没有其他线程正在扩容 该线程会将sizeCtl设置为负数 表示正在扩容
                    else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                                 (rs << RESIZE_STAMP_SHIFT) + 2))
                        transfer(tab, null);
                    s = sumCount();
                }
            }
        }
    
    

    如上文所说,这个方法有两个作用,一是更新元素个数,二是判断是否需要resize()。

    更新size()

    我们可以单独看addCount中更新size的部分

            if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
                CounterCell a; long v; int m;
                boolean uncontended = true;
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended =
                      U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                    fullAddCount(x, uncontended);
                    return;
                }
                if (check <= 1)
                    return;
                s = sumCount();
            }
    

    首先判断countCells是否已经被初始化,如果没有被初始化,那么将尝试在size的更新操作放在baseCount上。如果此时没有冲突,那么CAS修改baseCount就能成功,size的更新就落在了baseCount上。
    如果此时已经有countCells了,那么会根据线程的探针随机落到countCells的某个下标上。对size的更新就是更新对应CountCells的value值。
    如果还是不行,将会进入fullAddCount方法中,自旋重试直到更新成功。这里不对fullAddCount展开介绍,具体操作也类似,size的变化要么累加在对应的CountCell上,要么累加在baseCount上。
    这里说一下我个人对ConcurrentHashMap采用这么复杂的方式进行计数的理解。因为ConcurrenthHashMap是出于吞吐量最大的目的设计的,因此,如果单纯的用一个size直接记录元素的个数,那么每次增删操作都需要同步size,这会让ConcurrentHashMap的吞吐量大大降低。
    因为,将size分散成多个部分,每次修改只需要对其中的一部分进行修改,可以有效的减少竞争,从而增加吞吐量。

    resize()

    对于resize()过程,我其实在代码的注释中说明的比较详细了。
    首先,是一个while()循环,其中的条件是元素的size(由上一步计算而来)已经大于等于sizeCtl(说明到达了扩容条件,需要进行resize),这是用来配合CAS操作的。
    接着,是根据当前数组的容量计算了resizeStamp(该函数会根据不同的容量得到一个确定的数)。得到的这个数会在之后的扩容过程中被使用。
    然后是比较sizeCtl,如果sizeCtl小于0,说明此时已经有线程正在扩容,排除了几种不需要参与扩容的情况(例如,扩容已经完成,或是参与的扩容线程数已经到最大值,具体情况代码上的注解已经给出了分析),剩下的情况当前线程会帮助其他线程一起扩容,扩容前需要修改CAS修改sizeCtl(因为在扩容时,sizeCtl的后16位表示参与扩容的线程数,每当有一个线程参与扩容,需要对sizeCtl加1,当该线程完成时,对sizeCtl减1,这样比对sizeCtl就可以知道是否所有线程都完成了扩容)。
    另外如果sizeCtl大于0,说明还没有线程参与扩容,此时需要CAS修改sizeCtl为rs << RESIZE_STAMP_SHIFT + 2(其中rs是有resizeStamp(n)得到的),这是一个负数,上文也说了这个数的后16位表示参与扩容的线程,当所有线程都完成了扩容时,sizeCtl应该为rs << RESIZE_STAMP_SHIFT + 1。这是我们结束扩容的条件,会在后文看到。

    transfer()

    transfer()方法负责对数组进行扩容,并将数据rehash到新的节点上。这一过程中会启用nextTable变量,并在扩容完成后,替换成table变量。

       private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
            //stride是步长,transfer会依据stride,把table分为若干部分,依次处理,好让多线程能协助transfer
            int n = tab.length, stride;
            if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
                stride = MIN_TRANSFER_STRIDE; // subdivide range
            if (nextTab == null) {            // initiating //nextTab等于null表示第一个进来扩容的线程
                try {
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //第一个线程需要对扩容的数组翻倍
                    nextTab = nt;
                } catch (Throwable ex) {      // try to cope with OOME
                    sizeCtl = Integer.MAX_VALUE;
                    return;
                }
                //用nextTable和transferIndex表示扩容的中间状态
                nextTable = nextTab;
                transferIndex = n;
            }
            int nextn = nextTab.length;
            ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
            boolean advance = true; // advance 表示是否可以继续执行下一个stride
            boolean finishing = false; // to ensure sweep before committing nextTab finish表示transfer是否已经完成 nextTable已经替换了table
    
            //开始转移各个槽
            for (int i = 0, bound = 0;;) {
                Node<K,V> f; int fh;
                //STEP1  判断是否可以进入下一个stride 确认i和bound
                //通过stride领取一部分的transfer任务,while循环就是确认边界
                while (advance) {
                    int nextIndex, nextBound;
                    if (--i >= bound || finishing) //认领的部分已经被执行完(一个stride执行完)
                        advance = false;
                    else if ((nextIndex = transferIndex) <= 0) { //transfer任务被认领完
                        i = -1;
                        advance = false;
                    }
                    else if (U.compareAndSwapInt
                             (this, TRANSFERINDEX, nextIndex,
                              nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) { //认领一个stride的任务
                        bound = nextBound;
                        i = nextIndex - 1;
                        advance = false;
                    }
                }
    
                /**
                 * i < 0 说明要转移的桶 都已经处理过了
                 *
                 *
                 * 以上条件已经说明 transfer已经完成了
                 */
                if (i < 0 || i >= n || i + n >= nextn) { //transfer 结束
                    int sc;
                    if (finishing) {//如果完成整个 transfer的过程 清空nextTable 让table等于扩容后的数组
                        nextTable = null;
                        table = nextTab;
                        sizeCtl = (n << 1) - (n >>> 1); //0.75f * n 重新计算下次扩容的阈值
                        return;
                    }
                    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {//一个线程完成了transfer
                        //如果还有其他线程在transfer 先返回
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            return;
                        //说明这是最后一个在transfer的线程 因此finish标志被置为 true
                        finishing = advance = true;
                        i = n; // recheck before commit
                    }
                }
                else if ((f = tabAt(tab, i)) == null) //如果该节点为null,则对该节点的迁移立马完成,设置成forwardNode
                    advance = casTabAt(tab, i, null, fwd);
                else if ((fh = f.hash) == MOVED)
                    advance = true; // already processed
                else { //开始迁移该节点
                    synchronized (f) {//同步,保证线程安全
                        if (tabAt(tab, i) == f) { //double-check
                            Node<K,V> ln, hn; //ln是扩容后依旧保留在原index上的node链表;hn是移到index + n 上的node链表
                            if (fh >= 0) { //普通链表
                                int runBit = fh & n;
                                Node<K,V> lastRun = f;
                                //这一次遍历的目的是找到最后一个一个节点,其后的节点hash & N 都不发生改变
                                //例如 有A->B->C->D,其hash & n 为 0,1,1,1 那就是找到B点
                                //这样做的目的是之后对链表进行拆分时 C和D不需要单独处理 维持和B的关系 B移动到新的tab[i]或tab[i+cap]上即可
                                //还有不理解的可以参考我的测试代码:https://github.com/insaneXs/all-mess/blob/master/src/main/java/com/insanexs/mess/collection/TestConHashMapSeq.java
                                for (Node<K,V> p = f.next; p != null; p = p.next) {
                                    int b = p.hash & n;
                                    if (b != runBit) {
                                        runBit = b;
                                        lastRun = p;
                                    }
                                }
                                //如果runBit == 0 说明之前找到的节点应该在tab[i]
                                if (runBit == 0) {
                                    ln = lastRun;
                                    hn = null;
                                }
                                //否则说明之前的节点在tab[i+cap]
                                else {
                                    hn = lastRun;
                                    ln = null;
                                }
                                //上面分析了链表的拆分只用遍历到lastRun的前一节点 因为lastRun及之后的节点已经移动好了
                                for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                    int ph = p.hash; K pk = p.key; V pv = p.val;
                                    //这里不再继续使用尾插法而是改用了头插法 因此链表的顺序可能会发生颠倒(lastRun及之后的节点不受影响)
                                    if ((ph & n) == 0)
                                        ln = new Node<K,V>(ph, pk, pv, ln);
                                    else
                                        hn = new Node<K,V>(ph, pk, pv, hn);
                                }
                                //将新的链表移动到nextTab的对应坐标中
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                //tab上对应坐标的节点变为ForwardingNode
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                            else if (f instanceof TreeBin) { //红黑树节点
                                TreeBin<K,V> t = (TreeBin<K,V>)f;
                                //同样拆成两棵树
                                TreeNode<K,V> lo = null, loTail = null; 
                                TreeNode<K,V> hi = null, hiTail = null;
                                int lc = 0, hc = 0;
                                for (Node<K,V> e = t.first; e != null; e = e.next) {
                                    int h = e.hash;
                                    TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                    if ((h & n) == 0) {
                                        if ((p.prev = loTail) == null)
                                            lo = p;
                                        else
                                            loTail.next = p;
                                        loTail = p;
                                        ++lc;
                                    }
                                    else {
                                        if ((p.prev = hiTail) == null)
                                            hi = p;
                                        else
                                            hiTail.next = p;
                                        hiTail = p;
                                        ++hc;
                                    }
                                }
                                //是否需要退化成链表
                                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                        }
                    }
                }
            }
        }
    

    transfer的代码比较长,我们也一部分一部分的分析各段代码的作用。
    首先,最先发起扩容的线程需要对数组进行翻倍,然后将翻倍后得到的新数组通过nextTable变量保存。并且启用了transferIndex变量,初始值为旧数组的容量n,这个变量会被用来标记已经被认领的桶的下标。
    扩容过程是从后往前的,因此transferIndex的初始值才是n。并且整个扩容过程依据步长stride,被拆分成个部分,线程从后往前依次领取一个部分,所以每次有线程领取任务,transferIndex总是要被减去一个stride。
    当线程认领的一个步长的任务完成后,继续去认领下一个步长,直到transferIndex < 0,说明所有数据都被认领完。
    当参与扩容的线程发现没有其他任务能被认领,那么就会更新sizeCtl为 sizeCtl-1 (说明有一条线程退出扩容)。最后一条线程完成了任务,发现sizeCtl == (resizeStamp(n) << RESIZE_STAMP_SHIFT + 2) ,那么说明所有的线程都完成了扩容任务,此时需要将nextTable替换为table,重置transferIndex,并计算新的sizeCtl表示下一次扩容的阈值。

    上面介绍了线程每次认领一个步长的桶数负责rehash,这里介绍下针对每个桶的rehash过程。
    首先,如果桶上没有元素或是桶上的元素是ForwardingNode,说明不用处理该桶,继续处理上一个桶。
    对于桶上存放正常的节点而言,为了线程安全,需要对桶的头节点进行上锁,然后以链表为例,需要将链表拆为两个部分,这两部分存放的位置是很有规律的,如果旧数组容量为oldCap,且节点之前在旧数组的下标为i,那么rehash链表中的所有节点将放在nextTable[i]或者nextTable[i+oldCap]的桶上(这一点可以从之前哈希值中比n最高位还靠前的一位来考虑,当前一位为0时,就落在nextTable[i]上,而前一位为1时,就落在nextTable[i+oldCap])。
    同理红黑树也会被rehash()成两部分,如果新的红黑树不满足成树条件,将会被退化成链表。
    当一个桶的元素被transfer完成后,旧数组相关位置上会被放上ForwardingNode的特殊节点表示该桶已经被迁移过。且ForwardingNode会指向nextTable。

    由于不满足树化条件而引起的扩容

    当一个桶上的链表节点数大于8,但是数组容量又小于64时,ConcurrentHashMap会优先选择扩容而非树化,具体的方法在tryPresize()中。整体流程和addCount()方法类似,这里不再赘述。

    后话

    如果读者够仔细的话,会发现在扩容这一段Doug Lea老爷子其实也留了些BUG下来。
    一个是在addCount中判断rs和sc关系的时候,一部分条件老爷子忘记了加位移操作。这部分代码如下:

    sc == rs + 1 || sc == rs + MAX_RESIZERS
    

    这一部分的等式均差了一个位移的运算。

    另一个是在tryPresize()方法中,while里的最后一个else if中 sc < 0的条件应该是永远不成立的,因为while的条件就是sc >=0。

    if (sc < 0) {
        Node<K,V>[] nt;
        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
        transferIndex <= 0)
        break;
        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
            transfer(tab, nt);
        }
    

    上面两部分代码,我在OPENJDK 16版本中确认过,确实已经修改过了。

    size()过程

    size()过程其实相对简单,上文在addCount()已经介绍过了,为了保证ConcurrentHashMap的吞吐量,元素个数被拆成了多个部分保存在countCells和baseCount中。那么求size()其实就是将这几部分数据累积。

        final long sumCount() {
            CounterCell[] as = counterCells; CounterCell a;
            long sum = baseCount;
            //counterCells 不为空,说明此时有其他线程在更新数组
            if (as != null) {
                for (int i = 0; i < as.length; ++i) {
                    if ((a = as[i]) != null)
                        sum += a.value;
                }
            }
            return sum;
        }
    

    get过程

    相对于put过程,get()可以说十分简单了。

        public V get(Object key) {
            Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
            //?在hash值得基础上再做一次散列,具体目的不明
            int h = spread(key.hashCode());
            if ((tab = table) != null && (n = tab.length) > 0 &&
                (e = tabAt(tab, (n - 1) & h)) != null) {
                //根据散列的值 得到tab中的元素,因为tabAt保证了可见性,因此可以认为多线程下数据没有问题
                if ((eh = e.hash) == h) {
                    if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                        return e.val;
                }
                //哈希值小于0 说明节点正在迁移或是为树节点 为ForwardNode或是TreeBin 可以以多态的方式由不同实现根据不同的情况去查找
                else if (eh < 0)
                    return (p = e.find(h, key)) != null ? p.val : null;
                //正常链表的查询
                while ((e = e.next) != null) {
                    if (e.hash == h &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek))))
                        return e.val;
                }
            }
            return null;
        }
    

    和HashMap的get过程基本一直(除了对hash值的扰动方式不一样)。
    整体流程就是计算键的哈希值属于哪个桶,然后查找该桶的所有元素,获取key相等的节点(链表直接遍历,红黑树用树的方式查找),并返回。

    与JDK7实现的简单对比

    文章的最后,我们看一下JDK8中的 ConcurentHashMap 与JDK7版本中的不同,也算是一个总结。
    其实,最大的差异就是JDK 8中不在使用Segment。因为其他所有的差异都是为了适应新的方式而做出的调整。
    譬如resize()时的不同(JDK7中只用对对应的Segment上锁,就可以用HashMap的方式进行resize())。
    又譬如二者在size()方法上的不同(JDK7中会先累加三次各个段的size(),如果其中数据发生了变化,说明此时有其他线程在操作,为了数据强一致性会上全锁(所有segment上锁)统计size)。
    虽然,JDK8中的ConcurrentHashMap实现上更为复杂, 但这样的好处也是显而易见的。那就是让ConcurrentHashMap的并发等级或者说吞吐量达到了最大话。
    更多JDK源码分析可以见我的GitHub项目:read-jdk
    如果文章有错误,也欢迎指正。

  • 相关阅读:
    HTML5 API分享
    承接VR外包,虚拟现实外包,北京正规公司
    虚拟现实外包—动点飞扬软件专门承接VR/AR场景、游戏、项目外包
    Unity3d外包—就找北京动点软件(长年承接Unity3d软件、游戏项目外包)
    Kinect外包团队— 2016中国VR开发者论坛第一期
    Kinect外包-就找北京动点飞扬软件(长年承接微软Kinect体感项目外包,有大型Kinect案例)
    Win10外包公司(长年承接Win10App外包、Win10通用应用外包)
    HTML5外包注意事项-开发HTML5游戏的九大坑与解决方法剖析
    HTML5外包团队:HTML5 Canvas使用教程
    libgo 2.0发布
  • 原文地址:https://www.cnblogs.com/insaneXs/p/13586928.html
Copyright © 2011-2022 走看看