zoukankan      html  css  js  c++  java
  • jdk8 ConcurrentHashMap源码分析

    前面分析了jdk8版本的HashMap,虽然在新版本的jdk中解决了HashMap之前存在的死锁问题,但是该版本的HashMap还是只能用于单线程情况下。多线程环境下还是要使用ConcurrentHashMap来解决并发问题。

    ConcurrentHashMap的继承关系如下所示:

    ConcurrentHashMap的示意图如下所示:

    成员变量

        //最大容量
        private static final int MAXIMUM_CAPACITY = 1 << 30;
        //默认容量
        private static final int DEFAULT_CAPACITY = 16;
        // 负载因子
        private static final float LOAD_FACTOR = 0.75f;
    
        //最大并发度 最多允许多少个线程进行访问
        private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
        //树化阈值 
        static final int TREEIFY_THRESHOLD = 8;
        //最小树化容量
        static final int MIN_TREEIFY_CAPACITY = 64;
        //反树化阈值
        static final int UNTREEIFY_THRESHOLD = 6;
    
        //实际存放数组的node数组 使用volatile修饰 多线程下保持可见性
        transient volatile Node<K,V>[] table;
        //扩容时用于存放数组的数组  平时为null
        private transient volatile Node<K,V>[] nextTable;
          //通过CAS更新,记录容器的容量大小
        private transient volatile long baseCount;
        //控制标志符 下面会详细讲到
        private transient volatile int sizeCtl;
        //下次transfer方法的起始下标index加上1之后的值
        private transient volatile int transferIndex;
        //CAS自旋锁标志位
        private transient volatile int cellsBusy;
        //counter cell表,长度总为2的幂次
        private transient volatile CounterCell[] counterCells;
    

    下面重点说一下sizeCtl这个属性,这是一个在多线程间共享的竞态资源,用于维护各种状态,保存各类信息。

    • sizeCtl>0:
      • 当前数组尚未初始化时,表示初始容量;
      • 若是已经初始化后,则表示扩容后的阈值,此时sizeCtl=table.length*0.75f
    • sizeCtl=-1:表示当前数组正在初始化或者是扩容阶段;
    • sizeCtl<-1:承担扩容时,标识符(高16位)和参与线程数目(低16位)的存储。需要扩容时,使用CAS设置为sizeCtl+1,完成扩容时,使用CAS设置为sizeCtl-1

    内部类

    Node

     static class Node<K,V> implements Map.Entry<K,V> {
            final int hash;
            final K key;
            volatile V val;
            volatile Node<K,V> next;
    
            Node(int hash, K key, V val, Node<K,V> next) {
                this.hash = hash;
                this.key = key;
                this.val = val;
                this.next = next;
            }
    
            public final K getKey()       { return key; }
            public final V getValue()     { return val; }
            public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
            public final String toString(){ return key + "=" + val; }
            public final V setValue(V value) {
                throw new UnsupportedOperationException();
            }
    
            public final boolean equals(Object o) {
                Object k, v, u; Map.Entry<?,?> e;
                return ((o instanceof Map.Entry) &&
                        (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                        (v = e.getValue()) != null &&
                        (k == key || k.equals(key)) &&
                        (v == (u = val) || v.equals(u)));
            }
    
            /**
             * Virtualized support for map.get(); overridden in subclasses.
             */
            Node<K,V> find(int h, Object k) {
                Node<K,V> e = this;
                if (k != null) {
                    do {
                        K ek;
                        if (e.hash == h &&
                            ((ek = e.key) == k || (ek != null && k.equals(ek))))
                            return e;
                    } while ((e = e.next) != null);
                }
                return null;
            }
        }
    

    Node这个内部类是真正用于存储keyvalue的数据载体,其中keyvalue都是使用volatile进行修饰的,保证了多线程情况下的可见性。

    ForwardingNode

     static final class ForwardingNode<K,V> extends Node<K,V> {
            final Node<K,V>[] nextTable;
            ForwardingNode(Node<K,V>[] tab) {
                super(MOVED, null, null, null);
                this.nextTable = tab;
            }
    
            Node<K,V> find(int h, Object k) {
                // loop to avoid arbitrarily deep recursion on forwarding nodes
                outer: for (Node<K,V>[] tab = nextTable;;) {
                    Node<K,V> e; int n;
                    if (k == null || tab == null || (n = tab.length) == 0 ||
                        (e = tabAt(tab, (n - 1) & h)) == null)
                        return null;
                    for (;;) {
                        int eh; K ek;
                        if ((eh = e.hash) == h &&
                            ((ek = e.key) == k || (ek != null && k.equals(ek))))
                            return e;
                        if (eh < 0) {
                            if (e instanceof ForwardingNode) {
                                tab = ((ForwardingNode<K,V>)e).nextTable;
                                continue outer;
                            }
                            else
                                return e.find(h, k);
                        }
                        if ((e = e.next) == null)
                            return null;
                    }
                }
            }
        }
    

    ForwardingNode继承自Node,这个类只会在扩容时用到。当数据进行扩容时,会将数组索引位置处的Node节点转换为ForwardingNode节点,这样在putget以及remove操作时,可以知道此时数组正在扩容,相应的操作就会进行等待状态,直到扩容完成后继续执行接下来的操作。

    构造方法

    //无参构造
    public ConcurrentHashMap() {
        }
    
    //带有初始容量的构造方法
     public ConcurrentHashMap(int initialCapacity) {
            if (initialCapacity < 0)
                throw new IllegalArgumentException();
            int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                       MAXIMUM_CAPACITY :
                       tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
            this.sizeCtl = cap;
        }
    
    //带有初始容量 负载因子 并发度等的构造方法
     public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel) {
            if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
                throw new IllegalArgumentException();
            if (initialCapacity < concurrencyLevel)   // Use at least as many bins
                initialCapacity = concurrencyLevel;   // as estimated threads
            long size = (long)(1.0 + (long)initialCapacity / loadFactor);
            int cap = (size >= (long)MAXIMUM_CAPACITY) ?
                MAXIMUM_CAPACITY : tableSizeFor((int)size);
            this.sizeCtl = cap;
        }
    
      //使用集合作为入参的构造方法
      public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
            this.sizeCtl = DEFAULT_CAPACITY;
            putAll(m);
        }
    

    tableSizeFor

     private static final int tableSizeFor(int c) {
            int n = c - 1;
            n |= n >>> 1;
            n |= n >>> 2;
            n |= n >>> 4;
            n |= n >>> 8;
            n |= n >>> 16;
            return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
        }
    

    HashMap中的tableSizeFor方法一样,返回第一个大于等于入参c且是2的倍数的数。

    put方法

    public V put(K key, V value) {
            return putVal(key, value, false);
        }
    
    final V putVal(K key, V value, boolean onlyIfAbsent) {
            //这里可以看出 ConcurrentHashMap中不允许 key 或者 value为null的情况
            if (key == null || value == null) throw new NullPointerException();
            //类似于 HashMap中的hash方法,计算hash
            int hash = spread(key.hashCode());
            int binCount = 0;
            //死循环自旋
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                //如果当前数组还未初始化
                if (tab == null || (n = tab.length) == 0)
                    //初始化数组
                    tab = initTable();
                //当前索引位置没有值,直接创建
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    //使用cas在当前索引位置处创建新的Node节点,当i位置为空时,cas成功,退出自旋,否则继续
                    //自旋
                    if (casTabAt(tab, i, null,
                                 new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
                //static final int MOVED     = -1;表示正在迁移
                //如果当前节点是转移节点,表示该节点正在经历扩容,就会陷入等待,直到扩容完成
                else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f);
                //当前索引位置处已经存在一个旧节点了
                else {
                    V oldVal = null;
                    //使用synchronized加锁
                    synchronized (f) {
                        //这里再次判断 i 索引位置的数据没有被修改
                        //binCount 被赋值的话,说明走到了修改表的过程里面
                        if (tabAt(tab, i) == f) {
                           
                            if (fh >= 0) {
                                //修改binCount
                                binCount = 1;
                                //开始遍历链表
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    //根据hash和key进行判断,命中了,结束循环
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    //把新增的元素赋值到链表最后,退出自旋
                                    Node<K,V> pred = e;
                                    if ((e = e.next) == null) {
                                        pred.next = new Node<K,V>(hash, key,
                                                                  value, null);
                                        break;
                                    }
                                }
                            }
                            //如果是红黑树的话
                            //红黑树,这里没有使用 TreeNode,使用的是 TreeBin,TreeNode 只是红黑树的一
                            //个节点
                            //TreeBin 持有红黑树的引用,并且会对其加锁,保证其操作的线程安全
                            else if (f instanceof TreeBin) {
                                Node<K,V> p;
                                binCount = 2;
                                //满足if的话,把老的值给oldVal
                                //在putTreeVal方法里面,在给红黑树重新着色旋转的时候
                                //会锁住红黑树的根节点
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                     //binCount不为空,并且 oldVal 有值的情况,说明已经新增成功了
                    if (binCount != 0) {
                        //判断是否需要将链表转为红黑树
                        //static final int TREEIFY_THRESHOLD = 8;
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        //这一步几乎走不到。槽点已经上锁,只有在红黑树或者链表新增失败的时候
                        //才会走到这里,这两者新增都是自旋的,几乎不会失败
                        break;
                    }
                }
            }
           //check 容器是否需要扩容,如果需要去扩容,调用 transfer 方法去扩容
           //如果已经在扩容中了,check有无完成
            addCount(1L, binCount);
            return null;
        }
    
    

    put方法的流程梳理如下:

    • key或者value是否为null,有一个为null,直接抛出异常;
    • 计算hash值;
    • 开始自旋:
      • 当前数组是否初始化了,没有则调用initTable进行初始化;
      • 当前索引位置是否为null,是的话,使用cas将节点设置到该索引位置处;
      • 否的话,判断当前节点是否处于转移阶段,若是,则等待扩容;
      • 前面两个都不满足的话,说明当前索引位置已经存在节点,且不是扩容阶段,则使用synchronized锁住当前索引位置节点:
        • 判断当前索引位置的节点是否已经被修改了;
        • 如果当前节点是一个链表的话,则开始遍历链表进行查找,如果key能匹配到,则直接替换value,否则在链表尾部新增节点,退出自旋;
        • 如果是红黑树结构,则调用红黑树的新增方法来设置keyvalue,退出自旋;
    • 新增完成后,判断是否需要将链表进行树化;
    • 更新
    • 返回;

    整个流程的示意图如下所示:

    put方法如何保证线程安全

    put方法中做了四处优化,保证线程安全:

    • 通过自旋死循环保证一定可以新增成功。在新增之前,通过 for (Node<K,V>[] tab = table;;) 这样的死循环来保证新增一定可以成功,一旦新增成功,就可以退出当前死循环,新增失败的话,会重复新增的步骤,直到新增成功为止;
    • 当索引位置处为空时,通过CAS来进行新增节点。这里的逻辑非常严谨,没有判断在槽点为空的情况下直接赋值,因为在判断槽点为空和赋值的瞬间,很有可能槽点已经被其他线程赋值了,所以这里采用了CAS算法,能够保证槽点为空的情况下赋值成功;如果槽点恰好已经被其他线程进行了赋值,当前的CAS操作会失败,会继续自旋,然后再走槽有值的put流程,这里就是CAS+自旋的结合;
    • 当前槽点有值时,锁住当前槽点put时,如果当前槽点有值,则表明出现了hash冲突,此时槽点上有可能是链表或者红黑树,我们通过锁住槽点,来保证同一时刻只会有一个线程能对槽点进行修改;
    • 红黑树旋转时,锁住红黑树的根节点,保证同一时刻,当前红黑树只能被一个线程旋转;

    initTable方法

    懒加载思想,当需要使用到的时候才去初始化数组,而不是一开始就初始化好。

    //初始化 table,通过对 sizeCtl 的变量赋值来保证数组只能被初始化一次
    private final Node<K,V>[] initTable() {
            Node<K,V>[] tab; int sc;
        //开始自旋
            while ((tab = table) == null || tab.length == 0) {
                 // 小于 0 代表有线程正在初始化,释放当前 CPU 的调度权,重新发起锁的竞争
                if ((sc = sizeCtl) < 0)
                    Thread.yield();
                 // CAS 赋值保证当前只有一个线程在初始化,-1 代表当前只有一个线程能初始化
                // 保证了数组的初始化的安全性
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        //很有可能执行到这里的时候,table 已经不为空了,这里是双重 check
                        if ((tab = table) == null || tab.length == 0) {
                            //默认容量
                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                            @SuppressWarnings("unchecked")
                            //新建数组
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = tab = nt;
                            //更新sc的值 0.75n
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        //更新 sizeCtl 0.75*table.length
                        sizeCtl = sc;
                    }
                    break;
                }
            }
            return tab;
        }
    

    spread方法

      static final int HASH_BITS = 0x7fffffff;
    
    static final int spread(int h) {
            return (h ^ (h >>> 16)) & HASH_BITS;
        }
    
    

    该方法用于计算hash值,使h的高位和低位都可以参与到运算中,减少hash碰撞,HASH_BITS主要是将负的hash值转为正值。

    tabAt方法

      static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
            return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
        }
    

    通过CAS方法获取数组索引位置上的值。

    casTabAt方法

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                            Node<K,V> c, Node<K,V> v) {
            return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
        }
    

    通过CAS设置数组中指定索引位置处的值。

    get方法

    public V get(Object key) {
            Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
            //计算hash
            int h = spread(key.hashCode());
           //当前数组的索引位置处不为空
            if ((tab = table) != null && (n = tab.length) > 0 &&
                (e = tabAt(tab, (n - 1) & h)) != null) {
                //如果 key 和 hash都一样 表明命中
                if ((eh = e.hash) == h) {
                    if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                        return e.val;
                }
                // 该位置处是红黑树的话 调用红黑树的查找方法
                else if (eh < 0)
                    return (p = e.find(h, key)) != null ? p.val : null;
                //链表的话 就直接遍历链表
                while ((e = e.next) != null) {
                    if (e.hash == h &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek))))
                        return e.val;
                }
            }
            return null;
        }
    

    get方法的逻辑很简单,而且相比于put方法来说,没有自旋和加锁过程,效率很高。

    扩容

    当数组中元素数量超过阈值时,ConcurrentHashMap就会发生扩容,如put方法中最后通过addCount方法进行扩容判断。

    addCount方法

    /**
         * @param x 要增加的元素个数
         * @param check 当小于0时,不检查是否进行扩容,
         * 当小于等于1时,只在非竞争状态下检查是否需要扩容
         */
    private final void addCount(long x, int check) {
            CounterCell[] as; long b, s;
            // 1. counterCells不为空
          	// 2. CAS修改baseCount属性失败时
            if ((as = counterCells) != null ||
                // CAS设置baseCOunt
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
                CounterCell a; long v; int m;
                // 线程争用的状态标记
                boolean uncontended = true;
                  // 1. 计数cell为null,或长度小于1
                // 2. 随机去一个数组位置为为空
                // 3. CAS替换CounterCell的value失败
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended =
                      U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                    // CAS增加CounterCell的value值失败会调用fullAddCount方法
                    fullAddCount(x, uncontended);
                    return;
                }
                if (check <= 1)
                    return;
                //计算此时数组中元素的数量
                s = sumCount();
            }
        // 根据`check >= 0`判断是否需要检查扩容
            if (check >= 0) {
                Node<K,V>[] tab, nt; int n, sc;
                // 1. 如果元素总数大于sizeCtl,表示达到了扩容阈值
                // 2. tab数组不能为空,已经初始化
                // 3. table.length小于最大容,有扩容空间
                while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                       (n = tab.length) < MAXIMUM_CAPACITY) {
                    // 根据数组长度获取一个扩容标志
                    int rs = resizeStamp(n);
                    if (sc < 0) {
                        // 如果sc的低16位不等于rs,表示标识符已经改变.			
                        // 如果nextTable为空,表示扩容已经结束
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                            break;
                          // CAS替换sc值为sc+1,成功则开始扩容
                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                            transfer(tab, nt);
                    }
                    // `sc > 0`表示数组此时并不在扩容阶段,更新sizeCtl并开始扩容
                    else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                                 (rs << RESIZE_STAMP_SHIFT) + 2))
                        // 调用transfer,nextTable待生成
                        transfer(tab, null);
                         //计算此时数组中元素的数量
                    s = sumCount();
                }
            }
        }
    

    helpTransfer方法

     /**
      * 参数:
      * tab -> 扩容的数组,一般为table
      * f -> 线程持有的锁对应的桶的头节点
      * 调用地方:
      * 1. `putVal`检测到头节点Hash为MOVED
      */
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
            Node<K,V>[] nextTab; int sc;
            // 1.当前数组不能为空 
    		// 2.参数f必须为ForwardingNode类型
            // 3.f.nextTab不能为空
            if (tab != null && (f instanceof ForwardingNode) &&
                (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
                // 获取扩容的标识
                int rs = resizeStamp(tab.length);
                 //再次判断扩容正在进行
                while (nextTab == nextTable && table == tab &&
                       (sc = sizeCtl) < 0) {
                    //如果扩容已经完成,返回
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || transferIndex <= 0)
                        break;
                         //cas更新扩容线程数,将自己加进去
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                        transfer(tab, nextTab);
                        break;
                    }
                }
                return nextTab;
            }
            return table;
        }
    

    transfer方法

    ConcurrentHashMap的核心扩容方法,源码如下所示:

     private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
            // 老数组的长度
            int n = tab.length, stride;
            // stride为此次需要迁移的桶的数目
          	// NCPU为当前主机CPU数目
          	// MIN_TRANSFER_STRIDE为每个线程最小处理的组数目
          	// 1. 在多核中stride为当前容量的1/8对CPU数目取整,例如容量为16时,CPU为2时结果是1
          	// 2. 在单核中stride为n就为当前数组容量
     		// !!! stride最小为16,被限定死.
            if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
                stride = MIN_TRANSFER_STRIDE; // subdivide range
           //nextTab为null表示扩容还未初始化
            if (nextTab == null) {            // initiating
                try {
                    @SuppressWarnings("unchecked")
                       // 如果新数组为空,初始化,大小为原数组的两倍,n << 1
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                    nextTab = nt;
                } catch (Throwable ex) {      // try to cope with OOME
                    //超过最大长度限制了,不能在继续扩容了
                    sizeCtl = Integer.MAX_VALUE;
                    return;
                }
                nextTable = nextTab;
                //待移动的数量为n
                transferIndex = n;
            }
            //新数组长度
            int nextn = nextTab.length;
            //创建转移节点 如果原数组上是转移节点,说明该节点正在被扩容
            ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
           //遍历本线程分担的扩容范围的标记,如果为false说明当前分到的范围完成了,继续去获取任务,或者全部结
           //束了
            boolean advance = true;
         //扩容完成的标记,用来在扩容全部完成之后再检查一遍所有节点
            boolean finishing = false;
             // 无限自旋,i 的值会从原数组的最大值开始,慢慢递减到 0
            for (int i = 0, bound = 0;;) {
                Node<K,V> f; int fh;
                //继续进行当前范围的扩容任务
                while (advance) {
                    int nextIndex, nextBound;
                    // 扩容下标小于等于边界,或者已经标记为完成,不再继续循环
                    if (--i >= bound || finishing)
                        advance = false;
                    //(nextIndex = transferIndex) <= 0说明整体扩容已经完成了,将i=-1,用于下面判断结
                    //束
                    else if ((nextIndex = transferIndex) <= 0) {
                        i = -1;
                        advance = false;
                    }
                    //当前范围扩容还在进行,cas更新nextIndex,标志本线程已经领取了此范围的扩容任务,调整
                    //当前的边界
                    else if (U.compareAndSwapInt
                             (this, TRANSFERINDEX, nextIndex,
                              nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) {
                        bound = nextBound;
                         // 每次减少 i 的值
                        i = nextIndex - 1;
                        advance = false;
                    }
                }
                //如果i小于0了说明在上面的循环逻辑中transferIndex<=0,本线程的扩容任务已经完成,并且也没
                //有未分配的扩容任务了
                if (i < 0 || i >= n || i + n >= nextn) {
                    int sc;
                    //标记为完成
                    if (finishing) {
                       // 拷贝结束,直接赋值,因为每次拷贝完一个节点,都在原数组上放转移节点,所以拷贝完成
                        //的节点的数据一定不会再发生变化。
                // 原数组发现是转移节点,是不会操作的,会一直等待转移节点消失之后在进行操作。
                // 也就是说数组节点一旦被标记为转移节点,是不会再发生任何变动的,所以不会有任何线程安全的问
                //题
                // 所以此处直接赋值,没有任何问题。
                        nextTable = null;
                        table = nextTab;
                        //扩容阀值变为当前的1.5倍
                        sizeCtl = (n << 1) - (n >>> 1);
                        return;
                    }
                     //如果标记为未完成,说明还有线程没有完成自己的分配任务,尝试将sizeCtl-1,标记当前线程
                    //已经完成。
                    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                          //下面的判断表达式与addCount中的逻辑一致,如果为true说明所有线程都已经完成了任
                        //务,如果不等于说明还有未完成扩容的线程,本线程的扩容逻辑返回,等待其他线程完成。
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            return;
                        //上面表达式成立,所有线程都已经完成了任务,将finishing标记改为true,重新遍历一
                        //遍所有节点,确保所有节点都已经完成了扩容
                        finishing = advance = true;
                        //这里将i赋值为n,上面的while逻辑每次循环都会执行--i,所以i的值就是从n到0,然后执
                        //行下面的节点检查的逻辑
                        i = n;
                    }
                }
              //本线程当前的扩容任务未完成
               // 1.当前节点为null,cas更新节点的值为fwd转移节点,转移节点的hash值为MOVED
                else if ((f = tabAt(tab, i)) == null)
                    advance = casTabAt(tab, i, null, fwd);
                //2.判断是否已经在转移了(用于全部完成后的节点再次检查)
                else if ((fh = f.hash) == MOVED)
                    advance = true; // already processed
                else {
                     //3.节点还没有转移到nextTab,对其加锁执行转移操作
                    //转移节点有一个重新计算下标的操作,由于nextTable的长度为tab的两倍,所以转移过去之后
                    //通过hash值定位的下标有可能会改变。
                    //低位仍保持与旧数组一样的index,高位是原先的index+oldTab.length
                    synchronized (f) {
                         //检查加锁完成后,当前位置的值是否变化了,如果变化了说明被其他线程修改了,回到上面
                        //的逻辑继续循环重试
                        if (tabAt(tab, i) == f) {
                            Node<K,V> ln, hn;
                            if (fh >= 0) {
                                int runBit = fh & n;
                                Node<K,V> lastRun = f;
                                //遍历当前节点的链表,记录最后一个下标变化的节点
                                for (Node<K,V> p = f.next; p != null; p = p.next) {
                                    int b = p.hash & n;
                                    if (b != runBit) {
                                        runBit = b;
                                        lastRun = p;
                                    }
                                }
                                if (runBit == 0) {
                                    ln = lastRun;
                                    hn = null;
                                }
                                else {
                                    hn = lastRun;
                                    ln = null;
                                }
                                 //反向遍历链表,这里的判断条件p != lastRun
                                //主要是为了尽量避免创建不必要的新Node对象,链表中有可以直接使用的链直接
                                //整体迁移至新表中,也就是上面首先正向遍历计算lastRun的目的。
                                //有上面的分析我们知道这里不是整个链表反向的,有可能会复用原来的链
                                for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                    int ph = p.hash; K pk = p.key; V pv = p.val;
                                    if ((ph & n) == 0)
                                        //下标没有变化,节点放在低位
                                        ln = new Node<K,V>(ph, pk, pv, ln);
                                    else
                                        //下标变化了,节点放在高位
                                        hn = new Node<K,V>(ph, pk, pv, hn);
                                }
                                 //cas更新低位节点链表
                                setTabAt(nextTab, i, ln);
                                 //cas更新高位节点链表
                                setTabAt(nextTab, i + n, hn);
                                 // 在老数组位置上放上 ForwardingNode 节点
                               // put 时,发现是 ForwardingNode 节点,就不会再动这个节点的数据了
                                setTabAt(tab, i, fwd);
                                  //继续当前扩容范围的下一个节点
                                advance = true;
                            }
                             //红黑树节点的迁移
                            else if (f instanceof TreeBin) {
                                TreeBin<K,V> t = (TreeBin<K,V>)f;
                                TreeNode<K,V> lo = null, loTail = null;
                                TreeNode<K,V> hi = null, hiTail = null;
                                int lc = 0, hc = 0;
                                for (Node<K,V> e = t.first; e != null; e = e.next) {
                                    int h = e.hash;
                                    TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                    //处理低位
                                    if ((h & n) == 0) {
                                        if ((p.prev = loTail) == null)
                                            lo = p;
                                        else
                                            loTail.next = p;
                                        loTail = p;
                                        ++lc;
                                    }
                                        //处理高位
                                    else {
                                        if ((p.prev = hiTail) == null)
                                            hi = p;
                                        else
                                            hiTail.next = p;
                                        hiTail = p;
                                        ++hc;
                                    }
                                }
                                //检查迁移过后的数量存储格式是否需要变为链表
                                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                //设置树的根节点
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                                //更新低位
                                setTabAt(nextTab, i, ln);
                                 //更新高位
                                setTabAt(nextTab, i + n, hn);
                             // 在老数组位置上放上 ForwardingNode 节点
                                setTabAt(tab, i, fwd);
                                //继续当前扩容范围的下一个节点
                                advance = true;
                            }
                        }
                    }
                }
            }
        }
    

    整个扩容过程梳理如下:

    • 通过CPU和当前数组的数量计算每个CPU要帮忙处理多少个桶,并且每个线程处理都是平均的,默认每个线程处理16个桶。因此,如果此时数组长度为16的话,只需要一个线程进行扩容即可;
    • 初始化临时数组nextTable,长度为当前数组长度的2倍;
    • 开始进行自旋转移,这里根据一个布尔值finishing来判断扩容是否完成;
      • 进入一个while循环中,分配数组中一个桶区间给线程,默认是16。从大到小开始进行分配,拿到分配值后,i进行递减i表示数组下标。这里还有一个bound参数,表示该线程此次可以进行处理的区间的最小下标,超过这个下标,需要重新领取区间或者是结束扩容advance参数表示是否继续递减对下一个桶进行转移,为true则继续向后推移,否则说明当前桶还没有处理好,不能继续推进;
      • 跳出while循环,进入if判断中,判断此时扩容是否结束,如果结束则清空临时变量,将nextTable的引用赋值给tab,更新扩容阈值sizeCtl旧数组长度的1.5倍,扩容结束;如果尚未完成,且无法领取新的区间,将 sizeCtl 减一,表示扩容的线程少一个了。如果减完这个数以后,sizeCtl 回归了初始状态,表示没有线程再扩容了,所有的线程扩容结束了,然后检查所有的桶,防止遗漏
      • 如果没有完成任务,且i对应的槽位是空,尝试 CAS 插入forwardingNode,这样put方法在插入时知道这个节点为forwardingNode节点,表示正在扩容,插入过程暂停,直到扩容完成;
      • 如果 i 对应的槽位不为空,且该位置节点为forwardingNode,那么该线程跳过这个槽位,处理下一个槽位;
      • 如果以上都是不是,说明这个槽位有一个实际的值,对当前该位置节点进行加锁处理;
      • 此时都还没有对桶内数据进行转移,只是计算了下标和处理区间,然后一些完成状态判断。同时,如果对应下标内没有数据或已经被占位了,就跳过了;
    • 接上面,对当前节点加锁之后开始进行处理,防止put此时插入值;
    • 如果当前桶上是链表结构,将该链表根据新增位是0还是1分成两个链表,然后进行迁移;
    • 如果当前桶上红黑数结构,那么也拆成 2 份,方式和链表的方式一样,然后,判断拆分过的树的节点数量,如果数量小于等于 6,改造成链表。反之,继续使用红黑树结构;
    • 到这里扩容结束,完成了一个桶从旧表转移到新表的过程;

    扩容时保证线程安全的关键点

    • 进行槽点时,会把原数组的槽点锁住;
    • 拷贝成功之后,会把原数组的槽点设置为转移节点,这样在put的时候发现该节点是转移槽点就会陷入等待,直到扩容成功之后才能继续put
    • 从尾到头开始拷贝拷贝成功之后就把原数组的槽点设置为转移节点
    • 等扩容拷贝结束之后,之前等待put的数据才能继续put

    扩容时,通过在原数组上设置转移节点,put 时碰到转移节点时会等待扩容成功之后才能 put 的策略,来保证了整个扩容过程中肯定是线程安全的,因为数组的槽点一旦被设置成转移节点,在没有扩容完成之前,是无法进行操作的。

    remove方法

        final V replaceNode(Object key, V value, Object cv) {
            //计算hash值
            int hash = spread(key.hashCode());
            //自旋
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                //定位到索引位置处 如果此处为null 直接跳出循环
                if (tab == null || (n = tab.length) == 0 ||
                    (f = tabAt(tab, i = (n - 1) & hash)) == null)
                    break;
                //索引位置处不为空 但是是扩容状态
                else if ((fh = f.hash) == MOVED)
                    //则调用helpTransfer帮助扩容
                    tab = helpTransfer(tab, f);
                else {
                    //索引位置处为链表或者是红黑树
                    V oldVal = null;
                    boolean validated = false;
                    //对当前位置的节点进行加锁
                    synchronized (f) {
                        //再次判断该处位置的节点是否被改变
                        if (tabAt(tab, i) == f) {
                            //如果是链表的话,则遍历链表
                            if (fh >= 0) {
                                validated = true;
                                //遍历链表
                                for (Node<K,V> e = f, pred = null;;) {
                                    K ek;
                                    //定位到待删除节点
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        V ev = e.val;
                                        if (cv == null || cv == ev ||
                                            (ev != null && cv.equals(ev))) {
                                            oldVal = ev;
                                            if (value != null)
                                                e.val = value;
                                            //删除该节点
                                            else if (pred != null)
                                                pred.next = e.next;
                                            else
                                                //否则使用CAS进行设置
                                                setTabAt(tab, i, e.next);
                                        }
                                        break;
                                    }
                                    pred = e;
                                    if ((e = e.next) == null)
                                        break;
                                }
                            }
                            //如果是红黑树的话 则调用红黑树的方法进行删除
                            else if (f instanceof TreeBin) {
                                validated = true;
                                TreeBin<K,V> t = (TreeBin<K,V>)f;
                                TreeNode<K,V> r, p;
                                if ((r = t.root) != null &&
                                    (p = r.findTreeNode(hash, key, null)) != null) {
                                    V pv = p.val;
                                    if (cv == null || cv == pv ||
                                        (pv != null && cv.equals(pv))) {
                                        oldVal = pv;
                                        if (value != null)
                                            p.val = value;
                                        else if (t.removeTreeNode(p))
                                            //使用CAS进行设置
                                            setTabAt(tab, i, untreeify(t.first));
                                    }
                                }
                            }
                        }
                    }
                    if (validated) {
                        if (oldVal != null) {
                            if (value == null)
                                //更新size
                                addCount(-1L, -1);
                            //返回待删除值
                            return oldVal;
                        }
                        break;
                    }
                }
            }
            return null;
        }
    

    删除的逻辑跟get差不多,都是先定位到索引位置处,若是单节点,则直接进行删除;若是链表或者红黑树调用相关方法进行设置。

    size 和 mappingCount

    HashMap获取size时只需要直接返回Node节点的数量即可,但是ConcurrentHashMap在多线程环境下使用时,不能简单计算Node节点的数量了。其size方法源码如下:

     public int size() {
            long n = sumCount();
            return ((n < 0L) ? 0 :
                    (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                    (int)n);
        }
    
    final long sumCount() {
            CounterCell[] as = counterCells; CounterCell a;
            long sum = baseCount;
       // size就是 baseCount + counterCells数组非空元素值之和
            if (as != null) {
                for (int i = 0; i < as.length; ++i) {
                    if ((a = as[i]) != null)
                        sum += a.value;
                }
            }
            return sum;
        }
    
      @sun.misc.Contended static final class CounterCell {
            volatile long value;
            CounterCell(long x) { value = x; }
        }
    
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.FIELD, ElementType.TYPE})
    public @interface Contended {
        String value() default "";
    }
    

    我们会发现元素的总数相当于baseCount + counterCells数组非空元素值之和。这是因为在没有并发的情况下,使用volatile修饰的baseCount即可代表着元素总数。但是在并发情况下,使用CAS修改baseCount失败时,就会新建一个CounterCell类来进行计数,value一般为1。因此在addCount方法中,我们发现本来应加在baseCount上的值,有一部分加在了counterCells数组中,也就是说元素总数应该为baseCountcouterCells数组中非空元素值之和。

    size方法中,返回值最大为Integer.MAX_VALUE,但是ConcurrentHashMapsize有可能大于Integer.MAX_VALUE,因而jdk的开发人员建议使用mappingCount这个方法,其源码如下:

    public long mappingCount() {
            long n = sumCount();
            return (n < 0L) ? 0L : n;
        }
    

    可以看出,mappingCount底层还是调用了sumCount这个方法,但是这个方法返回值为long类型。

    总结

    jdk8中的ConcurrentHashMap的使用了volatile+sychronized+cas机制来保证高并发下数据的安全,这其中使用到了大量的自旋来保证相应操作的成功,所以在多线程竞争激烈的情况下可能发生线程阻塞的情况。

    参考

    https://juejin.im/post/6844904122429210632#heading-10

    https://juejin.im/post/6844903857659576328#heading-5

    https://juejin.im/post/6844903869714006029#heading-12

    https://juejin.im/post/6844903944800436238#heading-6

    https://juejin.im/post/6844903763103186958#heading-8

    https://juejin.im/post/6844904069711003655#heading-5

    https://juejin.im/post/6844903607901356046#heading-3

  • 相关阅读:
    Mysql创建nextval函数
    宝塔配置tomcat的配置
    小程序获取授权信息
    pycharm 2017新建文件添加编码方式等
    Linux下利用expect,不用交互模式,直接登陆远程主机
    linux文件权限解析(摘)
    linux环境下根据文件的某一列进行去重
    oracle查询用户权限及角色(摘)
    插入排序-python实现
    css清除浮动方法
  • 原文地址:https://www.cnblogs.com/reecelin/p/13461194.html
Copyright © 2011-2022 走看看