zoukankan      html  css  js  c++  java
  • JDK 8 新增的 LongAdder,得过来看一下

    前言


    在介绍 AtomicInteger 时,已经说明在高并发下大量线程去竞争更新同一个原子变量时,因为只有一个线程能够更新成功,其他的线程在竞争失败后,只能一直循环,不断的进行 CAS 尝试,从而浪费了 CPU 资源。而在 JDK 8 中新增了 LongAdder 用来解决高并发下变量的原子操作。下面同样通过阅读源码来了解 LongAdder 。


    公众号:liuzhihangs,记录工作学习中的技术、开发及源码笔记;时不时分享一些生活中的见闻感悟。欢迎大佬来指导!

    介绍

    一个或多个变量共同维持初值为 0 总和。 当跨线程竞争更新时,变量集可以动态增长以减少竞争。 方法 sum 返回当前变量集的总和。

    当多个线程更新时,这个类是通常优选 AtomicLong ,比如用于收集统计信息,不用于细粒度同步控制的共同总和。 在低更新竞争,这两个类具有相似的特征。 但在高更新竞争时,使用 LongAdder 性能要高于 AtomicLong,同样要消耗更高的空间为代价。

    longadder-uml

    LongAdder 继承了 Striped64,内部维护一个 Cells 数组,相当于多个 Cell 变量, 每个 Cell 里面都有一个初始值为 0 的 long 型变量。

    源码分析

    Cell 类

    Cell 类 是 Striped64 的静态内部类。

    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }
    
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
    
    1. Cell 使用 @sun.misc.Contended 注解。
    2. 内部维护一个被 volatile 修饰的 long 型 value 。
    3. 提供 cas 方法,更新value。

    其中 @sun.misc.Contended 注解作用是为了减少缓存争用。什么是缓存争用,这里只做下简要介绍。

    伪共享
    CPU 存在多级缓存,其中最小存储单元是 Cache Line,每个 Cache Line 能存储 64 个字节的数据。
    在多线程场景下,A B 两个线程数据如果被存储到同一个 Cache Line 上,此时 A B 更新各自的数据,就会发生缓存争用,导致多个线程之间相互牵制,变成了串行程序,降低了并发。
    @sun.misc.Contended 注解,则可以保证该变量独占一个 Cache Line。
    详细可参考:http://openjdk.java.net/jeps/142

    Striped64 核心属性

    abstract class Striped64 extends Number {
    
        /** CPU 的数量,以限制表大小 */
        static final int NCPU = Runtime.getRuntime().availableProcessors();
    
        /**
         * cell 数组,当非空时,大小是 2 的幂。
         */
        transient volatile Cell[] cells;
    
        /**
         * Base 值,在无争用时使用,表初始化竞赛期间的后备。使用 CAS 更新 
         */
        transient volatile long base;
    
        /**
         * 调整大小和创建Cells时自旋锁(通过CAS锁定)使用。
         */
        transient volatile int cellsBusy;
    }
    

    Striped64 类主要提供以下几个属性:

    1. NCPU:CPU 的数量,以限制表大小。
    2. cells:Cell[] cell 数组,当非空时,大小是 2 的幂。
    3. base:long 型,Base 值,在无争用时使用,表初始化竞赛期间的后备。使用 CAS 更新。
    4. cellsBusy:调整大小和创建Cells时自旋锁(通过CAS锁定)使用。

    下面看是进入核心逻辑:

    uFCfpw

    LongAdder#add

    public class LongAdder extends Striped64 implements Serializable {
        
        public void add(long x) {
            Cell[] as; long b, v; int m; Cell a;
            // cells 是 数组,base 是基础值
            if ((as = cells) != null || !casBase(b = base, b + x)) {
                boolean uncontended = true;
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[getProbe() & m]) == null ||
                    !(uncontended = a.cas(v = a.value, v + x)))
                    longAccumulate(x, null, uncontended);
            }
        }
    }
    
    
    abstract class Striped64 extends Number {
        // 使用 CAS 更新 BASE 的值
        final boolean casBase(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
        }
        // 返回当前线程的探测值。 由于包装限制,从ThreadLocalRandom复制
        static final int getProbe() {
            return UNSAFE.getInt(Thread.currentThread(), PROBE);
        }
    }
    
    

    longadder-add

    1. 首先会对 Base 值进行 CAS 更新,当 Base 发生竞争时, 会更新数组内的 Cell 。
    2. 数组未初始化,Cell 未初始化, Cell 更新失败,即 Cell 也发生竞争时,会调用 Striped64 的 longAccumulate 方法。

    Striped64#longAccumulate

    
    abstract class Striped64 extends Number {
        /**
         * x 要增加的值
         * wasUncontended 有没有发生竞争 
         */
        final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
            int h;
            // 当前线程有无初始化线程探测值, 给当前线程生成一个 非 0 探测值
            if ((h = getProbe()) == 0) {
                ThreadLocalRandom.current(); // force initialization
                h = getProbe();
                wasUncontended = true;
            }
            boolean collide = false;                // True if last slot nonempty
            // 循环
            for (;;) {
                Cell[] as; Cell a; int n; long v;
                // 数组不为空切数组长度大于 0
                if ((as = cells) != null && (n = as.length) > 0) {
                    // (n - 1) & h 获取到索引,索引处 cell 是否为 null, cell未初始化
                    if ((a = as[(n - 1) & h]) == null) {
                        // 判断 cellsBusy 是否为 0
                        if (cellsBusy == 0) {       // Try to attach new Cell
                            Cell r = new Cell(x);   // Optimistically create
                            // cellsBusy == 0 且 使用 casCellsBusy 方法将其更新为 1,失败会继续循环
                            if (cellsBusy == 0 && casCellsBusy()) {
                                boolean created = false;
                                try {               // Recheck under lock
                                    Cell[] rs; int m, j;
                                    // 重新检查状态 并创建
                                    if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
                                        rs[j] = r;
                                        created = true;
                                    }
                                } finally {
                                    // 创建完成之后, 改回 cellsBusy 值
                                    cellsBusy = 0;
                                }
                                if (created)
                                    break;
                                // 未创建继续循环
                                continue;           // Slot is now non-empty
                            }
                        }
                        collide = false;
                    }
                    // 传入的 wasUncontended 为 false 即发生碰撞了, 修改为未碰撞, 此处会继续循环,走到下一步,相当于会一直循环这个 cell
                    else if (!wasUncontended)       // CAS already known to fail
                        wasUncontended = true;      // Continue after rehash
                    // cas 更新 cell 的 value, 成功则返回
                    else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
                        break;
                    // 数组到最大长度 即大于等于 CPU 数量, 或者 cells 数组被改变,
                    else if (n >= NCPU || cells != as)
                        collide = false;            // At max size or stale
                    else if (!collide)
                        collide = true;
                    // 乐观锁 进行扩容
                    else if (cellsBusy == 0 && casCellsBusy()) {
                        try {
                            if (cells == as) {      // Expand table unless stale
                                Cell[] rs = new Cell[n << 1];
                                for (int i = 0; i < n; ++i)
                                    rs[i] = as[i];
                                cells = rs;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        collide = false;
                        continue;                   // Retry with expanded table
                    }
                    // 当前探针值不能操作成功,则重新设置一个进行尝试
                    h = advanceProbe(h);
                }
                // 没有加 cellsBusy 乐观锁 且 没有初始化,且获得锁成功(此时 cellsBusy == 1)
                else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                    boolean init = false;
                    try {                           // Initialize table
                        if (cells == as) {
                            Cell[] rs = new Cell[2];
                            rs[h & 1] = new Cell(x);
                            cells = rs;
                            init = true;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    if (init)
                        break;
                }
                // 尝试在base上累加
                else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
                    break;                          // Fall back on using base
            }
        }
    }
    

    longAccumulate 方法一共有三种情况

    1. (as = cells) != null && (n = as.length) > 0 数组不为空且长度大于 0 。
      1. 获取索引处的 cell , cell 为空则进行初始化。
      2. cell 不为空,使用 cas 更新, 成功 break; 跳出循环, 失败则还在循环内,会一直尝试。
      3. collide 指是否发生冲突,冲突后会进行重试。
      4. 冲突后会尝试获得锁并进行扩容,扩容长度为原来的 2 倍,然后继续重试。
      5. 获得锁失败(说明其他线程在扩容)会重新进行计算探针值。
    2. cellsBusy == 0 && cells == as && casCellsBusy() 数组为空,获得乐观锁成功。
      1. 直接初始化数组。
      2. 初始数组长度为 2 。
    3. casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))) 获得乐观锁失败。
      1. 说明有其他线程在初始化数组,直接 CAS 更新 base 。

    LongAdder#sum

    
    public class LongAdder extends Striped64 implements Serializable {
    
        public long sum() {
            Cell[] as = cells; Cell a;
            long sum = base;
            if (as != null) {
                for (int i = 0; i < as.length; ++i) {
                    if ((a = as[i]) != null)
                        sum += a.value;
                }
            }
            return sum;
        }
    }
    
    1. 数组为空,说明没有发生竞争,直接返回 base 。
    2. 数组不为空,说明发生竞争,累加 cell 的 value 和 base 的和进行返回。

    总结

    基本流程

    1. LongAdder 继承了 Striped64,内部维护一个 Cells 数组,相当于多个 Cell 变量, 每个 Cell 里面都有一个初始值为 0 的 long 型变量。
    2. 未发生竞争时(Cells 数组未初始化),是对 base 变量进行原子操作。
    3. 发生竞争时,每个线程对自己的 Cell 变量的 value 进行原子操作。

    如何确定哪个线程操作哪个 cell?

    通过 getProbe() 方法获取该线程的探测值,然后和数组长度 n - 1& 操作 (n - 1) & h 。

    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
    

    Cells 数组初始化及扩容?

    初始化扩容时会判断 cellsBusy, cellsBusy 使用 volatile 修饰,保证线程见可见性,同时使用 CAS 进行更新。 0 表示空闲,1 表示正在初始化或扩容。

    初始化时会创建长度为 2 的 Cell 数组。扩容是创建一个长度是原数组长度 2 倍的新数组,并循环赋值。

    如果线程访问分配的 Cell 元素有冲突后,会使用 advanceProbe() 方法重新获取探测值,再次进行尝试。

    使用场景

    在高并发情况下,需要相对高的性能,同时数据准确性要求不高,可以考虑使用 LongAdder。

    当要保证线程安全,并允许一定的性能损耗时,并对数据准确性要求较高,优先使用 AtomicLong。

  • 相关阅读:
    斜率dp cdq 分治
    POJ2449 (k短路)
    BZOJ1576 (最短路+并查集)
    SWUST0249 (凸包面积)
    道路修建 (网络流)
    HDU3930 (原根)
    ZOJ2006 (后缀自动机)
    Codechef2015 May
    后缀自动机
    Digit (数位DP)
  • 原文地址:https://www.cnblogs.com/liuzhihang/p/longaddr.html
Copyright © 2011-2022 走看看