zoukankan      html  css  js  c++  java
  • ConcurrentHashMap扩容分析

    CHM的扩容方法是 transfer ,发现调用该方法的地方一共有三处

      tryPresize    在treeifyBin被调用,判断该数组节点相关元素是不是已经超过8个的时候,此时要判断桶的数量是不是超过了64个,如果还没超过就先扩容。如果超过则把链表转为树

      helpTransfer     一个线程要对table中元素进行操作的时候(可以是put也可以是remove),如果检测到节点的HASH值为MOVED的时候,就会调用helpTransfer方法

      addCount  对CHM进行了增加元素的操作,使得数组中存储的元素个数发生了变化的时候会调用的方法

      这里还有一点特殊的,那就是helpTransfer 是有返回值的,而其他两个方法返回值是void。这也好理解,毕竟helpTransfer就表示已经开始扩容了,那么nextTable就必然不是null了。

      开始分析transfer

      第一部分 准备阶段

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
            int n = tab.length, stride;
            if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
                stride = MIN_TRANSFER_STRIDE; // subdivide range 不用纠结 就当它是16好了
            if (nextTab == null) {            // initiating 如果是第一个线程那么nextTab就是null
                try {
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                    nextTab = nt;
                } catch (Throwable ex) {      // try to cope with OOME
                    sizeCtl = Integer.MAX_VALUE;
                    return;
                }
                nextTable = nextTab;
                transferIndex = n;//transferIndex很关键 是老表的表长度比如16
            }
            int nextn = nextTab.length;
            ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//迁移后的桶位置上就放上ForwardingNode 注意下整个桶里方法的fwd都是同一个
            boolean advance = true;
            boolean finishing = false; // to ensure sweep before committing nextTab

      第二部分开始转移

    for (int i = 0, bound = 0;;) {
                Node<K,V> f; int fh;
                while (advance) {           第一部分 决定扩容的上限和下限 甚至可能是下一次的扩容
                    int nextIndex, nextBound;
                    if (--i >= bound || finishing)
                        advance = false;
                    else if ((nextIndex = transferIndex) <= 0) {
                        i = -1;
                        advance = false;
                    }
                    else if (U.compareAndSwapInt
                             (this, TRANSFERINDEX, nextIndex,
                              nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) {
                        bound = nextBound;
                        i = nextIndex - 1;
                        advance = false;
                    }
                }
                if (i < 0 || i >= n || i + n >= nextn) { 第二部分决定是否退出
                    int sc;
                    if (finishing) {
                        nextTable = null;
                        table = nextTab;
                        sizeCtl = (n << 1) - (n >>> 1);
                        return;
                    }
                    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            return;
                        finishing = advance = true;
                        i = n; // recheck before commit
                    }
                }
                else if ((f = tabAt(tab, i)) == null)//第三部分 开始做真正的转移
                    advance = casTabAt(tab, i, null, fwd);
                else if ((fh = f.hash) == MOVED)
                    advance = true; // already processed
                else {
                    synchronized (f) {
                        if (tabAt(tab, i) == f) {
                            Node<K,V> ln, hn;
                            if (fh >= 0) {
                                转移普通节点
                            }
                            else if (f instanceof TreeBin) {
                                转移树节点
                            }
                        }
                    }
                }
            }
        }

    首先大的脉络这是一个自旋的过程,而出口只有两个地方,我们最后分析这两处。

    if (finishing) {
                        nextTable = null;
                        table = nextTab;
                        sizeCtl = (n << 1) - (n >>> 1);
                        return;
                    }
                    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            return;

      先简化流程,就假设初始容量是16这样就只会有一个线程来扩容

    一 计算转移上限下限

            while (advance) {           第一部分 决定扩容的上限和下限 甚至可能是下一次的扩容
                    int nextIndex, nextBound;
                    if (--i >= bound || finishing)//i是不断自减得
                        advance = false;
                    else if ((nextIndex = transferIndex) <= 0) {//transferIndex在上面被赋值,是源桶数组的长度,如果transferIndex <= 0是整个桶都处理完了
                        i = -1;
                        advance = false;
                    }
                    else if (U.compareAndSwapInt
                             (this, TRANSFERINDEX, nextIndex,
                              nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) {stride是16,nextIndex也是16,所以这里nextBound是0
                        bound = nextBound;
                        i = nextIndex - 1;
                        advance = false;
                    }
                }
    i是
    transferIndex - 1 ,正好是最高的下标
    到这里我们知道了i是15 而bound是0,这就是一个转移段有16个桶

      然后就是对每个桶进行转移了,也就是第三部分,其实这部分的逻辑反倒是最简单的

     二 桶的转移

           else if ((f = tabAt(tab, i)) == null)//如果是null,直接通过cas把fwd放上去,fwd里面就有nextTable的引用
                    advance = casTabAt(tab, i, null, fwd);
                else if ((fh = f.hash) == MOVED)//这是已经处理过了 因为fwd节点的hash值就是-1
                    advance = true; // already processed
                else {
                    synchronized (f) {//加锁
                        if (tabAt(tab, i) == f) {
                            Node<K,V> ln, hn;
                            if (fh >= 0) {//还记得特殊节点的特殊值嘛 forward是-1 treenode是-2 所以大于0表示这是个普通的节点
                                int runBit = fh & n;
                                Node<K,V> lastRun = f;
                                for (Node<K,V> p = f.next; p != null; p = p.next) {
                                    int b = p.hash & n;
                                    if (b != runBit) {
                                        runBit = b;
                                        lastRun = p;
                                    }
                                }
                                if (runBit == 0) {
                                    ln = lastRun;
                                    hn = null;
                                }
                                else {
                                    hn = lastRun;
                                    ln = null;
                                }
                                for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                    int ph = p.hash; K pk = p.key; V pv = p.val;
                                    if ((ph & n) == 0)
                                        ln = new Node<K,V>(ph, pk, pv, ln);
                                    else
                                        hn = new Node<K,V>(ph, pk, pv, hn);
                                }
                                //高位链,低位链和HashMap逻辑是一样的 只是加了点小技巧
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);//注意一个细节 新的table的里的Node都是new出来的,原桶根本没有变,也就是说在cas前老桶还是能够提供查询功能的
                                advance = true;
                            }
                            else if (f instanceof TreeBin) {//红黑树的过程和普通节点没啥区别,因为红黑树里面也是有链表结构的,它也是按照高位链低位链划分的
                                TreeBin<K,V> t = (TreeBin<K,V>)f;
                                TreeNode<K,V> lo = null, loTail = null;
                                TreeNode<K,V> hi = null, hiTail = null;
                                int lc = 0, hc = 0;
                                for (Node<K,V> e = t.first; e != null; e = e.next) {
                                    int h = e.hash;
                                    TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                    if ((h & n) == 0) {
                                        if ((p.prev = loTail) == null)
                                            lo = p;
                                        else
                                            loTail.next = p;
                                        loTail = p;
                                        ++lc;
                                    }
                                    else {
                                        if ((p.prev = hiTail) == null)
                                            hi = p;
                                        else
                                            hiTail.next = p;
                                        hiTail = p;
                                        ++hc;
                                    }
                                }
                                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                        }
                    }

    三 最重要的部分 退出

    先看一种情况,就是处理完了16个桶之后会怎么样 

          while (advance) {
                    int nextIndex, nextBound;
                    if (--i >= bound || finishing) //处理完本次的16个桶那么肯定这里 --i >=bound就是false了
                        advance = false;
                    else if ((nextIndex = transferIndex) <= 0) {//如果下标还不是0 就是还有要转移的
                        i = -1;
                        advance = false;
                    }
                    else if (U.compareAndSwapInt
                             (this, TRANSFERINDEX, nextIndex,
                              nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) {//这里还是会再次计算i和bound 再次处理这次的桶数组
                        bound = nextBound;
                        i = nextIndex - 1;
                        advance = false;
                    }
                }

      也就是说一个线程进入transfer,不论是他是第一个开始扩容的线程,还是后来进入帮忙的,不处理完是不能退出的,而不是处理完16个就算完了。

      那么如果

    nextIndex = transferIndex) <= 0

      那就说明此时真的数据转移完了,那么就进入退出判断流程,这部分也是最难得

          if (i < 0 || i >= n || i + n >= nextn) {
                    int sc;
                    if (finishing) {
                        nextTable = null;
                        table = nextTab;
                        sizeCtl = (n << 1) - (n >>> 1);
                        return;
                    }
                    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            return;
                        finishing = advance = true;
                        i = n; // recheck before commit
                    }
                }

    上一个代码段里说了,如果已经转移完了,那么 i = -1,满足了if条件

      finishing只有一个地方被赋值  finishing = advance = true; 所以第一次进来是不可能直接走

             if (finishing) {
                        nextTable = null;
                        table = nextTab;
                        sizeCtl = (n << 1) - (n >>> 1);
                        return;
                    }

      所以重点要分析的就是,通过CAS对 sizeCtl 这很好理解,扩容的线程要-1。如果经过-1后这个值不是

      (sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT 说明什么呢?说明它不是最后一个线程。

      这里其实挺有意思的,还记得扩容开始的 sizeCtl 怎么算的嘛? 就是扩容标识戳,然后左移16位,同时低16位赋值为2,所以啊 就是判断 当前线程-1之后等不等于这个值

      如果不等于这个值,那么他不是最后一个线程,那么你可以走了。

      而最后一个线程不能走,我们就来看看不能走的逻辑吧

            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            return;
                        finishing = advance = true;
                        i = n; // recheck before commit
                    }

      最后一个线程会执行最后两句,i又被赋值为原桶的size

      代码自旋又走到while循环里

      然后又是 - -i,再把每个桶检查一遍

      都检查完了,还记得上面把finishing设置为true了吧

      

              if (finishing) {
                        nextTable = null;
                        table = nextTab;
                        sizeCtl = (n << 1) - (n >>> 1);
                        return;
                    }

    最后的线程收尾工作

      1 nextTable 为null

      2 table指向新的nextTable

      3 重新计算sizeCtl

      到此整个transfer分析就完了

    四 收尾

      我们最开始说了三个方法会调transfer

      其实这三个方法都差不太多

      这里面还有一个比较好玩的地方

      拿出来分析分析

           int rs = resizeStamp(tab.length);
                while (nextTab == nextTable && table == tab &&
                       (sc = sizeCtl) < 0) {//sizeCtl为负数才说明在扩容
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                        transfer(tab, nextTab);
                        break;
                    }
                }

      sc >>> RESIZE_STAMP_SHIFT) != rs 其实我要说的就是这个,rs就是扩容标识戳,每个线程要进入扩容之前都要检查当前的扩容标识戳对不对。如果不对那说明上次的扩容结束了,这次是新的扩容。

      这里再次强调下 sizeCtl 高16位是扩容标识戳,低16位是扩容线程数 + 1

      

      

  • 相关阅读:
    [K/3Cloud] 关于单据转换的问题
    [K/3Cloud] 分录行复制和新增行的冲突如何处理
    [K/3Cloud] 动态表单打开时传递一个自定义参数并在插件中获取
    [译]C++书籍终极推荐
    Time.deltaTime 含义和应用
    cocos2d-x 锚点,位置==》动手实验记录 多动手... :)
    iOS的主要框架介绍
    xcode 预编译头文件
    android 内部存储相关知识点: getfilestreampath getDir 子文件夹
    coco2d-js 多屏适配相关API
  • 原文地址:https://www.cnblogs.com/juniorMa/p/13907580.html
Copyright © 2011-2022 走看看