  • Java JDK 1.7 ConcurrentHashMap: source code walkthrough with debug notes


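     Reading source code and understanding the principles behind it is part of growing as a programmer; it is the only way to really understand what a class does. ConcurrentHashMap is thread-safe, but how exactly is that safety achieved? How should it be used correctly? When does a rehash happen? How does red-black tree storage work? None of these can be answered without knowing the source. So today's post covers the Java source of ConcurrentHashMap. Most of it is excerpted from material found online, and some parts I tested and verified myself. I am recording it here as notes for my own reference and sharing it so that we can all improve together.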






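    The Segment array splits one large table into several smaller tables that are locked independently; this is the lock-striping (lock-separation) technique. Each Segment element stores a HashEntry array plus linked lists, the same storage structure that HashMap uses. A HashEntry wraps one key/value pair of the map. A Segment plays the role of the lock: each Segment object guards a number of buckets of the hash table, and each bucket is a linked list of HashEntry objects.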



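    HashEntry class

    A HashEntry wraps one key/value pair of the hash table and is one node in a bucket's linked list. In the JDK 1.7 HashEntry class, the key and hash fields are declared final, while the value and next fields are declared volatile.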

    /**
     * ConcurrentHashMap list entry. Note that this is never exported
     * out as a user-visible Map.Entry.
     */
    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

        // ... (remaining members omitted in this listing)
    }

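    When two keys collide during hashing, ConcurrentHashMap resolves the collision by separate chaining: the colliding HashEntry objects are linked into a list. New nodes are always inserted at the head of the list. (In JDK 1.6 this was forced by next being final; in JDK 1.7 next is volatile, but Segment.put still links the new node in front of the current head.) Inserting three HashEntry objects A, B and C in that order into an empty bucket therefore yields the chain C -> B -> A.

    [Figure omitted: bucket structure after inserting A, B and C into an empty bucket.]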

    Segment class

    Segment extends ReentrantLock, which is what lets a Segment object act as a lock. Each Segment object guards the buckets contained in its member table.

    table is an array of HashEntry objects, and each array slot is one bucket of the hash table. count is a counter: every Segment has its own count, which gives the total number of HashEntry objects held in all the chains of the table that this Segment manages.

    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        private static final long serialVersionUID = 2249069246763182397L;

        // Maximum number of tryLock attempts before blocking on lock()
        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

        transient volatile HashEntry<K,V>[] table;  // per-segment bucket array
        transient int count;                        // number of entries in this segment
        transient int modCount;                     // number of mutating operations
        transient int threshold;                    // rehash when count exceeds this
        final float loadFactor;

        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }

        final V put(K key, int hash, V value, boolean onlyIfAbsent) {...}

        /**
         * Doubles size of table and repacks entries, also adding the
         * given node to new table
         */
        @SuppressWarnings("unchecked")
        private void rehash(HashEntry<K,V> node) {...}

        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {...}

        private void scanAndLock(Object key, int hash) {...}

        /**
         * Remove; match on key only if value null, else match both.
         */
        final V remove(Object key, int hash, Object value) {...}

        final boolean replace(K key, int hash, V oldValue, V newValue) {...}

        final V replace(K key, int hash, V value) {...}

        final void clear() {...}
    }

    [Figure omitted: structure of a Segment after inserting the three HashEntry nodes A, B and C in order.]



     1 /**
     2      * Creates a new, empty map with the specified initial
     3      * capacity, load factor and concurrency level.
     4      *
     5      * @param initialCapacity the initial capacity. The implementation
     6      * performs internal sizing to accommodate this many elements.
     7      * @param loadFactor  the load factor threshold, used to control resizing.
     8      * Resizing may be performed when the average number of elements per
     9      * bin exceeds this threshold.
    10      * @param concurrencyLevel the estimated number of concurrently
    11      * updating threads. The implementation performs internal sizing
    12      * to try to accommodate this many threads.
    13      * @throws IllegalArgumentException if the initial capacity is
    14      * negative or the load factor or concurrencyLevel are
    15      * nonpositive.
    16      */
    17     @SuppressWarnings("unchecked")
    18     public ConcurrentHashMap(int initialCapacity,
    19                              float loadFactor, int concurrencyLevel) {
    20         if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
    21             throw new IllegalArgumentException();
    22         if (concurrencyLevel > MAX_SEGMENTS)
    23             concurrencyLevel = MAX_SEGMENTS;
    24         // Find power-of-two sizes best matching arguments
    25         int sshift = 0;
    26         int ssize = 1;
    27         while (ssize < concurrencyLevel) {
    28             ++sshift;
    29             ssize <<= 1;
    30         }
    31         this.segmentShift = 32 - sshift;
    32         this.segmentMask = ssize - 1;
    33         if (initialCapacity > MAXIMUM_CAPACITY)
    34             initialCapacity = MAXIMUM_CAPACITY;
    35         int c = initialCapacity / ssize;
    36         if (c * ssize < initialCapacity)
    37             ++c;
    38         int cap = MIN_SEGMENT_TABLE_CAPACITY;
    39         while (cap < c)
    40             cap <<= 1;
    41         // create segments and segments[0]
    42         Segment<K,V> s0 =
    43             new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
    44                              (HashEntry<K,V>[])new HashEntry[cap]);
    45         Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    46         UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    47         this.segments = ss;
    48     }


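     In the debugger, when execution reaches line 31 of the constructor (this.segmentShift = 32 - sshift;), the variables have these values: initialCapacity is 16; loadFactor is 0.75 (the load factor, consulted when deciding whether to resize); concurrencyLevel is 16; sshift is 4 (2 to the power sshift equals ssize); and ssize is 16. ssize is the length of the segments array and is derived from concurrencyLevel: as the loop shows, it is always the smallest power of two greater than or equal to concurrencyLevel. The length must be a power of two because the Segment index is computed with a bitwise AND. From these values the constructor derives segmentShift and segmentMask. segmentShift is 28, that is 32 minus 4, because hash() produces a 32-bit value and shifting it right by 28 leaves its top 4 bits. segmentMask is the mask for the AND operation; here it is 15 (binary 1111).

    These two fields exist mainly to locate a segment: int j = (hash >>> segmentShift) & segmentMask.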
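
    To make the arithmetic concrete, here is a minimal standalone sketch that repeats the constructor's sizing loop and the segment-index expression. The class name SegmentIndexDemo, the helper names and the sample hash value are made up for illustration; only the loop and the index expression come from the JDK listing.

```java
public class SegmentIndexDemo {

    /** Repeats the constructor loop; returns {ssize, sshift, segmentShift, segmentMask}. */
    public static int[] sizing(int concurrencyLevel) {
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {   // smallest power of two >= concurrencyLevel
            ++sshift;
            ssize <<= 1;
        }
        int segmentShift = 32 - sshift;
        int segmentMask = ssize - 1;
        return new int[] { ssize, sshift, segmentShift, segmentMask };
    }

    /** The segment index is taken from the top sshift bits of the hash. */
    public static int segmentIndex(int hash, int segmentShift, int segmentMask) {
        return (hash >>> segmentShift) & segmentMask;
    }

    public static void main(String[] args) {
        int[] s = sizing(16);
        System.out.println("ssize=" + s[0] + " sshift=" + s[1]
                + " segmentShift=" + s[2] + " segmentMask=" + s[3]);
        // prints: ssize=16 sshift=4 segmentShift=28 segmentMask=15

        int hash = 0xA1B2C3D4;               // arbitrary sample, not a real hash() result
        System.out.println(segmentIndex(hash, s[2], s[3]));   // prints 10 (top nibble 0xA)
    }
}
```

    With concurrencyLevel = 17 the same loop gives ssize = 32 and sshift = 5, so the top five bits of the hash select the segment.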












    /**
     * Maps the specified key to the specified value in this table.
     * Neither the key nor the value can be null.
     *
     * <p> The value can be retrieved by calling the <tt>get</tt> method
     * with a key that is equal to the original key.
     *
     * @param key key with which the specified value is to be associated
     * @param value value to be associated with the specified key
     * @return the previous value associated with <tt>key</tt>, or
     *         <tt>null</tt> if there was no mapping for <tt>key</tt>
     * @throws NullPointerException if the specified key or value is null
     */
    @SuppressWarnings("unchecked")
    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }


    private int hash(Object k) {
        int h = hashSeed;

        if ((0 != h) && (k instanceof String)) {
            return sun.misc.Hashing.stringHash32((String) k);
        }

        h ^= k.hashCode();

        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }


     1 final V put(K key, int hash, V value, boolean onlyIfAbsent) {
     2             HashEntry<K,V> node = tryLock() ? null :
     3                 scanAndLockForPut(key, hash, value);
     4             V oldValue;
     5             try {
     6                 HashEntry<K,V>[] tab = table;
     7                 int index = (tab.length - 1) & hash;
     8                 HashEntry<K,V> first = entryAt(tab, index);
     9                 for (HashEntry<K,V> e = first;;) {
    10                     if (e != null) {
    11                         K k;
    12                         if ((k = e.key) == key ||
    13                             (e.hash == hash && key.equals(k))) {
    14                             oldValue = e.value;
    15                             if (!onlyIfAbsent) {
    16                                 e.value = value;
    17                                 ++modCount;
    18                             }
    19                             break;
    20                         }
    21                         e = e.next;
    22                     }
    23                     else {
    24                         if (node != null)
    25                             node.setNext(first);
    26                         else
    27                             node = new HashEntry<K,V>(hash, key, value, first);
    28                         int c = count + 1;
    29                         if (c > threshold && tab.length < MAXIMUM_CAPACITY)
    30                             rehash(node);
    31                         else
    32                             setEntryAt(tab, index, node);
    33                         ++modCount;
    34                         count = c;
    35                         oldValue = null;
    36                         break;
    37                     }
    38                 }
    39             } finally {
    40                 unlock();
    41             }
    42             return oldValue;
    43         }


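    1. Acquire the segment lock (tryLock() first, falling back to scanAndLockForPut()).
    2. Compute the key's index in the table array and fetch the head node of that bucket.
    3. Check whether there is a hash collision, that is, whether the bucket is non-empty.
    4. If there is no collision, store the new node directly at that index.
    5. If there is a collision, walk the chain looking for an equal key.
    6. If an equal key is found, replace that node's value (unless onlyIfAbsent is set).
    7. Otherwise insert the new node at the head of the chain (not the tail); if count would exceed threshold, rehash(node) grows the table and inserts the node instead.
    8. Release the lock.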
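
    The lock / locate / scan / head-insert / unlock sequence can be tried out in isolation. The sketch below is a deliberately simplified, hypothetical miniature (MiniSegment, Node and chain are invented names): it has a fixed table of 4 buckets, never rehashes, always calls lock() instead of tryLock() plus scanAndLockForPut(), and uses AtomicReferenceArray where the JDK uses Unsafe-based entryAt/setEntryAt.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical miniature of a segment: fixed size, no rehash, no lock scanning. */
public class MiniSegment<K, V> extends ReentrantLock {
    private static final long serialVersionUID = 1L;

    static final class Node<K, V> {
        final int hash;
        final K key;
        volatile V value;
        volatile Node<K, V> next;

        Node(int hash, K key, V value, Node<K, V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }
    }

    // Stand-in for the Unsafe-based bucket accessors of the real class.
    private final AtomicReferenceArray<Node<K, V>> table =
            new AtomicReferenceArray<Node<K, V>>(4);

    public V put(K key, int hash, V value, boolean onlyIfAbsent) {
        lock();                                            // the segment itself is the lock
        try {
            int index = (table.length() - 1) & hash;       // locate the bucket
            Node<K, V> first = table.get(index);
            for (Node<K, V> e = first; e != null; e = e.next) {
                if (e.hash == hash && key.equals(e.key)) { // equal key: overwrite value
                    V old = e.value;
                    if (!onlyIfAbsent)
                        e.value = value;
                    return old;
                }
            }
            // Key absent: the new node points at the old head and becomes the new head.
            table.set(index, new Node<K, V>(hash, key, value, first));
            return null;
        } finally {
            unlock();
        }
    }

    /** Renders one bucket's chain from head to tail, for inspection only. */
    public String chain(int index) {
        StringBuilder sb = new StringBuilder();
        for (Node<K, V> e = table.get(index); e != null; e = e.next) {
            if (sb.length() > 0)
                sb.append(" -> ");
            sb.append(e.key);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        MiniSegment<String, Integer> seg = new MiniSegment<String, Integer>();
        // The same artificial hash (0) forces all three keys into one bucket.
        seg.put("A", 0, 1, false);
        seg.put("B", 0, 2, false);
        seg.put("C", 0, 3, false);
        System.out.println(seg.chain(0));                  // prints: C -> B -> A
        System.out.println(seg.put("B", 0, 20, false));    // prints: 2 (the previous value)
    }
}
```

    The printed chain shows the head insertion directly: the most recently added key sits first in the bucket.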



    /**
     * Scans for a node containing given key while trying to
     * acquire lock, creating and returning one if not found. Upon
     * return, guarantees that lock is held. UNlike in most
     * methods, calls to method equals are not screened: Since
     * traversal speed doesn't matter, we might as well help warm
     * up the associated code and accesses as well.
     *
     * @return a new node if key not found, else null
     */
    private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
        HashEntry<K,V> first = entryForHash(this, hash);
        HashEntry<K,V> e = first;
        HashEntry<K,V> node = null;
        int retries = -1; // negative while locating node
        while (!tryLock()) {
            HashEntry<K,V> f; // to recheck first below
            if (retries < 0) {
                if (e == null) {
                    if (node == null) // speculatively create node
                        node = new HashEntry<K,V>(hash, key, value, null);
                    retries = 0;
                }
                else if (key.equals(e.key))
                    retries = 0;
                else
                    e = e.next;
            }
            else if (++retries > MAX_SCAN_RETRIES) {
                lock();
                break;
            }
            else if ((retries & 1) == 0 &&
                     (f = entryForHash(this, hash)) != first) {
                e = first = f; // re-traverse if entry changed
                retries = -1;
            }
        }
        return node;
    }


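    1. Locate the key's bucket in the HashEntry array and read its first node.
    2. Try to acquire the lock; on success return immediately, otherwise enter the retry loop.
    3. If the bucket is empty (no hash collision), speculatively create the new node.
    4. If the bucket is not empty, walk the chain looking for an equal key.
    5. If the end of the chain is reached without finding the key, speculatively create the new node.
    6. Once the scan has finished (key found or node created), start counting retries.
    7. If the retry count exceeds MAX_SCAN_RETRIES, stop spinning and call the blocking lock(); this is an ordinary blocking acquire, not a CAS loop.
    8. Otherwise, on every second retry, re-read the head node; if it has changed, restart the scan from the new head.
    9. Go round the loop again.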
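
    Stripped of the chain scanning, the locking strategy is "spin on tryLock() a bounded number of times, then block". The sketch below isolates just that part under that simplification; SpinThenBlock and acquire are illustrative names, and the returned count of failed attempts exists only so the behaviour can be observed.

```java
import java.util.concurrent.locks.ReentrantLock;

public class SpinThenBlock {
    // Same bound as Segment.MAX_SCAN_RETRIES: spinning only pays off on multiprocessors.
    static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

    /**
     * Acquires the lock and returns how many tryLock() calls failed first.
     * On return the calling thread always holds the lock.
     */
    public static int acquire(ReentrantLock lock) {
        int retries = 0;
        while (!lock.tryLock()) {
            if (++retries > MAX_SCAN_RETRIES) {
                lock.lock();      // give up spinning and block until the lock is free
                break;
            }
            // The real method spends this time walking the bucket and
            // pre-creating the node it may need once the lock is held.
        }
        return retries;
    }

    public static void main(String[] args) throws InterruptedException {
        final ReentrantLock lock = new ReentrantLock();
        lock.lock();                              // main thread holds the lock first
        Thread t = new Thread(new Runnable() {
            public void run() {
                int failed = acquire(lock);       // spins, then most likely blocks
                try {
                    System.out.println("acquired after " + failed + " failed tryLock calls");
                } finally {
                    lock.unlock();
                }
            }
        });
        t.start();
        Thread.sleep(50);                         // keep the lock long enough to force contention
        lock.unlock();
        t.join();
    }
}
```

    The number printed depends on timing, so no particular value should be expected; on an uncontended lock acquire returns 0.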

    The Segment.rehash() call mentioned above is line 30 of Segment.put(). The source of Segment.rehash() is:

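    @SuppressWarnings("unchecked")
    private void rehash(HashEntry<K,V> node) {
        HashEntry<K,V>[] oldTable = table;
        int oldCapacity = oldTable.length;
        int newCapacity = oldCapacity << 1;
        threshold = (int)(newCapacity * loadFactor);
        HashEntry<K,V>[] newTable =
            (HashEntry<K,V>[]) new HashEntry[newCapacity];
        int sizeMask = newCapacity - 1;
        for (int i = 0; i < oldCapacity ; i++) {
            HashEntry<K,V> e = oldTable[i];
            if (e != null) {
                HashEntry<K,V> next = e.next;
                int idx = e.hash & sizeMask;
                if (next == null)   //  Single node on list
                    newTable[idx] = e;
                else { // Reuse consecutive sequence at same slot
                    HashEntry<K,V> lastRun = e;
                    int lastIdx = idx;
                    for (HashEntry<K,V> last = next;
                         last != null;
                         last = last.next) {
                        int k = last.hash & sizeMask;
                        if (k != lastIdx) {
                            lastIdx = k;
                            lastRun = last;
                        }
                    }
                    newTable[lastIdx] = lastRun;
                    // Clone remaining nodes
                    for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                        V v = p.value;
                        int h = p.hash;
                        int k = h & sizeMask;
                        HashEntry<K,V> n = newTable[k];
                        newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                    }
                }
            }
        }
        int nodeIndex = node.hash & sizeMask; // add the new node
        node.setNext(newTable[nodeIndex]);
        newTable[nodeIndex] = node;
        table = newTable;
    }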


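    1. Allocate the new array with twice the old capacity.
    2. Iterate over the old array.
    3. Compute each entry's new index with e.hash & sizeMask.
    4. Move the data: the trailing run of nodes that all map to the same new index (lastRun) is reused as is, and the nodes in front of it are cloned into the new table.
    5. Add the new node, then point the member variable table at the new array.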
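
    Because the capacity doubles, hash & sizeMask sends each entry of old bucket i either to i or to i + oldCapacity, depending on one extra bit of the hash. The small sketch below shows that split and the position at which the reusable trailing run starts; RehashIndexDemo and both helper names are invented for this illustration, and the chain is modelled as a plain int array of hashes rather than real HashEntry nodes.

```java
public class RehashIndexDemo {

    /** Bucket index for a power-of-two capacity, as in hash & sizeMask. */
    public static int index(int hash, int capacity) {
        return hash & (capacity - 1);
    }

    /**
     * Position in the chain (head = 0) where the trailing run begins, i.e. the
     * first node after which every node maps to the same new bucket.
     * Mirrors the lastRun/lastIdx loop of rehash().
     */
    public static int lastRunStart(int[] chainHashes, int newCapacity) {
        int lastRun = 0;
        int lastIdx = index(chainHashes[0], newCapacity);
        for (int i = 1; i < chainHashes.length; i++) {
            int k = index(chainHashes[i], newCapacity);
            if (k != lastIdx) {
                lastIdx = k;
                lastRun = i;
            }
        }
        return lastRun;
    }

    public static void main(String[] args) {
        int oldCap = 4;
        int newCap = oldCap << 1;
        int[] hashes = { 1, 5, 9, 13 };   // all four share bucket 1 in a 4-slot table
        for (int h : hashes)
            System.out.println(h + ": " + index(h, oldCap) + " -> " + index(h, newCap));
        // prints: 1: 1 -> 1, 5: 1 -> 5, 9: 1 -> 1, 13: 1 -> 5 (one per line)

        // Chain 5 -> 1 -> 9: the last two nodes both map to new bucket 1,
        // so only the first node has to be cloned.
        System.out.println(lastRunStart(new int[] { 5, 1, 9 }, newCap));   // prints 1
    }
}
```

    The longer the trailing run, the fewer nodes need to be copied, which is why the JDK code bothers to look for it.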






    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        // compute the offset of the Segment within the segments array
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        // read the Segment with volatile semantics
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            // read the bucket's head HashEntry with volatile semantics
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                // same key
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    // value is volatile, so it can be read and returned without locking
                    return e.value;
            }
        }
        return null;
    }



    public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits (exceeds Integer.MAX_VALUE)
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                // once retries reaches RETRIES_BEFORE_LOCK (2), lock every Segment
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                // add up the element count of every Segment
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                // two consecutive passes saw the same modCount sum: result is stable
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            // unlock, if the Segments were locked above
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }



    isEmpty() method

    public boolean isEmpty() {
        long sum = 0L;
        final Segment<K,V>[] segments = this.segments;
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> seg = segmentAt(segments, j);
            if (seg != null) {
                // a single Segment with a nonzero count means the map is not empty
                if (seg.count != 0)
                    return false;
                // accumulate the total number of modifications
                sum += seg.modCount;
            }
        }
        if (sum != 0L) { // recheck unless no modifications
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    if (seg.count != 0)
                        return false;
                    sum -= seg.modCount;
                }
            }
            // A nonzero remainder means the map was modified between the two
            // passes, so it cannot be reported as empty.
            if (sum != 0L)
                return false;
        }
        return true;
    }
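
    1. First check whether any Segment holds elements; if one does, return false at once. Otherwise accumulate the sum of the modCount values.
    2. To make sure the contents did not change during that scan, scan all Segments a second time, this time subtracting each modCount from the sum.
    3. If sum == 0 at the end, the map really is empty and true is returned; otherwise the map was modified during the check and false is returned.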
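
    The two-pass check can be reproduced without the rest of the map. In the sketch below, Counters is a hypothetical stand-in for a Segment that carries only the two fields isEmpty() reads (declared volatile here for simplicity), and TwoPassEmptyCheck is an invented class name. It runs single-threaded, so it shows the bookkeeping rather than a real race.

```java
public class TwoPassEmptyCheck {

    /** Hypothetical stand-in for a Segment: only the counters isEmpty() looks at. */
    static final class Counters {
        volatile int count;      // entries currently stored
        volatile int modCount;   // mutating operations performed so far
    }

    public static boolean isEmpty(Counters[] segments) {
        long sum = 0L;
        for (Counters seg : segments) {
            if (seg.count != 0)
                return false;            // any entry at all: not empty
            sum += seg.modCount;
        }
        if (sum != 0L) {                 // something was modified at some point: recheck
            for (Counters seg : segments) {
                if (seg.count != 0)
                    return false;
                sum -= seg.modCount;
            }
            if (sum != 0L)               // modCounts moved between the passes
                return false;
        }
        return true;
    }

    public static void main(String[] args) {
        Counters[] segs = { new Counters(), new Counters() };
        System.out.println(isEmpty(segs));   // prints true: never touched

        segs[0].modCount = 2;                // e.g. one put followed by one remove
        System.out.println(isEmpty(segs));   // prints true: second pass cancels the sum

        segs[1].count = 1;                   // one entry present
        segs[1].modCount = 1;
        System.out.println(isEmpty(segs));   // prints false
    }
}
```

    Note that the second pass is skipped entirely when no segment has ever been modified, which is the common case for a freshly created map.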



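    ConcurrentHashMap offers much higher concurrency. Hashtable, and a HashMap wrapped by a synchronized wrapper, use one global lock to synchronize access from different threads. Only one thread can hold that lock at any moment, so only one thread can access the container at a time. This keeps concurrent access safe, but it also serializes all access to the container.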

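    The high concurrency of ConcurrentHashMap comes mainly from three things:

    Lock striping: each Segment has its own lock, so threads that update different Segments do not block one another;

    The immutable (final) key and hash fields of HashEntry, which reduce the need for locking in threads that read while traversing a chain;

    Writes to and reads from the same volatile variable, which coordinate memory visibility between reading and writing threads.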





