zoukankan      html  css  js  c++  java
  • Java并发容器--ConcurrentHashMap

    引子

      1、不安全:大家都知道HashMap不是线程安全的,在多线程环境下,对HashMap进行put操作会导致死循环。是因为多线程会导致Entry链表形成环形数据结构,这样Entry的next节点将永远不为空,就会产生死循环获取Entry。具体内容见HashMap随笔。

      2、不高效:Collections.synchronizedMap(hashMap)和HashTable的线程安全原理都是对方法进行同步,所有操作竞争同一把锁,性能比较低。

      如何构造一个线程安全且高效的HashMap?ConcurrentHashMap登场。

    锁分段技术

      ConcurrentHashMap将数据分为很多段(Segment),Segment继承了ReentrantLock,每个段都是一把锁。每个Segment都包含一个HashEntry数组,HashEntry数组存放键值对数据。当一个线程要访问Entry数组时,需要获取所在Segment锁,保证在同一个Segment的操作是线程安全的,但其他Segment的数据的访问不受影响,可以实现并发的访问不同的Segment。同一个段中才存在竞争关系,不同的段之间没有竞争关系。

    ConcurrentHashMap源码分析

      源码分析基于jdk1.7,不同版本实现有所不同。

      类图

      

      初始化

        segmentShift和segmentMask的作用是定位Segment索引。以默认值为例,concurrencyLevel为16,需要移位4次(sshift为4),segmentShift就等于28,segmentMask等于15。

        concurrencyLevel是指并发级别,即Segment数组的大小。concurrencyLevel值得设定应该根据并发线程数决定。如果并发级别设置的太小,同一个Segment的元素数量过多,会引起锁竞争的加重;如果太大,原本属于同一个Segment的元素会被分配到不同的Segment,会引起Cpu缓存命中率下降,进而导致程序性能下降。

     1             //initialCapacity:初始容量,默认16。
     2             //loadFactor:负载因子,默认0.75。当元素个数大于loadFactor*最大容量时需要扩容(rehash)
     3             //concurrencyLevel:并发级别,默认16。确定Segment的个数,Segment的个数为大于等于concurrencyLevel的第一个2^n。
     4             public ConcurrentHashMap(int initialCapacity,
     5                              float loadFactor, int concurrencyLevel) {
     6                 //判断参数是否合法
     7                 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
     8                     throw new IllegalArgumentException();
     9                 //Segment最大个数MAX_SEGMENTS = 1 << 16,即65536;
    10                 if (concurrencyLevel > MAX_SEGMENTS)
    11                     concurrencyLevel = MAX_SEGMENTS;
    12                 
    13                 // Find power-of-two sizes best matching arguments
    14                 int sshift = 0;
    15                 int ssize = 1;
    16                 //使用循环找到大于等于concurrencyLevel的第一个2^n。ssize就表示Segment的个数。
    17                 while (ssize < concurrencyLevel) {
    18                     ++sshift;    //记录移位的次数,
    19                     ssize <<= 1;//左移1位
    20                 }
    21                 this.segmentShift = 32 - sshift;    //用于定位hash运算的位数,之所以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的
    22                 this.segmentMask = ssize - 1;        //hash运算的掩码,ssize为2^n,所以segmentMask每一位都为1。目的是之后可以通过key的hash值与这个值做&运算确定Segment的索引。
    23                 //最大容量MAXIMUM_CAPACITY = 1 << 30;
    24                 if (initialCapacity > MAXIMUM_CAPACITY)
    25                     initialCapacity = MAXIMUM_CAPACITY;
    26                 //计算每个Segment所需的大小,向上取整
    27                 int c = initialCapacity / ssize;
    28                 if (c * ssize < initialCapacity)
    29                     ++c;
    30                 int cap = MIN_SEGMENT_TABLE_CAPACITY;//每个Segment最小容量MIN_SEGMENT_TABLE_CAPACITY = 2;
    31                 //cap表示每个Segment的容量,也是大于等于c的2^n。
    32                 while (cap < c)
    33                     cap <<= 1;
    34                 //创建一个Segment实例,作为Segment数组ss的第一个元素
    35                 // create segments and segments[0]
    36                 Segment<K,V> s0 =
    37                     new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
    38                                      (HashEntry<K,V>[])new HashEntry[cap]);
    39                 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    40                 UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    41                 this.segments = ss;
    42             }
    View Code

        

      插入元素(put)

        可以分为三步:

          1、定位Segment:通过Hash值与segmentShift、segmentMask的计算定位到对应的Segment;

          2、锁获取:获取对应Segment的锁,如果获取锁失败,需要自旋重新获取锁;如果自旋超过最大重试次数,则阻塞。

          3、插入元素:如果key已经存在,直接更新;如果key不存在,先判断是否需要扩容,若需要则执行rehash()后插入原因,否则直接存入元素。

        为了高效,ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。

        Segment的扩容判断比HashMap更恰当,因为HashMap是在插入元素后判断元素是否已经到达容量的,如果到达了就进行扩容,但是很有可能扩容之后没有新元素插入,这时HashMap就进行了一次无效的扩容。

        与HashMap不同ConcurrentHashMap并不允许key或者value为null。

      1             /**ConcurrentHashMap中方法**/
      2             public V put(K key, V value) {
      3                 Segment<K,V> s;
      4                 if (value == null)
      5                     throw new NullPointerException();
      6                 int hash = hash(key);    //计算hash值,hash值是一个32位的整数
      7                 //计算Segment索引
      8                 //在默认情况下,concurrencyLevel为16,segmentShift为28,segmentMask为15。
      9                 //先右移28位,hash值变为0000 0000 0000 0000 0000 0000 0000 xxxx,
     10                 //与segmentMask做&运算,就是取最后四位的值。这个值就是Segment的索引
     11                 int j = (hash >>> segmentShift) & segmentMask; 
     12                 //通过UNSAFE的方式获取索引j对应的Segment对象。
     13                 if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
     14                      (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
     15                     //Segment采用延迟初始化机制,如果sement为null,则调用ensureSegment创建Segment
     16                     s = ensureSegment(j);
     17                 //向Segment中put元素
     18                 return s.put(key, hash, value, false);
     19             }
     20 
     21             /**ConcurrentHashMap$Segment中方法**/
     22             //向Segment中put元素
     23             final V put(K key, int hash, V value, boolean onlyIfAbsent) {
     24                 //获取锁。如果获取锁成功,插入元素,和普通的hashMap差不多。
     25                 //如果获取锁失败,执行scanAndLockForPut进行重试。重试设计见scanAndLockForPut方法源码。
     26                 HashEntry<K,V> node = tryLock() ? null :
     27                     scanAndLockForPut(key, hash, value);
     28                 V oldValue;
     29                 try {
     30                     HashEntry<K,V>[] tab = table;
     31                     int index = (tab.length - 1) & hash;//计算HashEntry数组索引
     32                     HashEntry<K,V> first = entryAt(tab, index);
     33                     for (HashEntry<K,V> e = first;;) {
     34                         if (e != null) {    //该索引处已经有元素
     35                             K k;
     36 
     37                             //如果key相同,替换value。
     38                             if ((k = e.key) == key ||
     39                                 (e.hash == hash && key.equals(k))) {
     40                                 oldValue = e.value;
     41                                 //onlyIfAbsent=true参数表示如果key存在,则不更新value值,只有在key不存在的情况下,才更新。
     42                                 //在putIfAbsent方法中onlyIfAbsent=true
     43                                 //在put方法中onlyIfAbsent=false
     44                                 if (!onlyIfAbsent) {Scans
     45                                     e.value = value;
     46                                     ++modCount;//修改次数
     47                                 }
     48                                 break;
     49                             }
     50                             e = e.next;//继续找下一个元素
     51                         }
     52                         else {    
     53                             if (node != null)
     54                                 node.setNext(first);
     55                             else
     56                                 node = new HashEntry<K,V>(hash, key, value, first);
     57                             int c = count + 1;    //count为ConcurrentHashMap$Segment中的域
     58                             if (c > threshold && tab.length < MAXIMUM_CAPACITY)
     59                                 //如果元素数量超过阈值且表长度小于MAXIMUM_CAPACITY,扩容
     60                                 rehash(node);
     61                             else
     62                                 setEntryAt(tab, index, node);//将node节点更新到table中
     63                             ++modCount;
     64                             count = c;
     65                             oldValue = null;
     66                             break;
     67                         }
     68                     }
     69                 } finally {
     70                     unlock();
     71                 }
     72                 return oldValue;
     73             }
     74             
     75             /**ConcurrentHashMap$Segment中方法**/
     76             //自旋获取锁
     77             private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
     78                 //entryForHash根据hash值找到当前segment中对应的HashEntry数组索引。
     79                 HashEntry<K,V> first = entryForHash(this, hash);
     80                 HashEntry<K,V> e = first;
     81                 HashEntry<K,V> node = null;
     82                 int retries = -1; // negative while locating node
     83                 //自旋获取锁。若获取到锁,则跳出循环;否则一直循环直到获取到锁或retries大于MAX_SCAN_RETRIES。
     84                 while (!tryLock()) {
     85                     HashEntry<K,V> f; // to recheck first below
     86                     //当retries = -1时(即第一次循环或更新操作导致的first节点发生变化),会遍历该Segment的HashEntry数组中hash对应的链表,如果key对应的HashEntry不存在,则创建该节点。
     87                     //此处遍历链表的原因:希望遍历的链表被CPU cache所缓存,为后续实际put过程中的链表遍历操作提升性能。怎么理解呢?放在put时再去遍历不行吗?因为此时当前线程没有获取到Segment锁,所以不能进行put操作,但可以为put操作做一些准备工作(有可能加载到缓存),使put的操作更快,从而减少锁竞争。这种思想在remove()方法中也有体现。
     88                     if (retries < 0) {
     89                         if (e == null) {
     90                             //如果key不存在创建node,然后进入下一个循环
     91                             if (node == null) // speculatively create node
     92                                 node = new HashEntry<K,V>(hash, key, value, null);
     93                             retries = 0;
     94                         }
     95                         else if (key.equals(e.key))
     96                             //如果key存在直接进入下一个循环
     97                             retries = 0;
     98                         else
     99                             e = e.next;    //链表的下一个节点
    100                     }
    101                     else if (++retries > MAX_SCAN_RETRIES) {
    102                         //每次循环,retries加1,判断是否大于最大重试次数MAX_SCAN_RETRIES.
    103                         //static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
    104                         //为了防止自旋锁大量消耗CPU的缺点。如果超过MAX_SCAN_RETRIES,使用lock方法获取锁。如果获取不到锁则当前线程阻塞并跳出循环。
    105                         //ReentrantLock的lock()和tryLock()方法的区别。
    106                         lock();
    107                         break;
    108                     }
    109                     else if ((retries & 1) == 0 &&
    110                              (f = entryForHash(this, hash)) != first) {
    111                         //每隔一次循环,检查所在数组索引的链表头结点有没有变化(其他线程有更新Map的操作,如put,rehash或者remove操作)。
    112                         //如果改变,retries更新为-1,重新遍历
    113                         e = first = f; // re-traverse if entry changed
    114                         retries = -1;
    115                     }
    116                 }
    117                 return node;
    118             }
    119 
    120             /**ConcurrentHashMap$Segment中方法**/
    121             //rehash
    122             private void rehash(HashEntry<K,V> node) {
    123                 HashEntry<K,V>[] oldTable = table;
    124                 int oldCapacity = oldTable.length;
    125                 int newCapacity = oldCapacity << 1;    //新容量为旧容量的2倍
    126                 threshold = (int)(newCapacity * loadFactor);    //新阈值
    127                 HashEntry<K,V>[] newTable =
    128                     (HashEntry<K,V>[]) new HashEntry[newCapacity];    //新表
    129                 int sizeMask = newCapacity - 1;    //新掩码
    130                 //对旧表做遍历
    131                 for (int i = 0; i < oldCapacity ; i++) {
    132                     HashEntry<K,V> e = oldTable[i];
    133                     if (e != null) {
    134                         HashEntry<K,V> next = e.next;
    135                         int idx = e.hash & sizeMask;
    136                         if (next == null)   //  Single node on list 链表中只存在一个节点
    137                             newTable[idx] = e;
    138                         else { // Reuse consecutive sequence at same slot
    139                             //链表中存在多个节点.
    140                             /*
    141                             相对于HashMap的resize,ConcurrentHashMap的rehash原理类似,但是Doug Lea为rehash做了一定的优化,避免让所有的节点都进行复制操作:由于扩容是基于2的幂指来操作,假设扩容前某HashEntry对应到Segment中数组的index为i,数组的容量为capacity,那么扩容后该HashEntry对应到新数组中的index只可能为i或者i+capacity,因此大多数HashEntry节点在扩容前后index可以保持不变。基于此,rehash方法中会定位第一个后续所有节点在扩容后index都保持不变的节点,然后将这个节点之前的所有节点重排即可
    142                             */
    143                             HashEntry<K,V> lastRun = e;
    144                             int lastIdx = idx;
    145                             //找到第一个在扩容后index都保持不变的节点lastRun
    146                             for (HashEntry<K,V> last = next;
    147                                  last != null;
    148                                  last = last.next) {
    149                                 int k = last.hash & sizeMask;
    150                                 if (k != lastIdx) {
    151                                     lastIdx = k;
    152                                     lastRun = last;
    153                                 }
    154                             }
    155                             newTable[lastIdx] = lastRun;
    156                             // Clone remaining nodes 
    157                             //将这个节点之前的所有节点重排
    158                             for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
    159                                 V v = p.value;
    160                                 int h = p.hash;
    161                                 int k = h & sizeMask;
    162                                 HashEntry<K,V> n = newTable[k];
    163                                 newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
    164                             }
    165                         }
    166                     }
    167                 }
    168                 int nodeIndex = node.hash & sizeMask; // add the new node
    169                 node.setNext(newTable[nodeIndex]);
    170                 newTable[nodeIndex] = node;
    171                 table = newTable;
    172             }
    View Code

        

        Segment延迟初始化机制

          Segment采用延迟初始化机制,如果sement为null,则调用ensureSegment确保创建Segment。

          ensureSegment方法可能被多个线程调用,ensureSegment()是怎么保证线程安全的呢?

          通过源代码可看出ensureSegment方法并未使用锁来控制竞争,而是使用了Unsafe对象的getObjectVolatile()提供的原子读语义结合CAS来确保Segment创建的原子性。

          ensureSegment()源代码:

     1                 @SuppressWarnings("unchecked")
     2                 private Segment<K,V> ensureSegment(int k) {
     3                     final Segment<K,V>[] ss = this.segments;
     4                     long u = (k << SSHIFT) + SBASE; // raw offset
     5                     Segment<K,V> seg;
     6                     if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
     7                         //使用第一个segment作为模板来创建segment,第一个segment在Map初始化时已经被创建
     8                         Segment<K,V> proto = ss[0]; // use segment 0 as prototype
     9                         int cap = proto.table.length;
    10                         float lf = proto.loadFactor;
    11                         int threshold = (int)(cap * lf);
    12                         HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
    13                         if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
    14                             == null) { // recheck
    15                             Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);    //根据第一个segment的参数创建新的Segment
    16                             //自旋CAS。如果seg!=null,说明该segment已经被其他线程创建,则方法结束;如果seg==null,说明该segment还没有被创建,则当前线程采用CAS更新Segment数组,如果CAS成功,则结束,否则说明其他线程对Segment数组有过更新,继续下一个循环指定该segment创建成功。
    17                             while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
    18                                    == null) {
    19                                 if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
    20                                     break;
    21                             }
    22                         }
    23                     }
    24                     return seg;
    25                 }
    View Code

        

        scanAndLockForPut方法

          自旋获取锁中,当第一次循环或更新操作导致的first节点发生变化时,会遍历该Segment的HashEntry数组中hash对应的链表,如果key对应的HashEntry不存在,则创建该节点。

          此处遍历链表的原因:希望遍历的链表被CPU cache所缓存,为后续实际put过程中的链表遍历操作提升性能。怎么理解呢?put还是要再去遍历一次(即使链表在缓存中)?因为此时当前线程没有获取到Segment锁,所以不能进行put操作,但可以为put操作做一些准备工作(有可能加载到缓存,在缓存中执行遍历更快),使put的操作更快,从而减少锁竞争。这种思想在remove()方法中也有体现。

      

      获取元素(get)

        get操作不需要加锁,当拿到的值为空时才会加锁重读。读操作不用加锁的原因是它的get方法里将要使用的共享变量都定义成volatile类型,如volatile V value。定义成volatile的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写(有一种情况可以被多线程写,就是写入的值不依赖于原值)。get方法使用UNSAFE提供的原子读语义来获的Segmnet和对应的链表。

        containsKey方法和get相似,都不用加锁。

     1             public V get(Object key) {
     2                 Segment<K,V> s; // manually integrate access methods to reduce overhead
     3                 HashEntry<K,V>[] tab;
     4                 int h = hash(key);
     5                 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
     6                 //通过Hash值找到相应的Segment
     7                 if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
     8                     (tab = s.table) != null) {
     9                     //找到HashEntry链表的索引,遍历链表找到对应的key
    10                     for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
    11                              (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
    12                          e != null; e = e.next) {
    13                         K k;
    14                         if ((k = e.key) == key || (e.hash == h && key.equals(k)))
    15                             return e.value;
    16                     }
    17                 }
    18                 return null;
    19             }
    View Code

      统计大小(size)

        统计Map的大小需要统计所有Segment的大小然后求和。

        问题:累加的过程中Segment的大小可能会发生变化,导致统计的结果不准确。

        解决方案:1)简单的方法就是对所有的Segment加锁,但方法低效。

             2)考虑到累加的过程中Segment的大小变化的可能性很小,作者给出了更高效的方案,首先尝试几次在不对Segment加锁的情况下统计各个Segment的大小,如果累加期间Map的大小发生了变化,再使用加锁的方式统计各个Segment的大小。判断Map的大小是否发生了变化,需要通过Segment的modCount变量实现。modCount表示对Segment的修改次数。相同的思想也用在了containsValue操作。

        注意事项:使用加锁方式进行统计大小时,对每一个Segment加锁,需要强制创建所有的Segment,这么做的目的是防止其他线程创建Segment并进行更新操作。所以应尽量避免在多线程环境下使用size和containsValue方法。

     1             public int size() {
     2                 // Try a few times to get accurate count. On failure due to
     3                 // continuous async changes in table, resort to locking.
     4                 final Segment<K,V>[] segments = this.segments;
     5                 int size;
     6                 boolean overflow; // true if size overflows 32 bits
     7                 long sum;         // sum of modCounts
     8                 long last = 0L;   // previous sum
     9                 int retries = -1; // first iteration isn't retry
    10                 try {
    11                     for (;;) {
    12                         //static final int RETRIES_BEFORE_LOCK = 2;
    13                         //判断是否到达无锁统计map大小的最大次数,若达到最大次数需要锁所有Segment
    14                         if (retries++ == RETRIES_BEFORE_LOCK) {
    15                             //对每一个Segment加锁,此时需要强制创建所有的Segment,这么做的目的是防止其他线程创建Segment并进行更新操作。
    16                             //所以应避免在多线程环境下使用size和containsValue方法。
    17                             for (int j = 0; j < segments.length; ++j)
    18                                 ensureSegment(j).lock(); // force creation
    19                         }
    20                         sum = 0L;
    21                         size = 0;
    22                         overflow = false;
    23                         for (int j = 0; j < segments.length; ++j) {
    24                             Segment<K,V> seg = segmentAt(segments, j);
    25                             if (seg != null) {
    26                                 sum += seg.modCount;
    27                                 int c = seg.count;
    28                                 if (c < 0 || (size += c) < 0)
    29                                     overflow = true;
    30                             }
    31                         }
    32                         //判断前后两次统计的modCount之和是否相等,若相等则说明没有被修改郭
    33                         //由于last初始值为0,如果该Map从创建到现在都没有被修改过,即所有Segment的modCount都为0,则只执行一次循环;否则至少执行两次循环,比较两次统计的sum有没有发生变化。又因为retries初始值-1,所以可以说重试无锁统计大小的次数为3次。
    34                         if (sum == last)
    35                             break;
    36                         last = sum;
    37                     }
    38                 } finally {
    39                     //重试次数大于最大次数,需要释放锁
    40                     if (retries > RETRIES_BEFORE_LOCK) {
    41                         for (int j = 0; j < segments.length; ++j)
    42                             segmentAt(segments, j).unlock();
    43                     }
    44                 }
    45                 return overflow ? Integer.MAX_VALUE : size;
    46             }
    View Code

     Java8的ConcurrentHashMap

      相对于Java7中的实现,主要有以下两点改进:

        1)取消segment分段,直接使用数组transient volatile Node<K,V>[] table存储数据,将table数组元素作为锁,实现对数组中每一个桶进行加锁,进一步减少并发冲突的概率。

        2)类似于Java8中的HashMap,将数组+链表的结构变更为数组+链表+红黑树的结构。当链表的长度大于8时,将链表转换为红黑树,原因见HashMap。

      通过 Node + CAS + Synchronized 来保证线程安全。

      Fields

      

     1     transient volatile Node<K,V>[] table;//存放元素的数组,懒加载,大小是2的n次方
     2     private transient volatile Node<K,V>[] nextTable;//扩容时用到
     3     //基本计数器,通过CAS更新
     4     private transient volatile long baseCount;
     5     /*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
     6      *当为负数时:-1代表正在初始化,-N代表有N-1个线程正在进行扩容
     7      *当为0时:代表当时的table还没有被初始化
     8      *当为正数时:表示初始化或者下一次进行扩容的大小
     9      */
    10     private transient volatile int sizeCtl;
    11 
    12     /**
    13      * The next table index (plus one) to split while resizing.
    14      */
    15     private transient volatile int transferIndex;
    16 
    17     /**
    18      * Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
    19      */
    20     private transient volatile int cellsBusy;
    21 
    22     /**
    23      * Table of counter cells. When non-null, size is a power of 2.
    24      */
    25     private transient volatile CounterCell[] counterCells;
    26 
    27     // views
    28     private transient KeySetView<K,V> keySet;
    29     private transient ValuesView<K,V> values;
    30     private transient EntrySetView<K,V> entrySet;
    View Code

      Node

        hash值和key都是final的,不可更改;val和next都是volatile的保证可见性和禁止指令重排序。

     1     static class Node<K,V> implements Map.Entry<K,V> {
     2         //hash值和key都是final的,不可更改;val和next都是volatile的保证可见性和禁止指令重排序。
     3         final int hash;
     4         final K key;
     5         volatile V val;
     6         volatile Node<K,V> next;
     7 
     8         Node(int hash, K key, V val, Node<K,V> next) {
     9             this.hash = hash;
    10             this.key = key;
    11             this.val = val;
    12             this.next = next;
    13         }
    14 
    15         public final K getKey()       { return key; }
    16         public final V getValue()     { return val; }
    17         public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
    18         public final String toString(){ return key + "=" + val; }
    19         //不允许更改值
    20         public final V setValue(V value) {
    21             throw new UnsupportedOperationException();
    22         }
    23         public final boolean equals(Object o) {
    24             Object k, v, u; Map.Entry<?,?> e;
    25             return ((o instanceof Map.Entry) &&
    26                     (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
    27                     (v = e.getValue()) != null &&
    28                     (k == key || k.equals(key)) &&
    29                     (v == (u = val) || v.equals(u)));
    30         }
    31         //用于map中的get()方法,子类重写
    32         Node<K,V> find(int h, Object k) {
    33             Node<K,V> e = this;
    34             if (k != null) {
    35                 do {
    36                     K ek;
    37                     if (e.hash == h &&
    38                         ((ek = e.key) == k || (ek != null && k.equals(ek))))
    39                         return e;
    40                 } while ((e = e.next) != null);
    41             }
    42             return null;
    43         }
    44     }
    View Code

      put

        添加元素的大致过程如下:

          1)如果table没有初始化,先通过initTable()方法进行初始化;

          2)计算hash值,找到对应的桶,如果该桶的首节点f为null(即不存在hash冲突),使用CAS直接将新Node放入该桶;

          3)如果首节点f的hash值为MOVED,说明正在扩容,先进行扩容;

          4)如果存在hash冲突,则通过加锁(获取首节点f的监视器锁)来保证线程安全,分两种情况:链表和红黑树;如果是链表,则遍历链表,存在相同的key就进行覆盖,否则插入到链表的尾部;如果是红黑树,则向红黑树中插入新节点;

          5)判断是否需要将链表转化为红黑树,如果需要,调用treeifyBin方法;

          6)如果添加成功,就调用addCount方法统计size,并检查是否需要扩容。

     1     public V put(K key, V value) {
     2         return putVal(key, value, false);
     3     }
     4     
     5     final V putVal(K key, V value, boolean onlyIfAbsent) {
     6         if (key == null || value == null) throw new NullPointerException();
     7         int hash = spread(key.hashCode());//计算hash值
     8         //用于记录相应链表的长度
     9         int binCount = 0;
    10         for (Node<K,V>[] tab = table;;) {
    11             Node<K,V> f; int n, i, fh;
    12             if (tab == null || (n = tab.length) == 0)
    13                 //如果数组为空,则进行初始化
    14                 tab = initTable();
    15             //找到该hash值对应的下标i,得到第一个节点f
    16             else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    17                 //如果第一个节点f为null,使用CAS直接将新Node放入该桶
    18                 //如果CAS成功,跳出循环结束;如果失败,进入下一个循环
    19                 if (casTabAt(tab, i, null,
    20                              new Node<K,V>(hash, key, value, null)))
    21                     break;                   // no lock when adding to empty bin
    22             }
    23             //如果f的哈希值为MOVED,则进行数据迁移(扩容)
    24             else if ((fh = f.hash) == MOVED)
    25                 tab = helpTransfer(tab, f);
    26             else {
    27                 //这种情况下,说明f是第一个节点且不为null
    28                 V oldVal = null;
    29                 //获取该桶第一个节点f的监视器锁
    30                 synchronized (f) {
    31                     if (tabAt(tab, i) == f) {
    32                         //第一个节点f的hash值大于0,说明是链表
    33                         if (fh >= 0) {
    34                             binCount = 1;
    35                             //遍历链表,
    36                             for (Node<K,V> e = f;; ++binCount) {
    37                                 K ek;
    38                                 //如果找到同样的key,判断onlyIfAbsent然后进行覆盖,跳出循环
    39                                 if (e.hash == hash &&
    40                                     ((ek = e.key) == key ||
    41                                      (ek != null && key.equals(ek)))) {
    42                                     oldVal = e.val;
    43                                     if (!onlyIfAbsent)
    44                                         e.val = value;
    45                                     break;
    46                                 }
    47                                 Node<K,V> pred = e;
    48                                 //如果遍历到链表尾部没有找到相同的key,则将新Node插入到链表的尾部
    49                                 if ((e = e.next) == null) {
    50                                     pred.next = new Node<K,V>(hash, key,
    51                                                               value, null);
    52                                     break;
    53                                 }
    54                             }
    55                         }
    56                         //如果第一个节点是红黑树节点
    57                         else if (f instanceof TreeBin) {
    58                             Node<K,V> p;
    59                             binCount = 2;
    60                             // 调用红黑树的插值方法插入新节点
    61                             if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
    62                                                            value)) != null) {
    63                                 oldVal = p.val;
    64                                 if (!onlyIfAbsent)
    65                                     p.val = value;
    66                             }
    67                         }
    68                     }
    69                 }
    70                 //binCount != 0说明上面做了链表操作
    71                 if (binCount != 0) {
    72                     //判断是否将链表转化为红黑树,TREEIFY_THRESHOLD为8
    73                     if (binCount >= TREEIFY_THRESHOLD)
    74                         //可能转化为红黑树
    75                         //如果当前数组的长度小于64,那么会选择进行数组扩容,而不是转换为红黑树
    76                         treeifyBin(tab, i);
    77                     if (oldVal != null)
    78                         return oldVal;
    79                     break;
    80                 }
    81             }
    82         }
    83         addCount(1L, binCount);
    84         return null;
    85     }
    View Code

      initTable

        初始化table

     1     private final Node<K,V>[] initTable() {
     2         Node<K,V>[] tab; int sc;
     3         while ((tab = table) == null || tab.length == 0) {
     4             //如果sizeCtl小于0,说明其他线程已经初始化了
     5             if ((sc = sizeCtl) < 0)
     6                 //yield()使线程由运行状态变为就绪状态,把CPU让出来,让自己或者其它的线程运行。
     7                 Thread.yield(); // lost initialization race; just spin
     8             //通过CAS操作将sizeCtl设置为-1,返回true代表抢到锁
     9             else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
    10                 try {
    11                     if ((tab = table) == null || tab.length == 0) {
    12                         //默认容量DEFAULT_CAPACITY为16
    13                         int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
    14                         //初始化指定容量的数组,并赋给table,table为volatile的
    15                         @SuppressWarnings("unchecked")
    16                         Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
    17                         table = tab = nt;
    18                         //>>>为无符号右移运算,无符号右移2位,相当于除以2
    19                         //即相当于sc=0.75n
    20                         sc = n - (n >>> 2);
    21                     }
    22                 } finally {
    23                     //将sc赋值给sizeCtl
    24                     sizeCtl = sc;
    25                 }
    26                 break;
    27             }
    28         }
    29         return tab;
    30     }
    View Code

      treeifyBin

        链表转红黑树

     1     private final void treeifyBin(Node<K,V>[] tab, int index) {
     2         Node<K,V> b; int n, sc;
     3         if (tab != null) {
     4             //如果数组长度小于MIN_TREEIFY_CAPACITY(64)的时候,进行扩容。
     5             if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
     6                 tryPresize(n << 1);
     7             //b是该桶中的第一个节点
     8             else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
     9                 //获取锁
    10                 synchronized (b) {
    11                     if (tabAt(tab, index) == b) {
    12                         TreeNode<K,V> hd = null, tl = null;
    13                         //遍历链表,创建一颗红黑树
    14                         for (Node<K,V> e = b; e != null; e = e.next) {
    15                             TreeNode<K,V> p =
    16                                 new TreeNode<K,V>(e.hash, e.key, e.val,
    17                                                   null, null);
    18                             if ((p.prev = tl) == null)
    19                                 hd = p;
    20                             else
    21                                 tl.next = p;
    22                             tl = p;
    23                         }
    24                         //将红黑树设置到数组的相应桶中
    25                         setTabAt(tab, index, new TreeBin<K,V>(hd));
    26                     }
    27                 }
    28             }
    29         }
    30     }
    View Code

      tryPresize

        扩容,每次都是扩容为原来的2倍,size是已经翻完倍的数值。

      1     private final void tryPresize(int size) {
      2         int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
      3             tableSizeFor(size + (size >>> 1) + 1);//取大于1.5倍的size+1的最近的2的n次方的值
      4         int sc;
      5         while ((sc = sizeCtl) >= 0) {
      6             Node<K,V>[] tab = table; int n;
      7             //如果数组为空,先初始化数组
      8             if (tab == null || (n = tab.length) == 0) {
      9                 n = (sc > c) ? sc : c;
     10                 if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
     11                     try {
     12                         if (table == tab) {
     13                             @SuppressWarnings("unchecked")
     14                             Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
     15                             table = nt;
     16                             sc = n - (n >>> 2);
     17                         }
     18                     } finally {
     19                         sizeCtl = sc;
     20                     }
     21                 }
     22             }
     23             else if (c <= sc || n >= MAXIMUM_CAPACITY)
     24                 break;
     25             else if (tab == table) {
     26                 int rs = resizeStamp(n);
     27                 if (sc < 0) {
     28                     Node<K,V>[] nt;
     29                     if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
     30                         sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
     31                         transferIndex <= 0)
     32                         break;
     33                     if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
     34                         transfer(tab, nt);
     35                 }
     36                 else if (U.compareAndSwapInt(this, SIZECTL, sc,
     37                                              (rs << RESIZE_STAMP_SHIFT) + 2))
     38                     transfer(tab, null);
     39             }
     40         }
     41     }
     42 
     43 
     44     private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
     45         int n = tab.length, stride;
     46         if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
     47             stride = MIN_TRANSFER_STRIDE; // subdivide range
     48         if (nextTab == null) {            // initiating
     49             try {
     50                 @SuppressWarnings("unchecked")
     51                 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
     52                 nextTab = nt;
     53             } catch (Throwable ex) {      // try to cope with OOME
     54                 sizeCtl = Integer.MAX_VALUE;
     55                 return;
     56             }
     57             nextTable = nextTab;
     58             transferIndex = n;
     59         }
     60         int nextn = nextTab.length;
     61         ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
     62         boolean advance = true;
     63         boolean finishing = false; // to ensure sweep before committing nextTab
     64         for (int i = 0, bound = 0;;) {
     65             Node<K,V> f; int fh;
     66             while (advance) {
     67                 int nextIndex, nextBound;
     68                 if (--i >= bound || finishing)
     69                     advance = false;
     70                 else if ((nextIndex = transferIndex) <= 0) {
     71                     i = -1;
     72                     advance = false;
     73                 }
     74                 else if (U.compareAndSwapInt
     75                          (this, TRANSFERINDEX, nextIndex,
     76                           nextBound = (nextIndex > stride ?
     77                                        nextIndex - stride : 0))) {
     78                     bound = nextBound;
     79                     i = nextIndex - 1;
     80                     advance = false;
     81                 }
     82             }
     83             if (i < 0 || i >= n || i + n >= nextn) {
     84                 int sc;
     85                 if (finishing) {
     86                     nextTable = null;
     87                     table = nextTab;
     88                     sizeCtl = (n << 1) - (n >>> 1);
     89                     return;
     90                 }
     91                 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
     92                     if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
     93                         return;
     94                     finishing = advance = true;
     95                     i = n; // recheck before commit
     96                 }
     97             }
     98             else if ((f = tabAt(tab, i)) == null)
     99                 advance = casTabAt(tab, i, null, fwd);
    100             else if ((fh = f.hash) == MOVED)
    101                 advance = true; // already processed
    102             else {
    103                 synchronized (f) {
    104                     if (tabAt(tab, i) == f) {
    105                         Node<K,V> ln, hn;
    106                         if (fh >= 0) {
    107                             int runBit = fh & n;
    108                             Node<K,V> lastRun = f;
    109                             for (Node<K,V> p = f.next; p != null; p = p.next) {
    110                                 int b = p.hash & n;
    111                                 if (b != runBit) {
    112                                     runBit = b;
    113                                     lastRun = p;
    114                                 }
    115                             }
    116                             if (runBit == 0) {
    117                                 ln = lastRun;
    118                                 hn = null;
    119                             }
    120                             else {
    121                                 hn = lastRun;
    122                                 ln = null;
    123                             }
    124                             for (Node<K,V> p = f; p != lastRun; p = p.next) {
    125                                 int ph = p.hash; K pk = p.key; V pv = p.val;
    126                                 if ((ph & n) == 0)
    127                                     ln = new Node<K,V>(ph, pk, pv, ln);
    128                                 else
    129                                     hn = new Node<K,V>(ph, pk, pv, hn);
    130                             }
    131                             setTabAt(nextTab, i, ln);
    132                             setTabAt(nextTab, i + n, hn);
    133                             setTabAt(tab, i, fwd);
    134                             advance = true;
    135                         }
    136                         else if (f instanceof TreeBin) {
    137                             TreeBin<K,V> t = (TreeBin<K,V>)f;
    138                             TreeNode<K,V> lo = null, loTail = null;
    139                             TreeNode<K,V> hi = null, hiTail = null;
    140                             int lc = 0, hc = 0;
    141                             for (Node<K,V> e = t.first; e != null; e = e.next) {
    142                                 int h = e.hash;
    143                                 TreeNode<K,V> p = new TreeNode<K,V>
    144                                     (h, e.key, e.val, null, null);
    145                                 if ((h & n) == 0) {
    146                                     if ((p.prev = loTail) == null)
    147                                         lo = p;
    148                                     else
    149                                         loTail.next = p;
    150                                     loTail = p;
    151                                     ++lc;
    152                                 }
    153                                 else {
    154                                     if ((p.prev = hiTail) == null)
    155                                         hi = p;
    156                                     else
    157                                         hiTail.next = p;
    158                                     hiTail = p;
    159                                     ++hc;
    160                                 }
    161                             }
    162                             ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
    163                                 (hc != 0) ? new TreeBin<K,V>(lo) : t;
    164                             hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
    165                                 (lc != 0) ? new TreeBin<K,V>(hi) : t;
    166                             setTabAt(nextTab, i, ln);
    167                             setTabAt(nextTab, i + n, hn);
    168                             setTabAt(tab, i, fwd);
    169                             advance = true;
    170                         }
    171                     }
    172                 }
    173             }
    174         }
    175     }
    View Code

      

      addCount

        在添加完元素之后,调用addCount方法进行计数。

        addCount方法主要完成两个功能:

          1)对table的长度计数+1,有两种情况:一是通过修改 baseCount,二是通过使用 CounterCell。当 CounterCell 被初始化后,就优先使用他,不再使用 baseCount了;

          2)检查是否需要扩容,或者是否正在扩容。如果需要扩容,就调用扩容方法,如果正在扩容,就帮助其扩容。

     1     //从putVal传入的参数x是1,参数check为binCount,binCount>=0,默认要检查是否需要扩容
     2     private final void addCount(long x, int check) {
     3         CounterCell[] as; long b, s;
     4         //如果counterCells不为null或者更新baseCount失败
     5         if ((as = counterCells) != null ||
     6             !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
     7             CounterCell a; long v; int m;
     8             boolean uncontended = true;
     9             //如果counterCells的大小为0,
    10             //或者随机取其中一个元素为null,
    11             //或者修改这个槽位的变量失败,则执行fullAddCount方法
    12             if (as == null || (m = as.length - 1) < 0 ||
    13                 (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
    14                 !(uncontended =
    15                   U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
    16                 fullAddCount(x, uncontended);
    17                 return;
    18             }
    19             if (check <= 1)
    20                 return;
    21             s = sumCount();//计算map的size赋值给s
    22         }
    23         //判断是否需要扩容,在putVal中调用,默认都是要检查的
    24         if (check >= 0) {
    25             Node<K,V>[] tab, nt; int n, sc;
    26             //如果map的size大于sizeCtl(扩容阈值),
    27             //且table不是null,
    28             //且table的长度小于MAXIMUM_CAPACITY,则扩容
    29             while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
    30                    (n = tab.length) < MAXIMUM_CAPACITY) {
    31                 int rs = resizeStamp(n);
    32                 //sizeCtl小于0表示正在扩容
    33                 if (sc < 0) {
    34                     if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
    35                         sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
    36                         transferIndex <= 0)
    37                         break;
    38                     // 如果可以帮助扩容,那么将 sc 加 1. 表示多了一个线程在帮助扩容
    39                     if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
    40                         transfer(tab, nt);
    41                 }
    42                 //如果没有在扩容,将 sc 更新为负数,更新成功就进行扩容
    43                 else if (U.compareAndSwapInt(this, SIZECTL, sc,
    44                                              (rs << RESIZE_STAMP_SHIFT) + 2))
    45                     transfer(tab, null);//进行扩容。
    46                 s = sumCount();
    47             }
    48         }
    49     }
    View Code

      get

        获取元素的大致过程如下:

          1)计算hash值,找到数组table中对应的桶;

          2)如果该桶的首节点为null,直接返回null;

          3)如果该桶的首节点的key就是要找的key,直接返回其value;

          4)如果该桶的首节点的hash值<0,说明正在扩容或者该位置是红黑树,通过find方法找到想要的值;

          5)如果是链表,遍历链表查找相同的key。

     1     public V get(Object key) {
     2         Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
     3         int h = spread(key.hashCode());//计算hash值
     4         if ((tab = table) != null && (n = tab.length) > 0 &&
     5             (e = tabAt(tab, (n - 1) & h)) != null) {
     6             //如果该桶的第一个节点就是要找的key,直接返回value
     7             if ((eh = e.hash) == h) {
     8                 if ((ek = e.key) == key || (ek != null && key.equals(ek)))
     9                     return e.val;
    10             }
    11             //第一个节点的hash值<0,说明正在扩容或者该位置是红黑树
    12             else if (eh < 0)
    13                 return (p = e.find(h, key)) != null ? p.val : null;
    14             //这种情况下,肯定是链表
    15             while ((e = e.next) != null) {
    16                 if (e.hash == h &&
    17                     ((ek = e.key) == key || (ek != null && key.equals(ek))))
    18                     return e.val;
    19             }
    20         }
    21         return null;
    22     }
    View Code

      size

        size = baseCount + CounterCell数组中元素的个数。因为在addCount方法中,使用CAS更新baseCount,有可能在并发情况下更新失败。即节点已经被添加到数组table中,但数量没有被统计。当更新失败时,会调用fullAddCount方法将这些失败的节点包装成一个CounterCell对象,并保存在CounterCell数组中。

     1     public int size() {
     2         long n = sumCount();
     3         return ((n < 0L) ? 0 :
     4                 (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
     5                 (int)n);
     6     }
     7 
     8     final long sumCount() {
     9         CounterCell[] as = counterCells; CounterCell a;
    10         long sum = baseCount;
    11         if (as != null) {
    12             for (int i = 0; i < as.length; ++i) {
    13                 if ((a = as[i]) != null)
    14                     sum += a.value;
    15             }
    16         }
    17         return sum;
    18     }
    View Code

    总结

        Java8版本的ConcurrentHashMap相对于Java7有什么优势:

        1)Java7中锁的粒度为segment,每个segment中包含多个HashEntry,而Java8中锁的粒度就是HashEntry(首节点);

        2)Java7中锁使用的是ReentrantLock,而Java8中使用的是synchronized;

          为什么使用内置锁synchronized来代替重入锁ReentrantLock?

          1 在低粒度的加锁方式中,synchronized的性能不比ReentrantLock差;Java8中ConcurrentHashMap的锁粒度更低了,发生冲突的概率更低,JVM对synchronized进行了大量的优化(自旋锁、偏向锁、轻量级锁等等),只要线程在可以在自旋过程中拿到锁,那么就不会升级为重量级锁,就避免了线程挂起和唤醒的上下文开销。但使用ReentrantLock不会自旋,而是直接被挂起,当然,也可以使用tryLock(),但是这样又出现了一个问题,你怎么知道tryLock的时间呢?在时间范围里还好,假如超过了呢?

          所以在低粒度的加锁方式中,synchronized是最好的选择。Synchronized和ReentrantLock他们的开销差距是在释放锁时唤醒线程的数量,Synchronized是唤醒锁池里所有的线程+刚好来访问的线程,而ReentrantLock则是当前线程后进来的第一个线程+刚好来访问的线程。

          2 synchronized内置锁使用起来更加简便、易懂、程序可读性好;

          3 ReentrantLock需要消耗更多的内存

        3)Java8中使用链表+红黑树的数据结构,代替Java7中的链表,当链表长度比较长时,红黑树的查找速度更快;

    参考资料:

      《java并发编程的艺术》

       ConcurrentHashMap原理分析

       ConcurrentHashMap总结

       ConcurrentHashMap 1.8为什么要使用CAS+Synchronized取代Segment+ReentrantLock

       ConcurrentHashMap原理分析(1.7与1.8)

      Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析

      Map 综述(三):彻头彻尾理解 ConcurrentHashMap

  • 相关阅读:
    CreateRemoteThread注入DLL
    远程线程注入引出的问题
    jQuery中排除指定元素,同时选择剩下的所有元素
    YUIDoc的使用方法小结
    实验二 栈和队列的应用
    实验一 线性表的基本操作
    最大子段和详解
    HDOJ 1995 汉诺塔V
    错排公式 详细解答
    HDOJ 2212 DFS
  • 原文地址:https://www.cnblogs.com/zaizhoumo/p/7709755.html
Copyright © 2011-2022 走看看