zoukankan      html  css  js  c++  java
  • 瞄一眼LongAdder(jdk11)

    java版本11.0.1,感觉写得太水了,等心情好的时候再重新编辑一下。

    LongAdder中的核心逻辑主要由java.util.concurrent.atomic.Striped64维护,作为Striped64的继承类LongAdder定义了(LongAccumulator、DoubleAdder、DoubleAccumulator...)一些外围逻辑

        /**
         * Cell(单元)表,不为null时大小为2的幂
         */
        transient volatile Cell[] cells;
    
        /**
         * 基值,主要用在没有竞争访问时使用, 也会用在竞争创建cells失败时的备选方案。CAS更新
         */
        transient volatile long base;
    
        /**
         * cells的自旋锁,在创建/扩容cells时会用到
         */
        transient volatile int cellsBusy;

    sum()遍历cells累加和base,reset()遍历cells和base赋值0,sumThenReset()遍历cells和base用CAS操作累加并赋值0。比较简单就这么概括一下。

    主要方法LongAdder#add

      public void add(long x) {
        Cell[] cs;  long b, v;  int m;  Cell c;
        /**
         * 当一开始没有竞争调用时,CAS操作base值累加.当开始出现竞争时开始走下面的逻辑,不再累加base
         */
        if ((cs = cells) != null || !casBase(b = base, b + x)) {
          /**
           * 设置“无竞争”标识
           */
          boolean uncontended = true;
          /**
           * 当cells未初始化时继续以下判断逻辑,否则调用longAccumulate()
           */
          if (cs == null || (m = cs.length - 1) < 0 ||
              /**
               * 根据访问线程的特征值(probeValue)获取cells中访问线程对应的cell. cell未初始化时调用longAccumulate()
               * 线程特征值和cells.length的&操作确保cs[getProbe()&m]不会越界
               */
              (c = cs[getProbe() & m]) == null ||
              /**
               * 对访问线程对应的cell的值CAS操作,并把执行结果赋值uncontended.若CAS操作失败则调用longAccumulate()
               */
              !(uncontended = c.cas(v = c.value, v + x))) {
    
            longAccumulate(x, null, uncontended);
          }
        }
      }

    核心逻辑藏在了Striped64#longAccumulate,稍微花时间瞄了两眼。

      /**
       * 处理涉及到初始化、扩容、创建及碰撞更新cell的情况.
       *
       * @param x              运算值
       * @param fn             运算操作,null时为加法
       * @param wasUncontended 调用该方法前CAS操作的结果,CAS操作失败则为false
       */
      final void longAccumulate(long x, LongBinaryOperator fn,
                                boolean wasUncontended) {
        /**
         * 线程特征值
         */
        int h;
        /**
         * 当调用线程是第一次调用longAccumulate()时,赋值线程的特征值
         */
        if ((h = getProbe()) == 0) {
          ThreadLocalRandom.current();
          h = getProbe();
          wasUncontended = true;
        }
        /**
         * 表示对cell的获取是否与其他线程碰撞, 用来判断cells是否需要扩容
         */
        boolean collide = false;
        /**
         * 未获取到cellsBusy时则自旋
         */
        done: for (;;) {
          Striped64.Cell[] cs; Striped64.Cell c; int n; long v;
          /**
           * 当cells已存在并且不为空时
           */
          if ((cs = cells) != null && (n = cs.length) > 0) {
            /**
             * 当访问线程对应的cell尚未存在时,新增Cell(x)
             */
            if ((c = cs[(n - 1) & h]) == null) {
              /**
               * 尝试获取自旋锁
               */
              if (cellsBusy == 0) {
                /**
                 * (乐观)创建Cell
                 */
                Striped64.Cell r = new Striped64.Cell(x);
                /**
                 * 尝试获取自旋锁
                 */
                if (cellsBusy == 0 && casCellsBusy()) {
                  try {
                    Striped64.Cell[] rs; int m, j;
                    /**
                     * 在持有cellsBusy锁的情况下再次检查访问线程对应的cell是否已存在
                     */
                    if ((rs = cells) != null &&
                        (m = rs.length) > 0 &&
                        rs[j = (m - 1) & h] == null) {
                      /**
                       * 新增Cell并且跳出自旋
                       */
                      rs[j] = r;
                      break done;
                    }
                  } finally {
                    cellsBusy = 0;
                  }
                  /**
                   * 新增Cell失败,自旋
                   */
                  continue;
                }
              }
              /**
               * 如果尝试获取自旋锁失败,说明已有其他线程占用了该Cell,
               * 之后为减少碰撞会调用advanceProbe()
               */
              collide = false;
            }
            /**
             * 如果先前的cs[getProbe()&m]的CAS累加操作失败, 则wasUncontended赋值true
             */
            else if (!wasUncontended)
              wasUncontended = true;
            /**
             * 对cell执行CAS操作,成功则方法结束,失败则自旋
             */
            else if (c.cas(v = c.value,
                (fn == null) ? v + x : fn.applyAsLong(v, x)))
              break;
            /**
             * 当cells大小大于逻辑cpu数,不扩容
             */
            else if (n >= NCPU || cells != cs)
              collide = false;
            /**
             * 通过collide是否碰撞判断是否执行下面的扩容逻辑
             * collide==false时则自旋
             */
            else if (!collide)
              collide = true;
            /**
             * 执行到这里说明需要扩容
             * 尝试获取cellsBusy锁扩容
             */
            else if (cellsBusy == 0 && casCellsBusy()) {
              try {
                if (cells == cs)        // Expand table unless stale
                  cells = Arrays.copyOf(cs, n << 1);
              } finally {
                cellsBusy = 0;
              }
              collide = false;
              continue;
            }
            /**
             * 这是为了重新计算访问线程的特征值(advanceProbe(h))后自旋,减少碰撞
             */
            h = advanceProbe(h);
          }
          /**
           * 当cells没有正在初始化/扩容(cellsBusy == 0)并且cells未被创建(cells == cs)时,则设置cells的自旋锁cellBusy,开始创建cells对象
           */
          else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
            try {
              /**
               * 初始化cells和访问线程对应的Cell对象
               */
              if (cells == cs) {
                Striped64.Cell[] rs = new Striped64.Cell[2];
                rs[h & 1] = new Striped64.Cell(x);
                cells = rs;
                break done;
              }
            } finally {
              /**
               * cells创建完成后释放cellBusy锁
               */
              cellsBusy = 0;
            }
          }
          /**
           * 以上判断条件失败,则走备选逻辑:CAS操作运算base值(计算sum时会加上base值)
           */
          else if (casBase(v = base,
              (fn == null) ? v + x : fn.applyAsLong(v, x)))
            break done;
        }
      }

    懒得画流程图了~

  • 相关阅读:
    Hadoop启动报Error: JAVA_HOME is not set and could not be found
    mrjob在hadoop上跑的时候,报错
    Hadoop3安装踩坑 there is no HDFS_NAMENODE_USER defined. Aborting operation.
    mrjob 运行报错
    站位

    Lua基本数据类型
    常量指针和指针常量
    C基础题
    C++拷贝构造函数(深拷贝,浅拷贝)
  • 原文地址:https://www.cnblogs.com/niceboat/p/9940112.html
Copyright © 2011-2022 走看看