zoukankan      html  css  js  c++  java
  • 并发和多线程(十九)ConcurrentHashMap源码解析(jdk1.8) Diamond


    在并发工具类和并发集合中,可能最复杂的就是jdk1.8+的ConcurrentHashMap源码,相比jdk1.8-,已经抛弃了Segment分段锁的实现方式,采用了Synchronized、CAS、volatile来实现一个线程安全的容器,是HashMap线程安全版本。关于数据结构方面,和HashMap很相似了,但是有更多内部类,我们在下面就可以看到。

    如果想要了解HashMap相关源码,可以参考:HashMap源码解析(jdk1.8)

    关于CAS,volatile和Synchronized的相关机制默认都是了解的,所以不会有过多解释,其次是很多不重要或者和HashMap相同的代码就会一笔带过,不然篇幅太过巨大,如果对HashMap实现思想不太了解的,可以先看一下上面的链接。

    成员变量

        transient volatile Node<K,V>[] table;
    
        //下一个Node数组,当扩容的时候使用
        private transient volatile Node<K,V>[] nextTable;
    
        //基础计数器值,通过CAS更新
        private transient volatile long baseCount;
    
        //表初始化和扩容的时候用到,-1表示正在初始化,-(N+1)表示N个活跃线程正在进行扩容
        private transient volatile int sizeCtl;
    
        //扩容的时候拆分nextTable的索引+1
        private transient volatile int transferIndex;
    
    

    构造函数

        public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel) {
            //不符合条件的参数,直接抛出异常                     
            if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
                throw new IllegalArgumentException();
            //如果初始容量小于并发级别,赋值    
            if (initialCapacity < concurrencyLevel)   // Use at least as many bins
                initialCapacity = concurrencyLevel;   // as estimated threads
            long size = (long)(1.0 + (long)initialCapacity / loadFactor);
            int cap = (size >= (long)MAXIMUM_CAPACITY) ?
                MAXIMUM_CAPACITY : tableSizeFor((int)size);
            this.sizeCtl = cap;
        }
    

    concurrencyLevel表示并发级别,默认为1。

    Node

    代码就不贴出来了,太占篇幅,和HashMap是有区别的,Node不支持setValue的,支持find(),也就是map.get()。

    TreeNode

    TreeNode表示红黑树的结构,是其数据结构实现,保存在数组的红黑树并不是TreeNode,而是下面的TreeBin

    TreeBin

    TreeNode实现红黑树的数据结构,TreeBin持有其引用,对红黑树进行操作的时候,通过lockRoot()和unlockRoot()对root进行加锁和解锁。真正在Node数组上的红黑树就是TreeBin。

    ForwardingNode

    ForwardingNode表示是扩容的时候的node的head,是个空节点,只是告诉后面的线程当前节点数据已经迁移完成。

    putVal 添加

        final V putVal(K key, V value, boolean onlyIfAbsent) {
            if (key == null || value == null) throw new NullPointerException();
            //根据key的hashCode()获取hash值
            int hash = spread(key.hashCode());
            int binCount = 0; //为当前node的元素数,判断扩容或者树形化
            //无限循环,等待条件break
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                //如果当前数组为空,直接进行初始化,然后继续循环
                if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
                //如果hash得到当前数组对应位置的值为null,直接cas生成一个node,然后break
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    if (casTabAt(tab, i, null,
                                 new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
                //如果当前正在进行扩容
                else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f);
                else {  //如果当前位置有值
                    V oldVal = null;
                    //synchronized锁住当前数组node
                    synchronized (f) {
                        //判断当前位置的值是否发生变化,因为在此期间可能被并发修改
                        if (tabAt(tab, i) == f) {
                            //fh >= 0表示当前非红黑树节点
                            if (fh >= 0) {
                                binCount = 1;
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    //如果key为一般节点,将节点值暂存oldVal,然后将Value赋值,break
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    //当前为链表,next等于null,直接生成node,挂到尾部,break
                                    if ((e = e.next) == null) {
                                        pred.next = new Node<K,V>(hash, key,
                                                                  value, null);
                                        break;
                                    }
                                }
                            }
                            //如果是红黑树
                            else if (f instanceof TreeBin) {
                                Node<K,V> p;
                                binCount = 2;
                                //如果红黑树相应节点不为null,然后将Value赋值,将节点值暂存oldVal
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                    //binCount != 0表示上面循环赋值成功
                    if (binCount != 0) {
                        //表示可能需要进行树形化,进行treeifyBin逻辑,染回oldVa
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            addCount(1L, binCount);
            return null;
        }
    

    整体代码逻辑和HashMap很相似,只是多了一些保证并发安全的逻辑,下面对有些点解释一下。

    1. ConcurrentHashMap的key或者value都不能为空,否则抛出异常。
    2. tabAt()是通过volatile从主存中取值,保证并发环境下获取value都是新的。
    3. casTabAt()通过CAS新增节点到数组,同时外层自旋保证肯定成功,当然可能被其他线程put之后,就不会走到这里了。
    4. fh >= 0为啥表示非红黑树节点,因为红黑树的root对应hash值是固定的,static final int TREEBIN = -2;
    5. 扩容在treeifyBin()内部的,这点也是和HashMap的区别,而且逻辑判断相对更为复杂,后者直接执行resize(),不需要考虑并发。

    总结一下putVal()整体思路:

    1. onlyIfAbsent默认为false,表示是否替换已经存在的元素。
    2. 判断key或者value是否为空,否则抛出异常。
    3. 判断当前数组是否为空,如果是通过initTable()进行初始化。
    4. 通过key的hash值得到数组所在下标位置,判断当前位置是否为空,如果是,直接通过CAS添加。
    5. 判断当前Node的hash值是否为MOVED,如果是,说明有其他线程正在扩容,调用helpTransfer()帮助扩容,通过多线程同时转移节点,减少扩容带来的性能消耗。
    6. 如果hash值不是MOVED,synchronized直接锁住当前节点。hash值 >= 0,说明不是红黑树,遍历当前链表节点,如果找到key,直接替换,否则添加到尾部。
    7. 如果是红黑树,TreeBin就是红黑树的head节点,通过putTreeVal()添加到红黑树。
    8. 添加完成,判断是否达到TREEIFY_THRESHOLD,如果是,通过treeifyBin();判断进行判断扩容或者树形化。

    initTable 初始化

        private final Node<K,V>[] initTable() {
            Node<K,V>[] tab; int sc;
            while ((tab = table) == null || tab.length == 0) {
                //sizeCtl < 0表示当前有其他线程正在进行初始化或者扩容,直接执行yield()让出CPU使用权
                if ((sc = sizeCtl) < 0)
                    Thread.yield(); // lost initialization race; just spin
                //CAS对SIZECTL进行赋值
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        //双重检测
                        if ((tab = table) == null || tab.length == 0) {
                            //sc > 0表示通过构造函数初始化的时候,已经设置过初始容量的,否则生成一个大小为16的数组
                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = tab = nt;
                            //sc = 0.75n
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                    break;
                }
            }
            return tab;
        }
    

    通过initTable()进行初始化,而HashMap通过resize()进行初始化或者扩容。

    1. 如果sizeCtl < 0,表示当前有其他线程正在进行初始化或者扩容,直接执行yield()让出CPU使用权。
    2. 如果sizeCtl > 0,表示设置初始容量为sizeCtl。
    3. 否则设置初始容量为16。
    4. sizeCtl为数组长度的的3/4

    treeifyBin

        private final void treeifyBin(Node<K,V>[] tab, int index) {
            Node<K,V> b; int n, sc;
            if (tab != null) {
            //如果没有达到树形化的另一个条件,也就是数组长度小于64,直接进行扩容
                if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                    tryPresize(n << 1);
                else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                    synchronized (b) {
                        if (tabAt(tab, index) == b) {
                            TreeNode<K,V> hd = null, tl = null;
                            for (Node<K,V> e = b; e != null; e = e.next) {
                                TreeNode<K,V> p =
                                    new TreeNode<K,V>(e.hash, e.key, e.val,
                                                      null, null);
                                if ((p.prev = tl) == null)
                                    hd = p;
                                else
                                    tl.next = p;
                                tl = p;
                            }
                            setTabAt(tab, index, new TreeBin<K,V>(hd));
                        }
                    }
                }
            }
        }
    
    1. 这个方法总体逻辑就是判断是否达到树形化的第二个条件,如果没有,进行扩容,数组长度为2倍。
    2. 如果是,生成一个TreeNode,然后包装到TreeBin中,set一下。

    tryPresize()

        private final void tryPresize(int size) {
            //判断size是否>=MAXIMUM_CAPACITY的一半,如果是c=MAXIMUM_CAPACITY,否则
            int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
                tableSizeFor(size + (size >>> 1) + 1);
            int sc;
            while ((sc = sizeCtl) >= 0) {
                Node<K,V>[] tab = table; int n;
                if (tab == null || (n = tab.length) == 0) {
                    n = (sc > c) ? sc : c;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                        try {
                            if (table == tab) {
                                @SuppressWarnings("unchecked")
                                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                                table = nt;
                                sc = n - (n >>> 2);
                            }
                        } finally {
                            sizeCtl = sc;
                        }
                    }
                }
                else if (c <= sc || n >= MAXIMUM_CAPACITY)
                    break;
                else if (tab == table) {
                    int rs = resizeStamp(n);
                    //注意这部分逻辑,在transfer()扩容的时候需要用到
                    if (sc < 0) {
                        Node<K,V>[] nt;
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                            break;
                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                            transfer(tab, nt);
                    }
                    else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                                 (rs << RESIZE_STAMP_SHIFT) + 2))
                        transfer(tab, null);
                }
            }
        }
    

    transfer()

    transfer()是从helpTransfer内部调用的,helpTransfer的注释是帮助扩容,如果有其他线程正在进行扩容的话,扩容也是ConcurrentHashMap最难的部分,lz尽量说的清晰一点。

        private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
            int n = tab.length, stride;
            //n >>> 3相当于n/8,然后除以CPU核心数,判断是否小于16,这样为了让每个CPU转移的数组长度一样多,一般来说,默认一个线程处理16的长度
            //所以一般情况下,我们代码中也只是需要1-2个线程完成数据迁移。
            if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
                stride = MIN_TRANSFER_STRIDE; // subdivide range
            //nextTab未初始化,进行初始化
            if (nextTab == null) {            // initiating
                try {
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                    nextTab = nt;
                } catch (Throwable ex) {      // try to cope with OOME
                    sizeCtl = Integer.MAX_VALUE;
                    return;
                }
                //更新成员变量
                nextTable = nextTab;
                //转移时的下标
                transferIndex = n;
            }
            int nextn = nextTab.length;
            //ForwardingNode表示一个正在迁移的node,当原数组中下标为i的节点完成迁移,就会设置一个fwd,表示当前位置的数据被其他线程处理了,然后就可以跳过了
            ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
            //表示当前下标对应节点是否全部迁移,如果true,--i
            boolean advance = true;
            //判断是否已经完成扩容。
            boolean finishing = false; // to ensure sweep before committing nextTab
            //i表示处理的,从nextIndex逐渐-1,为了方便理解,下面举个栗子,length从16扩容到32的场景下,第一个线程,bucket负责的范围就是[0,16]
            for (int i = 0, bound = 0;;) {
                //
                Node<K,V> f; int fh;
                while (advance) {
                    //nextIndex表示当前bucket最右边边界,nextIndex=16,nextBound=0
                    int nextIndex, nextBound;
                    //表示当前bucket对应数组所有下标的节点都已经转移
                    if (--i >= bound || finishing)
                        advance = false;
                    //表示所有的bucket都分配完毕
                    else if ((nextIndex = transferIndex) <= 0) {
                        i = -1;
                        advance = false;
                    }
                    //CAS对TRANSFERINDEX赋值,首次执行代码,这里bound为0,i为15
                    else if (U.compareAndSwapInt
                             (this, TRANSFERINDEX, nextIndex,
                              nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) {
                        bound = nextBound;
                        i = nextIndex - 1;
                        advance = false;
                    }
                }
                //当前线程已经处理完成自己所负责的bucket
                if (i < 0 || i >= n || i + n >= nextn) {
                    int sc;
                    
                    //如果完成了扩容,能够退出循环
                    if (finishing) {
                        nextTable = null;   //删除成员变量
                        table = nextTab;    //更新table数组
                        sizeCtl = (n << 1) - (n >>> 1); //更新阈值(32*0.75=24)
                        return;
                    }
                    //每个帮助扩容的线程,对SIZECTL进行-1操作
                    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                        //①这里的判断的原理在后面讲
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            return;
                        finishing = advance = true;
                        i = n; // recheck before commit
                    }
                }
                //如果当前位置的node为null,直接将fwd放到当前node,表示数据已经被转移
                else if ((f = tabAt(tab, i)) == null)
                    advance = casTabAt(tab, i, null, fwd);
                //表示当前node数据正在被转移,跳过当前循环次
                else if ((fh = f.hash) == MOVED)
                    advance = true; // already processed
                else {
                    //锁住当前节点
                    synchronized (f) {
                        //双重判断
                        if (tabAt(tab, i) == f) {
                            //ln和hn分别表示lownode,highnode,ln在原位置不动,hn移动到index+length的位置
                            Node<K,V> ln, hn;
                            //如果不是红黑树
                            if (fh >= 0) {
                                //runBit的结果只能是0和n
                                int runBit = fh & n;
                                Node<K,V> lastRun = f;
                                //lastRun表示最后一个hash & n发生变化的节点,后续节点hash & n的结果都是相同的,位置也是相同的,所以直接可以带过去
                                for (Node<K,V> p = f.next; p != null; p = p.next) {
                                    int b = p.hash & n;
                                    if (b != runBit) {
                                        runBit = b;
                                        lastRun = p;
                                    }
                                }
                                //等于0,放到ln,否则放到hn
                                if (runBit == 0) {
                                    ln = lastRun;
                                    hn = null;
                                }
                                else {
                                    hn = lastRun;
                                    ln = null;
                                }
                                //如果当前节点为单个数据,直接拷贝,否则循环链表,进行链表拷贝
                                for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                    int ph = p.hash; K pk = p.key; V pv = p.val;
                                    if ((ph & n) == 0)
                                        ln = new Node<K,V>(ph, pk, pv, ln);
                                    else
                                        hn = new Node<K,V>(ph, pk, pv, hn);
                                }
                                //这里就是上面说的,ln放到index位置,hn放到index+length的位置,原数组的index保存fwd
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                            else if (f instanceof TreeBin) {
                                //***
                            }
                        }
                    }
                }
            }
        }
    

    上面对扩容方法加了部分注释,但是由于方法本身比较难懂,一般来说不强求都要看懂,掌握思想是最重要的。

    1. 首先是,根据CPU核数判断每个线程负责的bucket区间。
    2. 如果nextTab为空,进行初始化,这里是因为transfer()有多处调用,例如通过putAll()进行调用的话,可能就没有初始化,所有有了这部分逻辑。
    3. 对每个线程进行分工迁移数据,正如上面的举例,thread1负责的bucket[0,15],thread2负责的bucket[16,31],都是从每个bucket的最右边开始迁移数据,比如thread从15开始,然后是14,13...0,然后满足判断return。
    4. 代码有个我注释①的位置,(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT这个判断的原因是,第一个进行扩容的线程进入transfer()之前,通过CAS赋值rs << RESIZE_STAMP_SHIFT) + 2,而后面的方法每次都对SIZECTL+1,这部分代码在tryPresize()。然后进入transfer()之后,每个方法分到bucket,迁移数据,都会对SIZECTL-1,当最后一个线程迁移数据完成之后,就会满足上面的判断。
    5. 然后就是数据迁移的部分,就是区分链表和红黑树,红黑树的部分跳过了,思想都是一样的,扩容前后对hash冲突的元素如果存储,和HashMap是相同的,这部分内容没啥好讲得,大家应该都了解了,而且注释也说了思想,如果不了解的,可以查看最开始那个HashMap链接,或者百度相关内容。
    6. 需要注意的一点是,和链表hash & n结果相同和不同的节点,会分别组成新的链表,但是前者链表还是原来的顺序,但是后者和之前相比,会出现倒置,这点,网上有大佬说的,但是我没看太懂,有大神可以底下留言,解释一下。

    到这里,我们了解了新增、初始化、扩容的源码,已经把ConcurrentHashMap最难的部分解决了,后面查询相对很简单了。

    get获取

        public V get(Object key) {
            Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
            int h = spread(key.hashCode());
            //数组不为空,下标对应node不为空
            if ((tab = table) != null && (n = tab.length) > 0 &&
                (e = tabAt(tab, (n - 1) & h)) != null) {
                //判断hash值,然后判断key,如果满足,返回val
                if ((eh = e.hash) == h) {
                    if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                        return e.val;
                }
                
                else if (eh < 0)
                    return (p = e.find(h, key)) != null ? p.val : null;
                //如果非单个节点,遍历判断返回
                while ((e = e.next) != null) {
                    if (e.hash == h &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek))))
                        return e.val;
                }
            }
            return null;
        }
    

    从上面看到get()没有任何同步措施,所以ConcurrentHashMap是支持并发读的。

  • 相关阅读:
    PYTHON lambda表达式
    PYTHON 写函数,检查传入字典的每一个value的长度,如果大于2,那么仅保留前两个长度的内容,并将新内容返回给调用者
    PYTHON 写函数,检查获取传入列表或元组对象的所有奇数位索引对应的元素,并将其作为新列表返回给调用者
    PYTHON 写函数,检查传入列表的长度,如果大于2,那么仅保留前两个长度的内容,并将新内容返回给调用者
    PYTHON 写函数,检查用户传入的对象(字符串、列表、元组)的每一个元素是否含有空内容。
    PYTHON isinstance语法
    PYTHON 写函数,计算传入字符串中【数字、字母、空格、以及其他的个数】
    杂题之 一行式子求网页页数
    带头节点的单链表的插入操作优化
    找出唯一出现一次的数
  • 原文地址:https://www.cnblogs.com/huigelaile/p/15780388.html
Copyright © 2011-2022 走看看