之前一直没有重视LongAdder这个类,直到开始看ConcurrentHashMap源码后才发现在ConcurrentHashMap中计算map的size使用的正是LongAdder。
只不过ConcurrentHashMap并没有直接引用LongAdder,而是把LongAdder中的代码都拷贝了过去,至于为啥这么搞就不去研究了。
LongAdder存储结构和原理
LongAdder的内部有一个base
变量,一个Cell[]
数组。 base
变量:非竞态条件下,直接累加到该变量上 Cell[]
数组:竞态条件下,线程通过哈希运算找到它自己的槽位,累加个各个线程自己的槽Cell[i]
中
@sun.misc.Contended static final class Cell { volatile long value;
Cell只有一个value,多个线程通过cas对value进行累加。
LongAdder的原理可以简单理解如下,如果竞争不激烈就往base里写,如果激烈则每个线程往Cell数组的某一个元素写。Cell的最大容量不超过cpu数量。这么设计的好处不难理解,高并发下
如果多个线程都对一个共享变量写入,每次都只能有一个线程CAS成功,将写入动作分散到cpu核数那个Cell后,竞争压力大大减小。
LongAdder只有一个空构造器,所有复杂的逻辑都在它的父类Striped64中。包括base和cell都在
abstract class Striped64 extends Number {
transient volatile Cell[] cells;
transient volatile long base;
transient volatile int cellsBusy;
private static final sun.misc.Unsafe UNSAFE;
private static final long BASE;
private static final long CELLSBUSY;
private static final long PROBE;
static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> sk = Striped64.class; BASE = UNSAFE.objectFieldOffset (sk.getDeclaredField("base")); CELLSBUSY = UNSAFE.objectFieldOffset (sk.getDeclaredField("cellsBusy")); Class<?> tk = Thread.class; PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomProbe")); } catch (Exception e) { throw new Error(e); } } }
threadLocalRandomProbe是存在于Thread类里的,可以当成是随机数的种子发生器。
@sun.misc.Contended("tlr") int threadLocalRandomProbe;
还有一个重要的类 Cell ,就是一个value,外加上偏移量和cas方法
@sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }
还有一个重要的成员变量要介绍 cellsBusy,代码中的注解写的很清楚,当扩容或者初始化Cells的时候,或者初始化某一个cell数组的一个成员的时候,
这三种情况下cellsBusy = 1,其他情况下为0
/** * Spinlock (locked via CAS) used when resizing and/or creating Cells. */ transient volatile int cellsBusy;
完事具备,下面分析源码
LongAdder核心方法分析
LongAdder的核心方法的入口就是add
public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } }
(as = cells) != null 如果cells已经不为空了,就是说经过了初始化那么就进入if分支
!casBase(b = base, b + x) CAS失败了说明有竞争也是要进到if分支。
接下来看内部的if逻辑
as == null || (m = as.length - 1) < 0 如果cells没初始化过,或者初始化了但是没有一个元素,则为true
a = as[getProbe() & m]) == null 如果该线程对应的槽位是null,则为true
uncontended = a.cas(v = a.value, v + x) 如果对应的槽位不为null,则尝试用CAS设置,如果CAS成功就不用进入if分支了
我们总结一下进入if分支的条件,经过最外层的if,如果if没拦截住,说明此时cells还没初始化,而且还发生了竞争
1 发生了竞争,但是cells还是null,或者是空的
2 cells不是null,但是该线程对应的槽位是null的
3 如果该线程对应的槽位也不是null,那么就对这个cell进行cas,如果失败了就进入if分支
从上面的顺序看,LongAdder是不希望cells为null的,因为这会利用不了多cell带来的热点数据分散的好处。所以最优先的判断cells是否为null
核心方法 longAccumulate
先看看 longAccumulate 的脉络,其实就是分成三种情况。甚至来说,第三种情况只会出现一次,很简单,可以忽略了。所以我们重点分析前两种情况。
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // add方法wasUncontended是false表示有竞争 int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe();//重新生成线程的hash值 wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { // 注意这是自旋,直到成功为止 Cell[] as; Cell a; int n; long v; //CASE1 cells已经初始化了 if ((as = cells) != null && (n = as.length) > 0) { } //CASE2 初始化没有加锁,且cells是null else if (cellsBusy == 0 && cells == as && casCellsBusy()) { } //CASE3 如果走不进前两个分支说明cells正在初始化,该线程也别闲着,尝试去cas base值 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
先分析CASE2,因为一开始cells肯定是null的,所以先分析初始化流程。
boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2];//初始容量2 rs[h & 1] = new Cell(x);//相当于是%2 cells = rs; init = true; } } finally { cellsBusy = 0; 释放锁 } if (init) break;
CASE3 就不分析了
分析CASE1
if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { //如果当前线程得到的槽位是空的,说明这是第一次进来,就需要对其初始化 if (cellsBusy == 0) { // Try to attach new Cell 没人竞争 Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) {//再次判断 没人扩容或者初始化cell,并加锁 boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {//再次判断这个槽位是不是null rs[j] = r; created = true; } } finally { cellsBusy = 0; //释放锁 } if (created) break; // 第一处退出点 continue; // Slot is now non-empty 再次自旋 } } collide = false; } else if (!wasUncontended) // CAS already known to fail 传进来的就是false wasUncontended = true; // Continue after rehash else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) //如果当前的槽位不是null,直接cas如果成功直接break。注意这里是该分支第二处退出点,只有这么两个退出点 break; else if (n >= NCPU || cells != as) collide = false; // At max size or stale 我觉得这句没啥意义,因为collide根本没有被用到,这句的意义就是拦截到cell的size比cpu核数大了就不会扩容了,其实就是拦截作用 else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) {//能够拿到扩容锁,我们分析下什么情况下会走到这个分支 1 线程拿到的槽位不是null 2 对某个槽位做cas还失败了 3 当前的cells的size还比cpu数少 try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = advanceProbe(h); //重算hash 该CASE1D的最外层一个for循环,对于每个分支如果不能return出去就一定会重新计算hash值 }
再看一下sum方法,注意看英文注解 The returned value is <em>NOT</em> an atomic snapshot
/** * Returns the current sum. The returned value is <em>NOT</em> an * atomic snapshot; invocation in the absence of concurrent * updates returns an accurate result, but concurrent updates that * occur while the sum is being calculated might not be * incorporated. * * @return the sum */ public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
该方法就是对base和每个cell的直接相加,并没有加同步等措施。所以该值并不是一个十分准确的值。