zoukankan      html  css  js  c++  java
  • ConcurrentHashMap源码分析

    1,使用及场景

        CHM是J.U.C提供的一个线程安全且高效的HashMap,api基本和Hashmap类似,主要有get,put等方法。

    2,源码分析

        2.1 JDK1.7和1.8的变化

            JDK1.7,简单来说chm是一个segment数组,它通过集成ReentrantLock进行加锁,通过每次锁住一个segment来保证每个segment内的操作是线程安全的。理论上可以同时支持16个线程的并发写入。因为有16个segment
            JDK 1.8 相对于1.7有两个方面的改动,
                1,取消了segment的设计,直接用Node数组来保存数据,对每个Node加锁来实现线程安全行。   
                2,将原来的数组+单向链表改为数据+单向链表+红黑树实现,为什么要用红黑树呢,因为会存在链表过长的节点,如果还采用单向列表的话,那么查询某个节点的时间复杂度为O(n),因此对于链表长度超过8的,jdk1.8采用了红黑树的结构将时间复杂度降为O(logN),提升查询性能。

        2.2  put方法 initTable

        如果当前table为空,则初始化一个长度为16的table,并且将扩容大小赋值给sizeCtl(0.75倍原数组大小)。
        sizeCtl:(1)负数代表正在初始化(-1)或者扩容(-N代表有N-1个线程正在进行扩容)操作,
                      (2)0表示Node数组还没有被初始化,
                      (3)正数表示下一次扩容的大小。
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //n是2的n次方,所以减去1再和hash求与,结果还是hash,结果是取到位置为i的Node
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
            return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
        }

        上面 tabAt(tab, i = (n - 1) & hash)) 等价于tab[i],为什么不直接使用tab[i]呢,虽然数组本身是volatile修饰的,但是volatile的数组只针对它的引用,并不针对它里面的元素,所以它内部是用 getObjectVolatile 获取的。保证有其它线程对这个数组进行写操作,那么当前线程也一定可以读到最新的值。

        2.3 put方法 addCount

        putVal 执行完之后,会通过 addCount 来增加CHM中元素的个数,并且还会触发扩容操作。它里面运用了分而治制的思想
        如果在没有并发的情况下,chm采用 cas的方法修改 basecount 字段来统计集合的大小,否则使用CounterCell数组来计算。?为什么用这种方式呢,按照正常的想法为了保证安全性,肯定是用一个成员变量加锁来统计元素的个数,但是竞争比较激烈的情况下会影响性能。所以采用了分片的方法来记录大小。
        addCount(1L, binCount);     // 1L 表示这次需要在表中增加的元素个数,binCount是链表长度,如果大于等于0都需要进行扩容检查
    
        private transient volatile int cellsBusy;    // 标识当前cell数组是否在初始化或者扩容的cas标志
         */
        private transient volatile CounterCell[] counterCells;    //集合大小分别存在每个cell中
    
        if (counterCells == as) {
            CounterCell[] rs = new CounterCell[2];
            rs[h & 1] = new CounterCell(x);
            counterCells = rs;
            init = true;
        }

            默认会初始化长度为2的CounterCell数组,然后随机得到指定的一个数组下标,将需要新增的值放到对应的下标处。

        2.4 transfer扩容阶段

        当 集合键值对总数 >= sizeCtl(扩容阀值)时,进行扩容操作。扩容的核心操作是数据的转移,最直观的方法是使用互斥锁,把转移过程锁住,但是持有锁的线程耗时越长,其它线程就一直被阻塞,从而导致吞吐量较低。
        resizeStamp(n); 会生成一个int值,高16位代表扩容标记,低16位代表并发扩容线程数,n不同生成的值也不会相同。保证每次扩容都生成唯一的戳。
            if (check >= 0) {
                Node<K,V>[] tab, nt; int n, sc;
                while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                       (n = tab.length) < MAXIMUM_CAPACITY) {
                    int rs = resizeStamp(n);    //
                    if (sc < 0) {
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                            break;
                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                            transfer(tab, nt);
                    }
                    else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                                 (rs << RESIZE_STAMP_SHIFT) + 2))
                        transfer(tab, null);
                    s = sumCount();
                }
            }

        chm并没有直接加锁,而是采用cas实现无锁的并发同步策略,最精华的部分是利用多线程来协同扩容。

        简单的过程就是,我们把Node数组当做多个线程的共享队列,然后通过一个指针来划分每个线程负责的区间,每个线程通过逆向遍历实现扩容,已经遍历完的Node会被替换为一个 ForwardingNode 节点,表示当前Node已经被其它线程迁移完了。
        1,fwd节点:标识类,用于指向新表
        2,advance:提示代码是否进行推进处理,也就是当前桶处理完,处理下一个桶的标识。
        3,finishing:用于提示扩展是否结束用的。
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        通过for自循环每个槽位中的链表元素,默认advance为真,通过CAS设置transferIndex属性值,并初始化 i 和bound的值,i 指当前处理的槽位号,bound指需要处理的槽位边界,先处理槽位31的节点,(boud, i) = (16,31) 从31的位置往前推动,如果15槽位处理完了,通过CAS插入初始化的fwd节点,用于告诉其他线程该槽位已经处理过了。
        if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
            if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                return;
            finishing = advance = true;
            i = n; // recheck before commit
        }
        在扩容操作中,每执行完一个扩容操作,就通过cas将sc减1,直到sc-2等于原来的resizeStamp(n)代表扩容已经完成。

        2.5 数据迁移阶段的实现分析    

        当划分好迁移区间好后,针对每个节点的列表迁移,chm使用了高低位原理,将原来的链表的处于高位的元素放到高位节点下,低位的保持不变。
        因为n总是2的次幂,所以 fh&n的结果只有两种结果, 第n位为0或者1,然后将0的放到低位链表,为1的放到高位链表。
        synchronized (f) {
            if (tabAt(tab, i) == f) {
                Node<K,V> ln, hn;
                if (fh >= 0) {
                    int runBit = fh & n;
                    Node<K,V> lastRun = f;
                    for (Node<K,V> p = f.next; p != null; p = p.next) {
                        int b = p.hash & n;
                        if (b != runBit) {
                            runBit = b;
                            lastRun = p;
                        }
                    }
                    if (runBit == 0) {
                        ln = lastRun;
                        hn = null;
                    }
                    else {
                        hn = lastRun;
                        ln = null;
                    }
                    for (Node<K,V> p = f; p != lastRun; p = p.next) {
                        int ph = p.hash; K pk = p.key; V pv = p.val;
                        if ((ph & n) == 0)
                            ln = new Node<K,V>(ph, pk, pv, ln);
                        else
                            hn = new Node<K,V>(ph, pk, pv, hn);
                    }
                    setTabAt(nextTab, i, ln);
                    setTabAt(nextTab, i + n, hn);
                    setTabAt(tab, i, fwd);
                    advance = true;
                }
                else if (f instanceof TreeBin) {
                    TreeBin<K,V> t = (TreeBin<K,V>)f;
                    TreeNode<K,V> lo = null, loTail = null;
                    TreeNode<K,V> hi = null, hiTail = null;
                    int lc = 0, hc = 0;
                    for (Node<K,V> e = t.first; e != null; e = e.next) {
                        int h = e.hash;
                        TreeNode<K,V> p = new TreeNode<K,V>
                            (h, e.key, e.val, null, null);
                        if ((h & n) == 0) {
                            if ((p.prev = loTail) == null)
                                lo = p;
                            else
                                loTail.next = p;
                            loTail = p;
                            ++lc;
                        }
                        else {
                            if ((p.prev = hiTail) == null)
                                hi = p;
                            else
                                hiTail.next = p;
                            hiTail = p;
                            ++hc;
                        }
                    }
                    ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                    hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                    setTabAt(nextTab, i, ln);
                    setTabAt(nextTab, i + n, hn);
                    setTabAt(tab, i, fwd);
                    advance = true;
                }
            }
        }
     

        2.6 put方法 helpTransfer

        在putval的时候,如果当前节点是fwd节点,既hash为moved,意味着有其它线程正在扩容,那么当前线程直接帮助它进行扩容,调用helpTransfer方法。
                else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f);

        2.7 put方法 添加节点到链表中

        如果添加节点的位置已经存在节点,需要以链表的方式加入到节点中,如果当前节点已经是一颗红黑树,那么久按照红黑树的规则将当前节点加入到红黑树中。  
        //锁住当前节点链表
        synchronized (f) {
            if (tabAt(tab, i) == f) {
                if (fh >= 0) {
                    binCount = 1;
                    for (Node<K,V> e = f;; ++binCount) {
                        K ek;
                        if (e.hash == hash &&
                            ((ek = e.key) == key ||
                             (ek != null && key.equals(ek)))) {
                            oldVal = e.val;
                            if (!onlyIfAbsent)
                                e.val = value;
                            break;
                        }
                        Node<K,V> pred = e;
                        if ((e = e.next) == null) {
                            pred.next = new Node<K,V>(hash, key,
                                                      value, null);
                            break;
                        }
                    }
                }
                else if (f instanceof TreeBin) {
                    Node<K,V> p;
                    binCount = 2;
                    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                          value)) != null) {
                        oldVal = p.val;
                        if (!onlyIfAbsent)
                            p.val = value;
                    }
                }
            }
        }

        2.8 put方法 判断链表是否需要转换为红黑树

        如果链表的长度 bincount 大于等于8,那么会根据当前链表的长度来决定是扩容还是转为红黑树,也就是说如果当前数组的长度小于64,那么优先扩容,否则转为红黑树。
        if (binCount != 0) {
            if (binCount >= TREEIFY_THRESHOLD)//TREEIFY_THRESHOLD=8
                treeifyBin(tab, i);
            if (oldVal != null)
                return oldVal;
            break;
        }
        private final void treeifyBin(Node<K,V>[] tab, int index) {
            Node<K,V> b; int n, sc;
            if (tab != null) {
                if ((n = tab.length) < MIN_TREEIFY_CAPACITY)    //MIN_TREEIFY_CAPACITY=64
                    tryPresize(n << 1);
                else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                    synchronized (b) {
                        if (tabAt(tab, index) == b) {
                            TreeNode<K,V> hd = null, tl = null;
                            for (Node<K,V> e = b; e != null; e = e.next) {
                                TreeNode<K,V> p =
                                    new TreeNode<K,V>(e.hash, e.key, e.val,
                                                      null, null);
                                if ((p.prev = tl) == null)
                                    hd = p;
                                else
                                    tl.next = p;
                                tl = p;
                            }
                            setTabAt(tab, index, new TreeBin<K,V>(hd));
                        }
                    }
                }
            }
        }



  • 相关阅读:
    CCF CSP 题解
    CCF CSP 2019032 二十四点
    CCF CSP 2018121 小明上学
    CCF CSP 2019092 小明种苹果(续)
    CCF CSP 2019091 小明种苹果
    CCF CSP 2019121 报数
    CCF CSP 2019031 小中大
    CCF CSP 2020061 线性分类器
    CCF CSP 2020062 稀疏向量
    利用国家气象局的webservice查询天气预报(转载)
  • 原文地址:https://www.cnblogs.com/gaojf/p/12836639.html
Copyright © 2011-2022 走看看