ConcurrentHashMap可以做到比較高性能的并发訪问。原因是锁分段,及get不用加锁,就算堵塞时用的是ReentrantLock。
Segment相当于是一个子map,拥有一个HashEntity数组。
这样能够将并发压力分摊到多个Segment上。
ConcurrentHashMap组成图:
这里的Segment个数必须为2的n次方,为了之后高效计算要存放的key存放在哪个Segment上(用 << 实现)。
Segment内部拥有一个类型为volatile的HashEntry数组,这是为了可以让其他线程看到最新的值。
ConcurrentHashMap拥有put,get,remove等操作。而原有的扩容操作。为了性能则仅仅针对单个Segment来进行。
关键的Segment相当于ConcurrentHashMap中的子map。它包括了一个HashEntry数组。它的类图例如以下:
这里关键的Segment类图例如以下:
HashEntry内部存储了hash值。final的key值,及volatile的value及next值。
这里value及next值是volatile是为实现get操作的不须要加锁提供了基础。
HashEntry类图例如以下:
以下分别从ConcurrentHashMap创建,往ConcurrentHashMap放入元素。从ConcurrentHashMap取出元素来进行分析。
一.ConcurrentHashMap创建
假设不传入,使用空构造函数时,默认的map容量大小为16,loadFactor为0.75,并发数为16。
我们以默认的參数開始分析。
首先须要确定总容量大小(这里指全部Segment中存放数组元素的数组大小总和),Segment总数,以及每一个Segment中HashEntry数组的大小。
我们先附上代码:
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
能够看到,对于Segment个数ssize取的是以concurrencyLevel为上界的2的n次方,这里因为concurrencyLevel为16。所以ssize也为16。
然后计算segmentShift,这个值用于兴许在存放元素到ConcurrentHashMap时。用于决定有多少位hash高位參与计算存放元素的Segment数组下标。
这里segmentShift为32-sshift。sshift取的是Segment个数ssize的2的n次数的n值。即在计算ssize时左移的次数。这里为4。
所以segmentShift为28。
接着计算出segmentMask,这个值用于在计算得到存放元素的Segment所在数组的下标,这里为了效率使用 & 操作来替代%模操作。
这里因为Segment个数为16,所以segmentMask为15。
之后会计算出每一个Segment的数组容量。这里先计算出每一个Segment的大致大小,即用int c = initialCapacity / ssize; 来得出c,这里c为1。
之后会初始化一个cap作为真正的segment中HashEntry数组大小,将它初始化为2。
然后为了保证得出的segMent中HashEntry的大小为2的n次方,所以兴许会对cap做下面操作:
while (cap < c)
cap <<= 1;
即cap的值要不是2(c值小于等于2时),或者是大于2的2的n次方。
得到终于每一个Segment中HashEntry数组大小后,创建一个Segment数组。并在Segment数组0处初始化创建一个Segment。
最后将ConcurrentHashMap的segments设置为新创建的Segments数组。
这里能够看到最后将创建的Segment s0是调用UNSAFE.putOrderedObject(ss,SBASE,s0)来将其放入到ss数组中。
这里的UNSAFE.putOrderedObject是JAVA提供的用于直接操作内存的方法。当中參数ss是数组。
UNSAFE.putOrderedObject会延迟更新到内存中,可是因为兴许在获取segment数组中的segment时,採用的是UNSAFE.getObjectVolatile,所以可以保证对于segment放入数组的操作对于兴许的线程是可见的。
SBASE是ConcurrentHashMap的一个静态final long的值,相当于是Segment数组的首地址。
s0则是创建的须要放入ss数组中的segment实例。这里是将s0放入到ss数组位置0中。
假设提供类似:UNSAFE.putOrderedObject(ss,SBASE+offset,s)表明将元素s放入到数组ss的SBASE+offset位置。
当中这里的offset:
offset=步长*要放在数组第几位
步长通常是在编译期已经确定,这里測试过对于:Double或者Integer等。步长都是4。