ConcurrentHashMap
首先还是成员变量的认识 :
与hashMap一致的.
private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
private static final float LOAD_FACTOR =0.75f;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
比hashMap多的
private static final int MIN_TRANSFER_STRIDE = 16; 一个线程最少处理16个位桶的处理.
static final int MOVED = -1; // hash for forwarding nodes // 头结点的hash值为-1, 表示当前table正在初始化
static final int TREEBIN = -2; // hash for roots of trees // 头结点的hash值为-2 , 表示当前位桶已经变为树结构
static final int RESERVED = -3; // hash for transient reservations // 位桶的头结点为-3 , 表示当前位桶正在转移
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
private transient volatile int sizeCtl; // 很关键的一个变量,伴随ConcurrentHashMap整个流程.
static final int NCPU = Runtime.getRuntime().availableProcessors(); // CPU的数量
静态内部类Node
a. 与HashMap内不同的是, 这里 value 与 next 属性 都是 volatile 类型的. (多线程之前可见的)
a . 这里table 和 nextTable也是 transient volatile 类型的 Node[] .
1. 同样, ConcurrentHash也是查看它的put方法.
a . 其中 onlyIfAbsent ,当为false时, 遇到相同的key值,value会进行覆盖.
a . 还是一行行开始查看代码吧.
b. 与HashMap不同的是, ConcurrentHashMap的 key 与 Value 都不允许为Null . 否则直接抛出 throw new NullPointerException()
c . 求位桶的方式与hashMap相差不对. 也是高16位与低16位进行异或.
d. for (Node<K,V>[] tab = table;;) { 这样的一个死循环. 因为就条件而言,不存在一个跳出条件. 将成员变量table赋值给 tab .
e . if (tab == null || (n = tab.length) == 0) , 需要进行扩容. 开始进入 tab = initTable();
f . 开始查看 initTable() 方法 . 需要注意的是ConcurrentHashMap 是考虑并发的.
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 判断出这个while循环的条件是 , tab不为空.
while ((tab = table) == null || tab.length == 0) {
// sc < 0 , 也就是等于-1 的时候, 代表table初始化的这件事情已经有线程在做了,
if ((sc = sizeCtl) < 0)
//让出CPU时间片
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 采用的cas 乐观锁. int sc 默认值为 0 , 将其赋值为 -1
try {
// 再次判断, 当前tab数组的是否还是为null
if ((tab = table) == null || tab.length == 0) {
// 此时 sc应该是 -1 , 所以 n = DEFAULT_CAPACITY (默认数组长度16)
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 这里开始创建一个16位的Node数组.
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 两层赋值 , 这时候 table就是一个16位的数组.
table = tab = nt;
// 再来对sc赋值. 16 - (16 % 4) = 12 . 其实这时候sc就是扩容的临界值的
sc = n - (n >>> 2);
}
} finally {
// 把sc赋值给 sizeCtl , 这时候 sizeCtl就有了它的第二层含义,当前数组扩容的临界值
sizeCtl = sc;
}
// 初始化结束,跳出for循环的出口
break;
}
}
// 将当前初始化的Node数组返回
return tab;
}
a 大概初始化的流程就是如上代码注释, 是采用一个全局变量 , 初始值因为是 int 类型, 默认值为0 , 再一个线程抢到初始化成员变量table的任务后,会采用CAS 乐观锁,将其变为-1 . 控制其他线程就算进入也是让出CPU时间片. 而抢到初始化Node线程会在 new Node[]后,将sizeCtl 修改为 当前数组扩容的临界值.
b . 初始化结束后, 将 new 的tab返回.因为这是一个死循环, 所以这个时候并退出不了这个for循环,下面的条件不满足,直接开始第二次循环.
c . 这次循环进来之后. 进入else if 内了. i 又是同样的算法,将其赋值为 (n - 1) & h , tabAt() 也是通过乐观锁来获取当前内容上的Node(链表的头结点) , if头结点为Null. 说明该位桶上不存在元素, 再次采用cas, 在当前位桶上的元素 由 null 更新为 new Node(); 此时的 break就可以跳出for的死循环了.如果失败,自旋重新替换.直到替换成功.
d . 跳出for循环for继续往下看.
a . 因为是初始化并且第一次添加元素, 跳出for循环后,直接就可以 addCount()方法了. delta =1.
b. 扩容,设计者也考虑到了多线程, 采用了数组来分流.
用到了 CounterCell 内部类.内部只有一个Volatile 的 Value .
开始查看 addCount代码
// 对ConcurrentHashMap进行计数的操作, ConcurrentHashMap.size()方法就是通过这里计数实现的
// x 默认是 1 (每一次put都是放入一个元素) , int check 表示链表长度(不包含当前元素的情况下)
private final void addCount(long x, int check) {
// CounterCell[] as 用来计数的一个数组,通过内部CounterCell内部的value属性相加, 来求最终size()值
CounterCell[] as; long b, s;
// 如果当前 成员变量counterCells != null , 说明之前已经记过数了.
// || 采用Cas方法, 将当前baseCount + x 赋值给 baseCount ,如果赋值失败,说明存在并发,这里不再自旋处理了,而是直接放弃Cas乐观锁处理.
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// 这里是放弃CAS乐观锁处理,继续往下看
CounterCell a; long v; int m;
// 初始化一个变量,并将这个变量设置为true
boolean uncontended = true;
// 因为上面进入这个if的条件是 不为空 或者 CAS失败
// 所以这还是进行了一个非空判断
// 这个括号内的条件比较有意思. 一步步往下走,但是每一步的赋值结果又可以作为下一步的判断结果
// as == null 如果CounterCell[] == null . 还没有出现过分流用的数组
// (m = as.length - 1) < 0 ,其实设计者可以写为 (m = as.length) ==0 , 但是 m这个变量,开发者想作为数组的索引使用, 相当于等式左右都 -1 .
// (a = as[ThreadLocalRandom.getProbe() & m]) 这里 CounterCell a 也进行了赋值. ThreadLocalRandom.getProbe() & m , 熟悉的代码, 又是获取数组内的一个随机位桶, 如果这个位桶上的CounterCell == null .
// CounterCell a 这个变量已经在上一步赋值了, 能进行到这一步,说明上一步不为Null , 那么这就开始采用CAS 来对 CounterCell a 中的Value 属性进行递增 ,其实也是 +1 .
// ThreadLocalRandom.getProbe() 是一个基于线程得到的随机数, 性能比Random高
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
// 如果链表的个数 <=1 , 没比较考虑扩容了,直接return出去.这个元素计数结束.
if (check <= 1)
return;
// 否则这里S还需要进行一下赋值, 就是所有位桶上CounterCell的Value值 + baseCount的值, 就是目前ConcurrentHashMap内所有元素的个数.
s = sumCount();
}
// 接下来是扩容的事情. 先不看
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
开始查看 fullAddCount 代码
// 当 baseCount 采用 CAS 乐观锁失败, CounterCell的Value值采用乐观锁更新失败. 反正就是当前元素计数SumCount失败了, 就该进行当前方法来计数了
// X 是添加元素的个数 , 这个其实使用的时候,一般都是1 , boolean wasUncontended 是否存在冲突
private final void fullAddCount(long x, boolean wasUncontended) {
// 这里int类型h的定义, 已经接下来if中的赋值,都是想产生一个随机数.
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
// 这里又是一个死循环,自旋
for (;;) {
// 同样定义了一堆变量,
CounterCell[] as; CounterCell a; int n; long v;
// 如果CounterCells 这个成员变量已经被初始化了,不等于0
if ((as = counterCells) != null && (n = as.length) > 0) {
// 如果CounterCells[]数组的随机位桶上元素 ConterCell a == null . 表示虽然初始化了,但是可能随机到另外一个位桶上了 (当前考虑 数组的length =2 的情况)
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
//new一个CounterCell , value也是直接复制为 1
CounterCell r = new CounterCell(x); // Optimistic create
// 也是根据cellsBusy, 将其从0替换为1 , 如果替换成功
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 是否创建完成 默认为false.
boolean created = false;
try { // Recheck under lock
// 下面就是将这个 CounterCell 放在刚才判断为null的位桶上了. (m - 1) & h 其实与上面的 (n - 1) & h 一样.
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
// 再次吧cellsBusy设置为0
cellsBusy = 0;
}
//如果创建完成,跳出for循环
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
// 这里开始就是 当前成员变量CounterCells还没有被初始化
// 这里通过一个 int cellsBusy 成员变量, private transient volatile int cellsBusy; 如果还是0 ,代表还没有被初始化
// 如果想要开始初始化, 也是进行一下CAS 乐观锁. 将 cellsBusy 由 0 更新为 1
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// cellsBusy已经占位成功, 成功更新为1了.
// 创建一个 boolean 类型的 init变量 , 默认初始化为false.
boolean init = false;
try { // Initialize table
// 开始初始化table, 这里默认是2位数组
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
// rs[h & 1] = rs[h & rs.length-1] , 还是hashMap中获取一个随机位桶的写法, 并赋值为 new CounterCell(1) , 就是将value值直接置为1 .
rs[h & 1] = new CounterCell(x);
// 将初始化的 rs 赋值给成员变量 counterCells
counterCells = rs;
// 初始化的标志位设置为true . 初始化成功,这里并不会跳出自旋.
init = true;
}
} finally {
// 这里初始化成功,会重新把CellsBusy设置为 0
cellsBusy = 0;
}
// 如果初始化失败,跳出for循环
if (init)
break;
}
// 如果线程竞争激烈, 那么还是采用baseCount方法来cas增加元素个数.增加成功跳出for循环.
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}