zoukankan      html  css  js  c++  java
  • ConcurrentHashMap的初步使用场景、源码分析讲解(中)

    紧接上篇

    CounterCells 初始化图解
    初始化长度为 2 的数组,然后随机得到指定的一个数组下标,将需要新增的值加入到对应下标位置处:

    transfer 扩容阶段

    判断是否需要扩容,也就是当更新后的键值对总数 baseCount >= 阈值 sizeCtl 时,进行
    rehash,这里面会有两个逻辑。
    1. 如果当前正在处于扩容阶段,则当前线程会加入并且协助扩容。
    2. 如果当前没有在扩容,则直接触发扩容操作。
    if(check >=0)
    
        {//如果 binCount>=0,标识需要检查扩容
            Node<K, V>[] tab, nt;
            int n, sc;
            //s 标识集合大小,如果集合大小大于或等于扩容阈值(默认值的 0.75)
            //并且 table 不为空并且 table 的长度小于最大容量
            while (s >= (long) (sc = sizeCtl) && (tab = table) != null &&
                    (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);//这里是生成一个唯一的扩容戳,这个是干嘛用的呢?且听我慢慢分析
                if (sc < 0) {//sc<0,也就是 sizeCtl<0,说明已经有别的线程正在扩容了
                    //这 5 个条件只要有一个条件为 true,说明当前线程不能帮助进行此次的扩容,直接跳出循环
                    //sc >>> RESIZE_STAMP_SHIFT!=rs 表示比较高 RESIZE_STAMP_BITS 位 生成戳和 rs 是否相等,相同
                    //sc=rs+1 表示扩容结束
                    //sc==rs+MAX_RESIZERS 表示帮助线程线程已经达到最大值了
                    //nt=nextTable -> 表示扩容已经结束
                    //transferIndex<=0 表示所有的 transfer 任务都被领取完了,没有剩余的hash 桶来给自己自己好这个线程来做 transfer
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))//当前线程尝试帮助此次扩容,如果成功,则调用 transfer
                        transfer(tab, nt);
                }
                // 如果当前没有在扩容,那么 rs 肯定是一个正数,通过 rs<<RESIZE_STAMP_SHIFT 将 sc 设置为一个负数,+2 表示有一个线程在执行扩容
                else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs <<
                        RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();// 重新计数,判断是否需要开启下一轮扩容
            }
        }
    
    resizeStamp
    这块逻辑要理解起来,也有一点复杂。resizeStamp 用来生成一个和扩容有关的扩容戳,具体有什么作用呢?我们基于它的实现来做一个分析
    static final int resizeStamp(int n) {
         return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }    
    
    Integer.numberOfLeadingZeros 这个方法是返回无符号整数 n 最高位非 0 位前面的 0 的个数,比如 10 的二进制是 0000 0000 0000 0000 0000 0000 0000 1010,那么这个方法返回的值就是 28,根据 resizeStamp 的运算逻辑,我们来推演一下,假如 n=16,那么 resizeStamp(16)=32796,转化为二进制是[0000 0000 0000 0000 1000 0000 0001 1100],接着再来看,当第一个线程尝试进行扩容的时候,会执行下面这段代码U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2),rs 左移 16 位,相当于原本的二进制低位变成了高位 1000 0000 0001 1100 0000 0000 0000 0000,然后再+2 =1000 0000 0001 1100 0000 0000 0000 0000+10=1000 0000 0001 1100 0000 0000 0000 0010 ,高 16 位代表扩容的标记、低 16 位代表并行扩容的线程数。
    高 RESIZE_STAMP_BITS 位  低 RESIZE_STAMP_SHIFT 位
    扩容标记      
    并行扩容线程数
    ➢ 这样来存储有什么好处呢?
    1. 首先在 CHM 中是支持并发扩容的,也就是说如果当前的数组需要进行扩容操作,可以由多个线程来共同负责,这块后续会单独讲。
    2. 可以保证每次扩容都生成唯一的生成戳,每次新的扩容,都有一个不同的 n,这个生成戳就是根据 n 来计算出来的一个数字,n 不同,这个数字也不同。
    ➢ 第一个线程尝试扩容的时候,为什么是+2
    因为 1 表示初始化,2 表示一个线程在执行扩容,而且对 sizeCtl 的操作都是基于位运算的,所以不会关心它本身的数值是多少,只关心它在二进制上的数值,而 sc + 1 会在低 16 位上加 1。 
    transfer
    扩容是 ConcurrentHashMap 的精华之一,扩容操作的核心在于数据的转移,在单线程环境下数据的转移很简单,无非就是把旧数组中的数据迁移到新的数组。但是这在多线程环境下,在扩容的时候其他线程也可能正在添加元素,这时又触发了扩容怎么办?可能大家想到的第一个解决方案是加互斥锁,把转移过程锁住,虽然是可行的解决方案,但是会带来较大的性能开销。因为互斥锁会导致所有访问临界区的线程陷入到阻塞状态,持有锁的线程耗时越长,其他竞争线程就会一直被阻塞,导致吞吐量较低。而且还可能导致死锁。而 ConcurrentHashMap 并没有直接加锁,而是采用 CAS 实现无锁的并发同步策略,最精华的部分是它可以利用多线程来进行协同扩容简单来说,它把 Node 数组当作多个线程之间共享的任务队列,然后通过维护一个指针来划
    分每个线程锁负责的区间,每个线程通过区间逆向遍历来实现扩容,一个已经迁移完的bucket 会被替换为一个 ForwardingNode 节点,标记当前 bucket 已经被其他线程迁移完了。接下来分析一下它的源码实现:
    1、fwd:这个类是个标识类,用于指向新表用的,其他线程遇到这个类会主动跳过这个类,因为这个类要么就是扩容迁移正在进行,要么就是已经完成扩容迁移,也就是这个类要保证线程安全,再进行操作。
    2、advance:这个变量是用于提示代码是否进行推进处理,也就是当前桶处理完,处理下一个桶的标识。
    3、finishing:这个变量用于提示扩容是否结束用的。
    private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
            int n = tab.length, stride;
            //将 (n>>>3 相当于 n/8) 然后除以 CPU 核心数。如果得到的结果小于 16,那么就使用 16
            // 这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶,也就是长度为 16 的时候,扩容的时候只会有一个线程来扩容
            if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
                stride = MIN_TRANSFER_STRIDE; // subdivide range
            //nextTab 未初始化,nextTab 是用来扩容的 node 数组
            if (nextTab == null) { // initiating
                try {
                    @SuppressWarnings("unchecked") //新建一个 n<<1 原始 table 大小的 nextTab,也就是 32
                            Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1];
                    nextTab = nt;//赋值给 nextTab
                } catch (Throwable ex) { // try to cope with OOME
                    sizeCtl = Integer.MAX_VALUE; //扩容失败,sizeCtl 使用 int 的最大值
                    return;
                }
                nextTable = nextTab; //更新成员变量
                transferIndex = n;//更新转移下标,表示转移时的下标
            }
            int nextn = nextTab.length;//新的 tab 的长度
            // 创建一个 fwd 节点,表示一个正在被迁移的 Node,并且它的 hash 值为-1(MOVED),也就是前面我们在讲 putval 方法的时候,会有一个判断 MOVED 的逻辑。它的作用是用来占位,表示原数组中位置 i 处的节点完成迁移以后,就会在 i 位置设置一个 fwd 来告诉其他线程这个位置已处理过了,具体后续还会在讲
            ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);
            // 首次推进为 true,如果等于 true,说明需要再次推进一个下标(i--),反之,如果是false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进
            boolean advance = true;
            //判断是否已经扩容完成,完成就 return,退出循环
            boolean finishing = false; // to ensure sweep before committing nextTab通过 for 自循环处理每个槽位中的链表元素,默认 advace 为真,通过 CAS 设置transferIndex 属性值,并初始化 i 和 bound 值,i 指当前处理的槽位序号,bound 指需要处理的槽位边界,先处理槽位 15 的节点;
            for (int i = 0, bound = 0; ; ) {
                // 这个循环使用 CAS 不断尝试为当前线程分配任务
                // 直到分配成功或任务队列已经被全部分配完毕
                // 如果当前线程已经被分配过 bucket 区域
                // 那么会通过--i 指向下一个待处理 bucket 然后退出该循环
                Node<K, V> f;
                int fh;
                while (advance) {
                    int nextIndex, nextBound;
                    //--i 表示下一个待处理的 bucket,如果它>=bound,表示当前线程已经分配过ucket 区域
                    if (--i >= bound || finishing)
                        advance = false;
                    else if ((nextIndex = transferIndex) <= 0) {//表示所有 bucket 已经被分配完毕
                                i = -1;
                        advance = false;
                    }
                    //通过 cas 来修改 TRANSFERINDEX,为当前线程分配任务,处理的节点区间为(nextBound, nextIndex) -> (0, 15)
                    else if (U.compareAndSwapInt
                            (this, TRANSFERINDEX, nextIndex,
                                    nextBound = (nextIndex > stride ?
                                            nextIndex - stride : 0))) {
                        bound = nextBound;//0
                        i = nextIndex - 1;//15
                        advance = false;
                    }
                }//i<0 说明已经遍历完旧的数组,也就是当前线程已经处理完所有负责的 bucket
                if (i < 0 || i >= n || i + n >= nextn) {
                    int sc;
                    if (finishing) {//如果完成了扩容
                        nextTable = null;//删除成员变量
                        table = nextTab;//更新 table 数组
                        sizeCtl = (n << 1) - (n >>> 1);//更新阈值(32*0.75=24)
                        return;
                    }
                    // sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2
                    // 然后,每增加一个线程参与迁移就会将 sizeCtl 加 1,
                    // 这里使用 CAS 操作对 sizeCtl 的低 16 位进行减 1,代表做完了属于自己的任务
                    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                        /*第一个扩容的线程,执行 transfer 方法之前,会设置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
                        后续帮其扩容的线程,执行 transfer 方法之前,会设置 sizeCtl = sizeCtl+1
                        每一个退出 transfer 的方法的线程,退出之前,会设置 sizeCtl = sizeCtl-1
                        那么最后一个线程退出时:必然有 sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT*/
                        // 如果 sc - 2 不等于标识符左移 16 位。如果他们相等了,说明没有线程在  帮助他们扩容了。也就是说,扩容结束了。
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            return;
                        // 如果相等,扩容结束了,更新 finising 变量
                        finishing = advance = true;
                        // 再次循环检查一下整张表
                        i = n; // recheck before commit
                    }
                }
                // 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“
                else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd);
                //表示该位置已经完成了迁移,也就是如果线程 A 已经处理过这个节点,那么线程 B 处理这个节点时,hash 值一定为 MOVED
                else if ((fh = f.hash) == MOVED) advance = true; // already processed
            }
        }
    
    扩容过程图解
    ConcurrentHashMap 支持并发扩容,实现方式是,把 Node 数组进行拆分,让每个线程处理自己的区域,假设 table 数组总长度是 64,默认情况下,那么每个线程可以分到 16 个 bucket。然后每个线程处理的范围,按照倒序来做迁移通过 for 自循环处理每个槽位中的链元素,默认 advace 为真,通过 CAS 设置 transferIndex属性值,并初始化 i 和 bound 值,i 指当前处理的槽位序号,bound 指需要处理的槽位边界,先处理槽位 31 的节点; (bound,i) =(16,31) 从 31 的位置往前推动。

    假设这个时候 ThreadA 在进行 transfer,那么逻辑图表示如下

    在当前假设条件下,槽位 15 中没有节点,则通过 CAS 插入在第二步中初始化的ForwardingNode 节点,用于告诉其它线程该槽位已经处理过了。
    sizeCtl 扩容退出机制
    在扩容操作 transfer 的第 2414 行,代码如下if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)每存在一个线程执行完扩容操作,就通过 cas 执行 sc-1。接着判断(sc-2) !=resizeStamp(n) << RESIZE_STAMP_SHIFT ; 如果相等,表示当前为整个扩容操作的 最后一个线程,那么意味着整个扩容操作就结束了;如果不想等,说明还得继续这么做的目的,一方面是防止不同扩容之间出现相同的 sizeCtl,另外一方面,还可以避免sizeCtl 的 ABA 问题导致的扩容重叠的情况。
    紧接下篇~篇幅过长 
  • 相关阅读:
    Leetcode Unique Binary Search Trees
    Leetcode Decode Ways
    Leetcode Range Sum Query 2D
    Leetcode Range Sum Query
    Leetcode Swap Nodes in Pairs
    Leetcode Rotate Image
    Leetcode Game of Life
    Leetcode Set Matrix Zeroes
    Leetcode Linked List Cycle II
    CF1321A
  • 原文地址:https://www.cnblogs.com/47Gamer/p/13085212.html
Copyright © 2011-2022 走看看