zoukankan      html  css  js  c++  java
  • ConcurrentHashMap

    ConcurrentHashMap

              首先还是成员变量的认识 :

       

              与hashMap一致的.

                private static final int MAXIMUM_CAPACITY = 1 << 30;

        private static final int DEFAULT_CAPACITY = 16;

                private static final float LOAD_FACTOR =0.75f;

               static final int TREEIFY_THRESHOLD = 8;

               static final int UNTREEIFY_THRESHOLD = 6;

               static final int MIN_TREEIFY_CAPACITY = 64;

              

            比hashMap多的

             private static final int MIN_TRANSFER_STRIDE = 16;      一个线程最少处理16个位桶的处理.

             static final int MOVED = -1; // hash for forwarding nodes   // 头结点的hash值为-1, 表示当前table正在初始化
             static final int TREEBIN = -2; // hash for roots of trees         // 头结点的hash值为-2 , 表示当前位桶已经变为树结构
        static final int RESERVED = -3; // hash for transient reservations        //  位桶的头结点为-3 , 表示当前位桶正在转移
        static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

             private transient volatile int sizeCtl;   // 很关键的一个变量,伴随ConcurrentHashMap整个流程.

           

             static final int NCPU = Runtime.getRuntime().availableProcessors();     // CPU的数量

              静态内部类Node

              

               a.  与HashMap内不同的是, 这里 value 与 next 属性 都是 volatile 类型的. (多线程之前可见的)

     a . 这里table 和 nextTable也是 transient volatile 类型的 Node[] .

    1. 同样, ConcurrentHash也是查看它的put方法.

     

     a . 其中 onlyIfAbsent ,当为false时, 遇到相同的key值,value会进行覆盖.

     

     

     

     

     

     a . 还是一行行开始查看代码吧.

     b. 与HashMap不同的是, ConcurrentHashMap的 key 与 Value 都不允许为Null . 否则直接抛出 throw new NullPointerException()

     c .  求位桶的方式与hashMap相差不对. 也是高16位与低16位进行异或.

     d.  for (Node<K,V>[] tab = table;;) {   这样的一个死循环. 因为就条件而言,不存在一个跳出条件.  将成员变量table赋值给 tab .

     e . if (tab == null || (n = tab.length) == 0) , 需要进行扩容. 开始进入 tab = initTable();

     f . 开始查看 initTable() 方法 . 需要注意的是ConcurrentHashMap 是考虑并发的.

      private final Node<K,V>[] initTable() {
            Node<K,V>[] tab; int sc;
            // 判断出这个while循环的条件是 , tab不为空.
            while ((tab = table) == null || tab.length == 0) {
                // sc < 0 , 也就是等于-1 的时候, 代表table初始化的这件事情已经有线程在做了,
                if ((sc = sizeCtl) < 0)
                     //让出CPU时间片
                    Thread.yield(); // lost initialization race; just spin
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                     // 采用的cas 乐观锁. int sc 默认值为 0 , 将其赋值为 -1
                    try {
                        // 再次判断, 当前tab数组的是否还是为null
                        if ((tab = table) == null || tab.length == 0) {
                            // 此时 sc应该是 -1 , 所以 n = DEFAULT_CAPACITY (默认数组长度16)
                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                            @SuppressWarnings("unchecked")
                            // 这里开始创建一个16位的Node数组.
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            // 两层赋值 , 这时候 table就是一个16位的数组.
                            table = tab = nt;
                            // 再来对sc赋值. 16 - (16 % 4) = 12 . 其实这时候sc就是扩容的临界值的
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        // 把sc赋值给 sizeCtl , 这时候 sizeCtl就有了它的第二层含义,当前数组扩容的临界值
                        sizeCtl = sc;
                    }
                    // 初始化结束,跳出for循环的出口
                    break;
                }
            }
            // 将当前初始化的Node数组返回
            return tab;
        }

    a  大概初始化的流程就是如上代码注释, 是采用一个全局变量 , 初始值因为是 int  类型, 默认值为0 , 再一个线程抢到初始化成员变量table的任务后,会采用CAS 乐观锁,将其变为-1 . 控制其他线程就算进入也是让出CPU时间片. 而抢到初始化Node线程会在 new Node[]后,将sizeCtl 修改为 当前数组扩容的临界值.

    b . 初始化结束后,  将 new 的tab返回.因为这是一个死循环, 所以这个时候并退出不了这个for循环,下面的条件不满足,直接开始第二次循环.

    c . 这次循环进来之后. 进入else if 内了. i 又是同样的算法,将其赋值为   (n - 1) & h  , tabAt() 也是通过乐观锁来获取当前内容上的Node(链表的头结点) , if头结点为Null. 说明该位桶上不存在元素,  再次采用cas, 在当前位桶上的元素 由 null 更新为 new Node(); 此时的 break就可以跳出for的死循环了.如果失败,自旋重新替换.直到替换成功. 

    d . 跳出for循环for继续往下看.

     

     

     a . 因为是初始化并且第一次添加元素, 跳出for循环后,直接就可以 addCount()方法了.  delta =1.

     b. 扩容,设计者也考虑到了多线程, 采用了数组来分流.

    用到了 CounterCell 内部类.内部只有一个Volatile 的 Value .

     

     开始查看 addCount代码


    // 对ConcurrentHashMap进行计数的操作, ConcurrentHashMap.size()方法就是通过这里计数实现的
    // x 默认是 1 (每一次put都是放入一个元素) ,  int check 表示链表长度(不包含当前元素的情况下)
    private final void addCount(long x, int check) {
            // CounterCell[] as 用来计数的一个数组,通过内部CounterCell内部的value属性相加, 来求最终size()值
            CounterCell[] as; long b, s;
            // 如果当前 成员变量counterCells != null , 说明之前已经记过数了.
            //  || 采用Cas方法, 将当前baseCount + x 赋值给 baseCount ,如果赋值失败,说明存在并发,这里不再自旋处理了,而是直接放弃Cas乐观锁处理.
            if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
                // 这里是放弃CAS乐观锁处理,继续往下看
                CounterCell a; long v; int m;
                // 初始化一个变量,并将这个变量设置为true
                boolean uncontended = true;
                // 因为上面进入这个if的条件是 不为空 或者 CAS失败
                // 所以这还是进行了一个非空判断
                // 这个括号内的条件比较有意思. 一步步往下走,但是每一步的赋值结果又可以作为下一步的判断结果
                // as == null 如果CounterCell[] == null . 还没有出现过分流用的数组
                // (m = as.length - 1) < 0  ,其实设计者可以写为  (m = as.length) ==0 , 但是 m这个变量,开发者想作为数组的索引使用, 相当于等式左右都 -1 .
                //  (a = as[ThreadLocalRandom.getProbe() & m]) 这里 CounterCell a 也进行了赋值. ThreadLocalRandom.getProbe() & m , 熟悉的代码, 又是获取数组内的一个随机位桶, 如果这个位桶上的CounterCell == null .
                // CounterCell a 这个变量已经在上一步赋值了, 能进行到这一步,说明上一步不为Null , 那么这就开始采用CAS 来对 CounterCell a 中的Value 属性进行递增 ,其实也是 +1 .
                // ThreadLocalRandom.getProbe()    是一个基于线程得到的随机数, 性能比Random高      
               if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended =
                      U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                    fullAddCount(x, uncontended);
                    return;
                }
                // 如果链表的个数 <=1 , 没比较考虑扩容了,直接return出去.这个元素计数结束.
                if (check <= 1)
                    return;
                    
                // 否则这里S还需要进行一下赋值, 就是所有位桶上CounterCell的Value值 + baseCount的值, 就是目前ConcurrentHashMap内所有元素的个数.
                s = sumCount();
            }
            // 接下来是扩容的事情. 先不看
            if (check >= 0) {
                Node<K,V>[] tab, nt; int n, sc;
                while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                       (n = tab.length) < MAXIMUM_CAPACITY) {
                    int rs = resizeStamp(n);
                    if (sc < 0) {
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                            break;
                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                            transfer(tab, nt);
                    }
                    else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                                 (rs << RESIZE_STAMP_SHIFT) + 2))
                        transfer(tab, null);
                    s = sumCount();
                }
            }
        }

     开始查看 fullAddCount 代码

     

    // 当 baseCount 采用 CAS 乐观锁失败, CounterCell的Value值采用乐观锁更新失败. 反正就是当前元素计数SumCount失败了, 就该进行当前方法来计数了
    // X 是添加元素的个数 , 这个其实使用的时候,一般都是1 , boolean wasUncontended 是否存在冲突
    private final void fullAddCount(long x, boolean wasUncontended) {
            // 这里int类型h的定义, 已经接下来if中的赋值,都是想产生一个随机数.
            int h;
            if ((h = ThreadLocalRandom.getProbe()) == 0) {
                ThreadLocalRandom.localInit();      // force initialization
                h = ThreadLocalRandom.getProbe();
                wasUncontended = true;
            }
            boolean collide = false;                // True if last slot nonempty
            
            // 这里又是一个死循环,自旋
            for (;;) {
                // 同样定义了一堆变量,
                CounterCell[] as; CounterCell a; int n; long v;
                // 如果CounterCells 这个成员变量已经被初始化了,不等于0
                if ((as = counterCells) != null && (n = as.length) > 0) {
                    // 如果CounterCells[]数组的随机位桶上元素 ConterCell a == null . 表示虽然初始化了,但是可能随机到另外一个位桶上了 (当前考虑 数组的length =2 的情况)
                    if ((a = as[(n - 1) & h]) == null) {
                        if (cellsBusy == 0) {            // Try to attach new Cell
                            //new一个CounterCell , value也是直接复制为 1
                            CounterCell r = new CounterCell(x); // Optimistic create
                            // 也是根据cellsBusy, 将其从0替换为1 , 如果替换成功
                            if (cellsBusy == 0 &&
                                U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                                // 是否创建完成 默认为false.
                                boolean created = false;
                                try {               // Recheck under lock
                                   // 下面就是将这个 CounterCell 放在刚才判断为null的位桶上了. (m - 1) & h 其实与上面的 (n - 1) & h 一样.
                                    CounterCell[] rs; int m, j;
                                    if ((rs = counterCells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                        rs[j] = r;
                                        created = true;
                                    }
                                } finally {
                                // 再次吧cellsBusy设置为0
                                    cellsBusy = 0;
                                }
                                //如果创建完成,跳出for循环
                                if (created)
                                    break;
                                continue;           // Slot is now non-empty
                            }
                        }
                        collide = false;
                    }
                    else if (!wasUncontended)       // CAS already known to fail
                        wasUncontended = true;      // Continue after rehash
                    else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                        break;
                    else if (counterCells != as || n >= NCPU)
                        collide = false;            // At max size or stale
                    else if (!collide)
                        collide = true;
                    else if (cellsBusy == 0 &&
                             U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        try {
                            if (counterCells == as) {// Expand table unless stale
                                CounterCell[] rs = new CounterCell[n << 1];
                                for (int i = 0; i < n; ++i)
                                    rs[i] = as[i];
                                counterCells = rs;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        collide = false;
                        continue;                   // Retry with expanded table
                    }
                    h = ThreadLocalRandom.advanceProbe(h);
                }
                // 这里开始就是 当前成员变量CounterCells还没有被初始化
                // 这里通过一个 int cellsBusy 成员变量, private transient volatile int cellsBusy;  如果还是0 ,代表还没有被初始化
                // 如果想要开始初始化, 也是进行一下CAS 乐观锁. 将 cellsBusy 由 0 更新为 1
                else if (cellsBusy == 0 && counterCells == as &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                         // cellsBusy已经占位成功, 成功更新为1了.
                         // 创建一个 boolean 类型的 init变量 , 默认初始化为false.
                    boolean init = false;
                    try {                           // Initialize table
                         // 开始初始化table, 这里默认是2位数组
                        if (counterCells == as) {
                            CounterCell[] rs = new CounterCell[2];
                            // rs[h & 1] = rs[h & rs.length-1] , 还是hashMap中获取一个随机位桶的写法, 并赋值为 new CounterCell(1) , 就是将value值直接置为1 .
                            rs[h & 1] = new CounterCell(x);
                            // 将初始化的 rs 赋值给成员变量 counterCells
                            counterCells = rs;
                            // 初始化的标志位设置为true . 初始化成功,这里并不会跳出自旋.
                            init = true;
                        }
                    } finally {
                        // 这里初始化成功,会重新把CellsBusy设置为 0
                        cellsBusy = 0;
                    }
                    // 如果初始化失败,跳出for循环
                    if (init)
                        break;
                }
                // 如果线程竞争激烈, 那么还是采用baseCount方法来cas增加元素个数.增加成功跳出for循环.
                else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                    break;                          // Fall back on using base
            }
        }

          

    人总得做点什么 ,不是么
  • 相关阅读:
    浏览器内置对象及其方法
    Leetcode | Path Sum I && II
    Leetcode | 3Sum
    算法分析之渐近符号
    Leetcode | Two Sum
    Leetcode | Wildcard Matching
    LeetCode | Regular Expression Matching
    Leetcode | Subsets I & II
    Leetcode | Populating Next Right Pointers in Each Node I & II
    爱是恒久忍耐,又有恩慈
  • 原文地址:https://www.cnblogs.com/liweibing/p/12869659.html
Copyright © 2011-2022 走看看