zoukankan      html  css  js  c++  java
  • LongAdder源码分析

      之前一直没有重视LongAdder这个类,直到开始看ConcurrentHashMap源码后才发现在ConcurrentHashMap中计算map的size使用的正是LongAdder。

    只不过ConcurrentHashMap并没有直接引用LongAdder,而是把LongAdder中的代码都拷贝了过去,至于为啥这么搞就不去研究了。

    LongAdder存储结构和原理

       LongAdder的内部有一个base变量,一个Cell[]数组。
      base变量:非竞态条件下,直接累加到该变量上
      Cell[]数组:竞态条件下,线程通过哈希运算找到它自己的槽位,累加个各个线程自己的槽Cell[i]中 

    @sun.misc.Contended static final class Cell {
            volatile long value;

      Cell只有一个value,多个线程通过cas对value进行累加。

      LongAdder的原理可以简单理解如下,如果竞争不激烈就往base里写,如果激烈则每个线程往Cell数组的某一个元素写。Cell的最大容量不超过cpu数量。这么设计的好处不难理解,高并发下

    如果多个线程都对一个共享变量写入,每次都只能有一个线程CAS成功,将写入动作分散到cpu核数那个Cell后,竞争压力大大减小。

      LongAdder只有一个空构造器,所有复杂的逻辑都在它的父类Striped64中。包括base和cell都在

    abstract class Striped64 extends Number {
       transient volatile Cell[] cells;
       transient volatile long base;
       transient volatile int cellsBusy;

      private static final sun.misc.Unsafe UNSAFE;
      private static final long BASE;
      private static final long CELLSBUSY;
      private static final long PROBE;

    static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> sk = Striped64.class;
                BASE = UNSAFE.objectFieldOffset
                    (sk.getDeclaredField("base"));
                CELLSBUSY = UNSAFE.objectFieldOffset
                    (sk.getDeclaredField("cellsBusy"));
                Class<?> tk = Thread.class;
                PROBE = UNSAFE.objectFieldOffset
                    (tk.getDeclaredField("threadLocalRandomProbe"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

      threadLocalRandomProbe是存在于Thread类里的,可以当成是随机数的种子发生器。

    @sun.misc.Contended("tlr")
        int threadLocalRandomProbe;

      还有一个重要的类 Cell ,就是一个value,外加上偏移量和cas方法

    @sun.misc.Contended static final class Cell {
            volatile long value;
            Cell(long x) { value = x; }
            final boolean cas(long cmp, long val) {
                return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
            }
    
            // Unsafe mechanics
            private static final sun.misc.Unsafe UNSAFE;
            private static final long valueOffset;
            static {
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();
                    Class<?> ak = Cell.class;
                    valueOffset = UNSAFE.objectFieldOffset
                        (ak.getDeclaredField("value"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
        }

      还有一个重要的成员变量要介绍  cellsBusy,代码中的注解写的很清楚,当扩容或者初始化Cells的时候,或者初始化某一个cell数组的一个成员的时候,

      这三种情况下cellsBusy = 1,其他情况下为0

    /**
         * Spinlock (locked via CAS) used when resizing and/or creating Cells.
         */
        transient volatile int cellsBusy;

      完事具备,下面分析源码

    LongAdder核心方法分析

      LongAdder的核心方法的入口就是add  

    public void add(long x) {
            Cell[] as; long b, v; int m; Cell a;
            if ((as = cells) != null || !casBase(b = base, b + x)) {
                boolean uncontended = true;
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[getProbe() & m]) == null ||
                    !(uncontended = a.cas(v = a.value, v + x)))
                    longAccumulate(x, null, uncontended);
            }
        }

      (as = cells) != null 如果cells已经不为空了,就是说经过了初始化那么就进入if分支

      !casBase(b = base, b + x)  CAS失败了说明有竞争也是要进到if分支。

      接下来看内部的if逻辑

      as == null || (m = as.length - 1) < 0 如果cells没初始化过,或者初始化了但是没有一个元素,则为true

      a = as[getProbe() & m]) == null    如果该线程对应的槽位是null,则为true

      uncontended = a.cas(v = a.value, v + x)  如果对应的槽位不为null,则尝试用CAS设置,如果CAS成功就不用进入if分支了

      我们总结一下进入if分支的条件,经过最外层的if,如果if没拦截住,说明此时cells还没初始化,而且还发生了竞争

      1 发生了竞争,但是cells还是null,或者是空的

         2 cells不是null,但是该线程对应的槽位是null的

      3 如果该线程对应的槽位也不是null,那么就对这个cell进行cas,如果失败了就进入if分支

      从上面的顺序看,LongAdder是不希望cells为null的,因为这会利用不了多cell带来的热点数据分散的好处。所以最优先的判断cells是否为null

     核心方法  longAccumulate

      先看看 longAccumulate 的脉络,其实就是分成三种情况。甚至来说,第三种情况只会出现一次,很简单,可以忽略了。所以我们重点分析前两种情况。

    final void longAccumulate(long x, LongBinaryOperator fn,
                                  boolean wasUncontended) { // add方法wasUncontended是false表示有竞争
            int h;
            if ((h = getProbe()) == 0) {
                ThreadLocalRandom.current(); // force initialization
                h = getProbe();//重新生成线程的hash值
                wasUncontended = true;
            }
            boolean collide = false;                // True if last slot nonempty
            for (;;) {                  // 注意这是自旋,直到成功为止
                Cell[] as; Cell a; int n; long v;
                //CASE1 cells已经初始化了
                if ((as = cells) != null && (n = as.length) > 0) {
                    
                }
                //CASE2 初始化没有加锁,且cells是null
                else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                    
                }
                //CASE3 如果走不进前两个分支说明cells正在初始化,该线程也别闲着,尝试去cas base值
                else if (casBase(v = base, ((fn == null) ? v + x :
                                            fn.applyAsLong(v, x))))
                    break;                          // Fall back on using base
            }
        }

      先分析CASE2,因为一开始cells肯定是null的,所以先分析初始化流程。

              boolean init = false;
                    try {                           // Initialize table
                        if (cells == as) {
                            Cell[] rs = new Cell[2];//初始容量2
                            rs[h & 1] = new Cell(x);//相当于是%2
                            cells = rs;
                            init = true;
                        }
                    } finally {
                        cellsBusy = 0; 释放锁
                    }
                    if (init)
                        break;

      CASE3 就不分析了

     分析CASE1 

                if ((as = cells) != null && (n = as.length) > 0) { 
                    if ((a = as[(n - 1) & h]) == null) { //如果当前线程得到的槽位是空的,说明这是第一次进来,就需要对其初始化
                        if (cellsBusy == 0) {       // Try to attach new Cell 没人竞争
                            Cell r = new Cell(x);   // Optimistically create
                            if (cellsBusy == 0 && casCellsBusy()) {//再次判断 没人扩容或者初始化cell,并加锁
                                boolean created = false;
                                try {               // Recheck under lock
                                    Cell[] rs; int m, j;
                                    if ((rs = cells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {//再次判断这个槽位是不是null
                                        rs[j] = r;
                                        created = true;
                                    }
                                } finally {
                                    cellsBusy = 0;                  //释放锁
                                }
                                if (created)
                                    break;       // 第一处退出点
                                continue;           // Slot is now non-empty 再次自旋
                            }
                        }
                        collide = false;
                    }
                    else if (!wasUncontended)       // CAS already known to fail 传进来的就是false
                        wasUncontended = true;      // Continue after rehash 
                    else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                                 fn.applyAsLong(v, x)))) //如果当前的槽位不是null,直接cas如果成功直接break。注意这里是该分支第二处退出点,只有这么两个退出点
                        break;
                    else if (n >= NCPU || cells != as)
                        collide = false;            // At max size or stale 我觉得这句没啥意义,因为collide根本没有被用到,这句的意义就是拦截到cell的size比cpu核数大了就不会扩容了,其实就是拦截作用
                    else if (!collide)
                        collide = true;
                    else if (cellsBusy == 0 && casCellsBusy()) {//能够拿到扩容锁,我们分析下什么情况下会走到这个分支
                                                                1 线程拿到的槽位不是null
                                                                2 对某个槽位做cas还失败了
                                                                3 当前的cells的size还比cpu数少
                                                                
                        try {
                            if (cells == as) {      // Expand table unless stale
                                Cell[] rs = new Cell[n << 1];
                                for (int i = 0; i < n; ++i)
                                    rs[i] = as[i];
                                cells = rs;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        collide = false;
                        continue;                   // Retry with expanded table
                    }
                    h = advanceProbe(h); //重算hash 该CASE1D的最外层一个for循环,对于每个分支如果不能return出去就一定会重新计算hash值
                }

      再看一下sum方法,注意看英文注解 The returned value is <em>NOT</em> an atomic snapshot

    /**
         * Returns the current sum.  The returned value is <em>NOT</em> an
         * atomic snapshot; invocation in the absence of concurrent
         * updates returns an accurate result, but concurrent updates that
         * occur while the sum is being calculated might not be
         * incorporated.
         *
         * @return the sum
         */
        public long sum() {
            Cell[] as = cells; Cell a;
            long sum = base;
            if (as != null) {
                for (int i = 0; i < as.length; ++i) {
                    if ((a = as[i]) != null)
                        sum += a.value;
                }
            }
            return sum;
        }

      该方法就是对base和每个cell的直接相加,并没有加同步等措施。所以该值并不是一个十分准确的值。

      

  • 相关阅读:
    文件较验及Google Gson的使用
    SQLite数据库操作
    错误收集
    【记录】Linux安装JDK详细步骤
    【原创】RPM安装软件时解决依赖性问题(自动解决依赖型)
    【原创】rman备份出现ORA-19625
    【原创】rman 全库备份脚本
    【原创】TimeSten安装与配置
    【原创】查询占CPU高的oracle进程
    【参考】查找Oracle最高的几个等待事件以及锁的信息
  • 原文地址:https://www.cnblogs.com/juniorMa/p/13887876.html
Copyright © 2011-2022 走看看