zoukankan      html  css  js  c++  java
  • ConcurrentHashMap源码分析

    1,使用及场景

        CHM是J.U.C提供的一个线程安全且高效的HashMap,api基本和Hashmap类似,主要有get,put等方法。

    2,源码分析

        2.1 JDK1.7和1.8的变化

            JDK1.7,简单来说chm是一个segment数组,它通过集成ReentrantLock进行加锁,通过每次锁住一个segment来保证每个segment内的操作是线程安全的。理论上可以同时支持16个线程的并发写入。因为有16个segment
            JDK 1.8 相对于1.7有两个方面的改动,
                1,取消了segment的设计,直接用Node数组来保存数据,对每个Node加锁来实现线程安全行。   
                2,将原来的数组+单向链表改为数据+单向链表+红黑树实现,为什么要用红黑树呢,因为会存在链表过长的节点,如果还采用单向列表的话,那么查询某个节点的时间复杂度为O(n),因此对于链表长度超过8的,jdk1.8采用了红黑树的结构将时间复杂度降为O(logN),提升查询性能。

        2.2  put方法 initTable

        如果当前table为空,则初始化一个长度为16的table,并且将扩容大小赋值给sizeCtl(0.75倍原数组大小)。
        sizeCtl:(1)负数代表正在初始化(-1)或者扩容(-N代表有N-1个线程正在进行扩容)操作,
                      (2)0表示Node数组还没有被初始化,
                      (3)正数表示下一次扩容的大小。
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //n是2的n次方,所以减去1再和hash求与,结果还是hash,结果是取到位置为i的Node
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
            return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
        }

        上面 tabAt(tab, i = (n - 1) & hash)) 等价于tab[i],为什么不直接使用tab[i]呢,虽然数组本身是volatile修饰的,但是volatile的数组只针对它的引用,并不针对它里面的元素,所以它内部是用 getObjectVolatile 获取的。保证有其它线程对这个数组进行写操作,那么当前线程也一定可以读到最新的值。

        2.3 put方法 addCount

        putVal 执行完之后,会通过 addCount 来增加CHM中元素的个数,并且还会触发扩容操作。它里面运用了分而治制的思想
        如果在没有并发的情况下,chm采用 cas的方法修改 basecount 字段来统计集合的大小,否则使用CounterCell数组来计算。?为什么用这种方式呢,按照正常的想法为了保证安全性,肯定是用一个成员变量加锁来统计元素的个数,但是竞争比较激烈的情况下会影响性能。所以采用了分片的方法来记录大小。
        addCount(1L, binCount);     // 1L 表示这次需要在表中增加的元素个数,binCount是链表长度,如果大于等于0都需要进行扩容检查
    
        private transient volatile int cellsBusy;    // 标识当前cell数组是否在初始化或者扩容的cas标志
         */
        private transient volatile CounterCell[] counterCells;    //集合大小分别存在每个cell中
    
        if (counterCells == as) {
            CounterCell[] rs = new CounterCell[2];
            rs[h & 1] = new CounterCell(x);
            counterCells = rs;
            init = true;
        }

            默认会初始化长度为2的CounterCell数组,然后随机得到指定的一个数组下标,将需要新增的值放到对应的下标处。

        2.4 transfer扩容阶段

        当 集合键值对总数 >= sizeCtl(扩容阀值)时,进行扩容操作。扩容的核心操作是数据的转移,最直观的方法是使用互斥锁,把转移过程锁住,但是持有锁的线程耗时越长,其它线程就一直被阻塞,从而导致吞吐量较低。
        resizeStamp(n); 会生成一个int值,高16位代表扩容标记,低16位代表并发扩容线程数,n不同生成的值也不会相同。保证每次扩容都生成唯一的戳。
            if (check >= 0) {
                Node<K,V>[] tab, nt; int n, sc;
                while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                       (n = tab.length) < MAXIMUM_CAPACITY) {
                    int rs = resizeStamp(n);    //
                    if (sc < 0) {
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                            break;
                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                            transfer(tab, nt);
                    }
                    else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                                 (rs << RESIZE_STAMP_SHIFT) + 2))
                        transfer(tab, null);
                    s = sumCount();
                }
            }

        chm并没有直接加锁,而是采用cas实现无锁的并发同步策略,最精华的部分是利用多线程来协同扩容。

        简单的过程就是,我们把Node数组当做多个线程的共享队列,然后通过一个指针来划分每个线程负责的区间,每个线程通过逆向遍历实现扩容,已经遍历完的Node会被替换为一个 ForwardingNode 节点,表示当前Node已经被其它线程迁移完了。
        1,fwd节点:标识类,用于指向新表
        2,advance:提示代码是否进行推进处理,也就是当前桶处理完,处理下一个桶的标识。
        3,finishing:用于提示扩展是否结束用的。
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        通过for自循环每个槽位中的链表元素,默认advance为真,通过CAS设置transferIndex属性值,并初始化 i 和bound的值,i 指当前处理的槽位号,bound指需要处理的槽位边界,先处理槽位31的节点,(boud, i) = (16,31) 从31的位置往前推动,如果15槽位处理完了,通过CAS插入初始化的fwd节点,用于告诉其他线程该槽位已经处理过了。
        if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
            if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                return;
            finishing = advance = true;
            i = n; // recheck before commit
        }
        在扩容操作中,每执行完一个扩容操作,就通过cas将sc减1,直到sc-2等于原来的resizeStamp(n)代表扩容已经完成。

        2.5 数据迁移阶段的实现分析    

        当划分好迁移区间好后,针对每个节点的列表迁移,chm使用了高低位原理,将原来的链表的处于高位的元素放到高位节点下,低位的保持不变。
        因为n总是2的次幂,所以 fh&n的结果只有两种结果, 第n位为0或者1,然后将0的放到低位链表,为1的放到高位链表。
        synchronized (f) {
            if (tabAt(tab, i) == f) {
                Node<K,V> ln, hn;
                if (fh >= 0) {
                    int runBit = fh & n;
                    Node<K,V> lastRun = f;
                    for (Node<K,V> p = f.next; p != null; p = p.next) {
                        int b = p.hash & n;
                        if (b != runBit) {
                            runBit = b;
                            lastRun = p;
                        }
                    }
                    if (runBit == 0) {
                        ln = lastRun;
                        hn = null;
                    }
                    else {
                        hn = lastRun;
                        ln = null;
                    }
                    for (Node<K,V> p = f; p != lastRun; p = p.next) {
                        int ph = p.hash; K pk = p.key; V pv = p.val;
                        if ((ph & n) == 0)
                            ln = new Node<K,V>(ph, pk, pv, ln);
                        else
                            hn = new Node<K,V>(ph, pk, pv, hn);
                    }
                    setTabAt(nextTab, i, ln);
                    setTabAt(nextTab, i + n, hn);
                    setTabAt(tab, i, fwd);
                    advance = true;
                }
                else if (f instanceof TreeBin) {
                    TreeBin<K,V> t = (TreeBin<K,V>)f;
                    TreeNode<K,V> lo = null, loTail = null;
                    TreeNode<K,V> hi = null, hiTail = null;
                    int lc = 0, hc = 0;
                    for (Node<K,V> e = t.first; e != null; e = e.next) {
                        int h = e.hash;
                        TreeNode<K,V> p = new TreeNode<K,V>
                            (h, e.key, e.val, null, null);
                        if ((h & n) == 0) {
                            if ((p.prev = loTail) == null)
                                lo = p;
                            else
                                loTail.next = p;
                            loTail = p;
                            ++lc;
                        }
                        else {
                            if ((p.prev = hiTail) == null)
                                hi = p;
                            else
                                hiTail.next = p;
                            hiTail = p;
                            ++hc;
                        }
                    }
                    ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                    hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                    setTabAt(nextTab, i, ln);
                    setTabAt(nextTab, i + n, hn);
                    setTabAt(tab, i, fwd);
                    advance = true;
                }
            }
        }
     

        2.6 put方法 helpTransfer

        在putval的时候,如果当前节点是fwd节点,既hash为moved,意味着有其它线程正在扩容,那么当前线程直接帮助它进行扩容,调用helpTransfer方法。
                else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f);

        2.7 put方法 添加节点到链表中

        如果添加节点的位置已经存在节点,需要以链表的方式加入到节点中,如果当前节点已经是一颗红黑树,那么久按照红黑树的规则将当前节点加入到红黑树中。  
        //锁住当前节点链表
        synchronized (f) {
            if (tabAt(tab, i) == f) {
                if (fh >= 0) {
                    binCount = 1;
                    for (Node<K,V> e = f;; ++binCount) {
                        K ek;
                        if (e.hash == hash &&
                            ((ek = e.key) == key ||
                             (ek != null && key.equals(ek)))) {
                            oldVal = e.val;
                            if (!onlyIfAbsent)
                                e.val = value;
                            break;
                        }
                        Node<K,V> pred = e;
                        if ((e = e.next) == null) {
                            pred.next = new Node<K,V>(hash, key,
                                                      value, null);
                            break;
                        }
                    }
                }
                else if (f instanceof TreeBin) {
                    Node<K,V> p;
                    binCount = 2;
                    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                          value)) != null) {
                        oldVal = p.val;
                        if (!onlyIfAbsent)
                            p.val = value;
                    }
                }
            }
        }

        2.8 put方法 判断链表是否需要转换为红黑树

        如果链表的长度 bincount 大于等于8,那么会根据当前链表的长度来决定是扩容还是转为红黑树,也就是说如果当前数组的长度小于64,那么优先扩容,否则转为红黑树。
        if (binCount != 0) {
            if (binCount >= TREEIFY_THRESHOLD)//TREEIFY_THRESHOLD=8
                treeifyBin(tab, i);
            if (oldVal != null)
                return oldVal;
            break;
        }
        private final void treeifyBin(Node<K,V>[] tab, int index) {
            Node<K,V> b; int n, sc;
            if (tab != null) {
                if ((n = tab.length) < MIN_TREEIFY_CAPACITY)    //MIN_TREEIFY_CAPACITY=64
                    tryPresize(n << 1);
                else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                    synchronized (b) {
                        if (tabAt(tab, index) == b) {
                            TreeNode<K,V> hd = null, tl = null;
                            for (Node<K,V> e = b; e != null; e = e.next) {
                                TreeNode<K,V> p =
                                    new TreeNode<K,V>(e.hash, e.key, e.val,
                                                      null, null);
                                if ((p.prev = tl) == null)
                                    hd = p;
                                else
                                    tl.next = p;
                                tl = p;
                            }
                            setTabAt(tab, index, new TreeBin<K,V>(hd));
                        }
                    }
                }
            }
        }



  • 相关阅读:
    爬虫工具简单整理
    vue单页面处理SEO问题
    深入浅出MyBatis-快速入门
    js的匿名函数 和普通函数
    Javascript位置 body之前、后执行顺序!
    eclipse中的ctrl+H使用中的问题
    Eclipse中ctrl+shift+r与ctrl+shift+t的区别
    Java 判断字符串是否为空的四种方法、优缺点与注意事项
    eclipse 快捷键
    只缩进新加一段代码的方法
  • 原文地址:https://www.cnblogs.com/gaojf/p/12836639.html
Copyright © 2011-2022 走看看