zoukankan      html  css  js  c++  java
  • ConcurrentHashMap源码分析

    前言:ConcurrentHashMap是HashMap的线程安全版本,内部使用了数组+链表+红黑树的结构来存储数据,相对于同样线程安全的Hashtable来说,它在效率方面有很大的提升,因此多线程环境下更多的是使用ConcurrentHashMap,因此有必要对其原理进行分析。

    注:本文jdk源码版本为jdk1.8.0_172


    1.ConcurrentHashMap介绍

    ConcurrentHashMap是HashMap的线程安全版本,底层数据结构为数组+链表+红黑树,默认容量16,线程同步,不允许[key,value]为null。

    1 public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    2     implements ConcurrentMap<K,V>, Serializable

    构造函数:

     1 public ConcurrentHashMap() {
     2 }
     3 
     4  public ConcurrentHashMap(int initialCapacity) {
     5     if (initialCapacity < 0)
     6         throw new IllegalArgumentException();
     7     int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
     8                MAXIMUM_CAPACITY :
     9                tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    10     this.sizeCtl = cap;
    11 }
    12 
    13   public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    14     this.sizeCtl = DEFAULT_CAPACITY;
    15     putAll(m);
    16 }
    17 
    18  public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    19     this(initialCapacity, loadFactor, 1);
    20 }
    21 
    22     public ConcurrentHashMap(int initialCapacity,
    23                          float loadFactor, int concurrencyLevel) {
    24     if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
    25         throw new IllegalArgumentException();
    26     if (initialCapacity < concurrencyLevel)   // Use at least as many bins
    27         initialCapacity = concurrencyLevel;   // as estimated threads
    28     long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    29     int cap = (size >= (long)MAXIMUM_CAPACITY) ?
    30         MAXIMUM_CAPACITY : tableSizeFor((int)size);
    31     this.sizeCtl = cap;
    32 }

    分析:

    通过构造函数可以发现sizeCtl变量经常出现,该变量通过查看jdk源码注释可知该变量主要控制初始化或扩容:

    #1.-1,表示线程正在进行初始化操作。

    #2.-(1+nThreads),表示n个线程正在进行扩容。

    #3.0,默认值,后续在真正初始化的时候使用默认容量。

    #4.>0,初始化或扩容完成后下一次的扩容门槛。

    2.具体源码分析

    put操作:

     1 final V putVal(K key, V value, boolean onlyIfAbsent) {
     2         if (key == null || value == null) throw new NullPointerException();
     3         // 计算key的hash值
     4         int hash = spread(key.hashCode());
     5         // 用来计算在这个节点总共有多少个元素,用来控制扩容或者转移为树
     6         int binCount = 0;
     7         // 进行自旋
     8         for (Node<K,V>[] tab = table;;) {
     9             Node<K,V> f; int n, i, fh;
    10             if (tab == null || (n = tab.length) == 0)
    11                 // table未初始化,则初始化
    12                 tab = initTable();
    13             // 如果该位置上的f为null,则说明第一次插入元素,则直接插入新的Node节点
    14             else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    15                 if (casTabAt(tab, i, null,
    16                              new Node<K,V>(hash, key, value, null)))
    17                     break;                   // no lock when adding to empty bin
    18             }
    19             // 如果检测到当前某个节点的hash值为MOVED,则表示正在进行数组扩张的数据复制阶段
    20             // 则当前线程与会参与复制,通过允许多线程复制的功能,减少数组的复制来带来的性能损失
    21             else if ((fh = f.hash) == MOVED)
    22                 tab = helpTransfer(tab, f);
    23             else {
    24                 V oldVal = null;
    25                 /**
    26                  * 到该分支表明该位置上有元素,采用synchronized方式加锁
    27                  * 如果是链表的话,则对链表进行遍历,找到key和key的hash值都一样的节点,进行替换
    28                  * 如果没有找到,则添加在链表最后面
    29                  * 如果是树的话,则添加到树中去
    30                  */
    31                 synchronized (f) {
    32                     // 再次取出要存储的位置元素,跟之前的数据进行比较,看是否进行了更改
    33                     if (tabAt(tab, i) == f) {
    34                         // 链表
    35                         if (fh >= 0) {
    36                             binCount = 1;
    37                             // 遍历链表
    38                             for (Node<K,V> e = f;; ++binCount) {
    39                                 K ek;
    40                                 // 元素的hash、key都相同,则进行替换和hashMap相同
    41                                 if (e.hash == hash &&
    42                                     ((ek = e.key) == key ||
    43                                      (ek != null && key.equals(ek)))) {
    44                                     oldVal = e.val;
    45                                     // 当使用putIfAbsent的时候,只有在这个key没有设置值时的候才设置
    46                                     if (!onlyIfAbsent)
    47                                         e.val = value;
    48                                     break;
    49                                 }
    50                                 Node<K,V> pred = e;
    51                                 // 不同key,hash值相同时,直接添加到链表尾即可
    52                                 if ((e = e.next) == null) {
    53                                     pred.next = new Node<K,V>(hash, key,
    54                                                               value, null);
    55                                     break;
    56                                 }
    57                             }
    58                         }
    59                         // 当前结点为红黑树
    60                         else if (f instanceof TreeBin) {
    61                             Node<K,V> p;
    62                             binCount = 2;
    63                             // 添加元素到树中去,表明树的当前结点存在值,则进行替换
    64                             if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
    65                                                            value)) != null) {
    66                                 oldVal = p.val;
    67                                 if (!onlyIfAbsent)
    68                                     p.val = value;
    69                             }
    70                         }
    71                     }
    72                 }
    73                 if (binCount != 0) {
    74                     // 当在同一个节点的数目大于等于8时,则进行扩容或者将数据转换成红黑树
    75                     // 注意,这里并不一定是直接转换成红黑树,有可能先进行扩容
    76                     if (binCount >= TREEIFY_THRESHOLD)
    77                         treeifyBin(tab, i);
    78                     if (oldVal != null)
    79                         return oldVal;
    80                     break;
    81                 }
    82             }
    83         }
    84         // 计数 binCount大于1(链表的长度)表示链表,binCount=2表示红黑树
    85         addCount(1L, binCount);
    86         return null;
    87     }

    分析:

    通过查看put操作的核心源码,整体逻辑还是比较清晰,有几个点需要注意:

    #1.在插入元素时,采用了自旋。

    #2.在插入元素的时候才会进行初始化。

    #3.在插入元素时,底层数据结构可能会转向红黑树。

    initTable:初始化函数

     1  private final Node<K,V>[] initTable() {
     2         Node<K,V>[] tab; int sc;
     3         while ((tab = table) == null || tab.length == 0) {
     4             // sizeCtl初始值为0,当小于0时,表示在别的线程初始化表或扩展表,当前线程只需要让出cpu时间片即可
     5             if ((sc = sizeCtl) < 0)
     6                 Thread.yield(); // lost initialization race; just spin
     7             // 将sc更新为-1,表示线程正在进行初始化操作
     8             else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
     9                 try {
    10                     if ((tab = table) == null || tab.length == 0) {
    11                         // 指定了大小就创建指定大小的Node数组,否则创建默认大小的Node数组
    12                         int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
    13                         @SuppressWarnings("unchecked")
    14                         Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
    15                         table = tab = nt;
    16                         sc = n - (n >>> 2);
    17                     }
    18                 } finally {
    19                     // 和上面逻辑对别可知sizeCtl的大小为数组长度的3/4
    20                     sizeCtl = sc;
    21                 }
    22                 break;
    23             }
    24         }
    25         return tab;
    26     }

    分析:

    在put操作时才进行初始化操作其实是懒加载的一种表现形式,并且初始化时,已考虑多线程的情况,默认容量为16

    当挂在链表上的元素大于等于8时,会通过treeifyBin方法来判断是否扩容或转换为一棵树。

    treeifyBin:

     1 private final void treeifyBin(Node<K,V>[] tab, int index) {
     2         Node<K,V> b; int n, sc;
     3         if (tab != null) {
     4             // 如果数组长度小于64则进行扩容
     5             if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
     6                 tryPresize(n << 1); 
     7             else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
     8                 // 将链表转换成树
     9                 synchronized (b) {
    10                     // 再次比较当前位置结点是否改变
    11                     if (tabAt(tab, index) == b) {
    12                         TreeNode<K,V> hd = null, tl = null; // hd:树的头(head)
    13                         for (Node<K,V> e = b; e != null; e = e.next) {
    14                             TreeNode<K,V> p =
    15                                 new TreeNode<K,V>(e.hash, e.key, e.val,
    16                                                   null, null);
    17                             // 链表转换成树后,头节点依然在相同位置
    18                             if ((p.prev = tl) == null)
    19                                 hd = p;
    20                             else
    21                                 tl.next = p;
    22                             tl = p;
    23                         }
    24                         setTabAt(tab, index, new TreeBin<K,V>(hd));
    25                     }
    26                 }
    27             }
    28         }
    29     }

    分析:

    从上述源码上看,当节点链表上的元素大于等于8时,并不是一定要将数据结构转换成树。而是要先判断数组的容量,如果数组长度小于64,会进行扩容(扩容为原来数组长度的一倍),否则才会转换成树。

    tryPresize:扩容函数,注意通过treeifyBin调用tryPresize时,入参已经扩大2倍

     1    /**
     2      * 扩容时大小总是2的N次方
     3      * 扩容这里可能有一点绕,用一个例子来走下流程
     4      * 假设原来数组长度为16(默认值),在调用tryPresize的时候size的值已经变成了32(16<<1),此时sizeCtl为12
     5      * 计算出c的值为64,注意扩容会在transfer中进行(前提数组已经初始化),每次扩大2倍,由于数组长度基数为2的N次方,所以最终的数组长度也是2的N次方。
     6      * 注意c的值是用来控制循环退出的,条件c<=sc(sizeCtl)。
     7      *           数组长度   sizeCtl
     8      *第一次扩容:  32        28
     9      *第二次扩容:  64        48
    10      *第三次扩容:  128       96   此时c(64)<sc(96) 此时退出扩容
    11      */
    12 private final void tryPresize(int size) {
    13         // 通过tableSizeFor计算扩容退出控制量标志,容量大小总是2的N次方
    14         int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
    15             tableSizeFor(size + (size >>> 1) + 1);
    16         int sc;
    17         while ((sc = sizeCtl) >= 0) {
    18             Node<K,V>[] tab = table; int n;
    19             // 初始化
    20             // 如果tab未初始化,则初始化一个大小为sizeCtl和c中较大的数组
    21             // 初始化是将sizeCtl设置为-1,完成之后将其设置为数组长度的3/4
    22             // 在此进行初始化,主要是因为如果直接调用putAll方法进行元素添加时,table还未初始化,所以这里需要判断table是否进行了初始化
    23             if (tab == null || (n = tab.length) == 0) {
    24                 n = (sc > c) ? sc : c;
    25                 // 初始化tab的时候,把sizeCtl设置为-1,通过CAS
    26                 if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
    27                     try {
    28                         if (table == tab) {
    29                             @SuppressWarnings("unchecked")
    30                             Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
    31                             table = nt;
    32                             sc = n - (n >>> 2);
    33                         }
    34                     } finally {
    35                         sizeCtl = sc;
    36                     }
    37                 }
    38             }
    39             // 一直扩容到c小于等于sizeCtl或者数组长度大于最大长度的时候,退出扩容
    40             else if (c <= sc || n >= MAXIMUM_CAPACITY)
    41                 break;
    42             else if (tab == table) {
    43                 int rs = resizeStamp(n);
    44                 // 如果正在扩容,则帮助扩容
    45                 // 否则的话,开始新的扩容
    46                 // 在transfer操作,将第一个参数的table元素,移到第二个元素的table去,
    47                 // 虽然此时第二个参数设置的是null,但是在transfer方法中,第二个参数为null的时候,会创建一个两倍大小的table
    48                 // sc小于0表示有线程在进行操作
    49                 if (sc < 0) {
    50                     Node<K,V>[] nt;
    51                     if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
    52                         sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
    53                         transferIndex <= 0)
    54                         break;
    55                     // 将线程数加一,该线程将进行transfer,在transfer的时候,sc表示transfer工作线程数
    56                     if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
    57                         transfer(tab, nt);
    58                 }
    59                 // 没有初始化或扩容,直接进行扩容
    60                 else if (U.compareAndSwapInt(this, SIZECTL, sc,
    61                                              (rs << RESIZE_STAMP_SHIFT) + 2))
    62                     transfer(tab, null);
    63             }
    64         }
    65     }

    分析:

    扩容时稍微有一点绕,但上面注释给出了一个例子,理解该例子应该就可以理解扩容,特别要注意源码中的c值,可以看做是扩容控制值,通过该值来终止扩容函数。

    transfer:数组扩容函数

      1  private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
      2         int n = tab.length, stride;
      3         // 确定线程负责数组大小的范围
      4         if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
      5             stride = MIN_TRANSFER_STRIDE; // subdivide range
      6         // 扩容后数组长度为原来的两倍
      7         if (nextTab == null) {            // initiating
      8             try {
      9                 @SuppressWarnings("unchecked")
     10                 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
     11                 nextTab = nt;
     12             } catch (Throwable ex) {      // try to cope with OOME
     13                 sizeCtl = Integer.MAX_VALUE;
     14                 return;
     15             }
     16             nextTable = nextTab;
     17             transferIndex = n;
     18         }
     19         int nextn = nextTab.length;
     20         /**
     21          * 创建一个fwd结点,用来控制并发,当一个结点为空或者已经被转移之后,就设置为fwd结点
     22          * 这是一个空的标志节点
     23          */
     24         ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
     25         // 是否继续向前查找的标志位
     26         boolean advance = true;
     27         boolean finishing = false; // to ensure sweep before committing nextTab
     28         for (int i = 0, bound = 0;;) {
     29             Node<K,V> f; int fh;
     30             while (advance) {
     31                 int nextIndex, nextBound;
     32                 if (--i >= bound || finishing)
     33                     advance = false;
     34                 else if ((nextIndex = transferIndex) <= 0) {
     35                     i = -1;
     36                     advance = false;
     37                 }
     38                 else if (U.compareAndSwapInt
     39                          (this, TRANSFERINDEX, nextIndex,
     40                           nextBound = (nextIndex > stride ?
     41                                        nextIndex - stride : 0))) {
     42                     bound = nextBound;
     43                     i = nextIndex - 1;
     44                     advance = false;
     45                 }
     46             }
     47             if (i < 0 || i >= n || i + n >= nextn) {
     48                 int sc;
     49                 // 数据迁移完成,替换旧桶数据
     50                 if (finishing) {
     51                     nextTable = null;
     52                     table = nextTab;
     53                     // 设置sizeCtl为扩容后的0.75
     54                     sizeCtl = (n << 1) - (n >>> 1);
     55                     return;
     56                 }
     57                 // 扩容完成,将扩容线程数-1
     58                 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
     59                     if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
     60                         return;
     61                     // finishing和advance设置为true,重新走到上面if条件,再次检查是否迁移完
     62                     // 通过fh=f.hash==MOVED进行判断
     63                     finishing = advance = true;
     64                     i = n; // recheck before commit
     65                 }
     66             }
     67             // 如果桶中无数据,则放入fwd标记,表示该位置已迁移
     68             else if ((f = tabAt(tab, i)) == null)
     69                 advance = casTabAt(tab, i, null, fwd);
     70             // 如果桶中第一个元素的hash值为MOVED,说明该节点为fwd节点,详情看fwd节点的构造函数
     71             // 说明该位置已经被迁移
     72             else if ((fh = f.hash) == MOVED)
     73                 advance = true; // already processed
     74             else {
     75                 // 加锁迁移元素
     76                 synchronized (f) {
     77                     // 再次判断桶中第一个元素是否有过修改
     78                     if (tabAt(tab, i) == f) {
     79                         /**
     80                          * 把一个链表划分成两个链表
     81                          * 规则是桶中各元素的hash值与桶大小n进行与操作
     82                          * 等于0的放到低位链表(low)中,等于1的放到高位链表(high)中
     83                          * 其中低位链表迁移到新桶的位置是相对旧桶不变的
     84                          * 高位链表迁移到新桶的位置正好是其在旧桶位置上加n,这里在HashMap(jdk1.8中)分析过。
     85                          * 这就是为什么扩容时,容量变成原来两倍的原因
     86                          */
     87                         Node<K,V> ln, hn; // ln:low节点 hn:height节点
     88                         // 链表的节点hash值大于0,TreeBin的hash值为-2
     89                         if (fh >= 0) {
     90                             // 首先计算出当前结点的位置
     91                             int runBit = fh & n;
     92                             Node<K,V> lastRun = f;
     93                             for (Node<K,V> p = f.next; p != null; p = p.next) {
     94                                 int b = p.hash & n;
     95                                 // 同一节点下hashCode可能是不同的,这样才会有hash分布
     96                                 // 更新runBit的值,找出与f不同的节点
     97                                 // 这里一直要找到链表尾,但是lastRun不一定是尾节点,也就是找到最后一段相同的
     98                                 // 因为是链表,当位置相同,直接就带过去了,避免没必要的循环
     99                                 if (b != runBit) {
    100                                     runBit = b;
    101                                     lastRun = p;
    102                                 }
    103                             }
    104                             // 设置低位节点
    105                             if (runBit == 0) {
    106                                 ln = lastRun;
    107                                 hn = null;
    108                             }
    109                             // 设置高位节点
    110                             else {
    111                                 hn = lastRun;
    112                                 ln = null;
    113                             }
    114                             // 生成两条链表,直接拼接
    115                             // 找到不等于lastRun的节点,进行拼接,不是倒序,这里就是进行一个拼接,因为把hash值相同的链从lastRun带过来了
    116                             for (Node<K,V> p = f; p != lastRun; p = p.next) {
    117                                 int ph = p.hash; K pk = p.key; V pv = p.val;
    118                                 if ((ph & n) == 0)
    119                                     ln = new Node<K,V>(ph, pk, pv, ln);
    120                                 else
    121                                     hn = new Node<K,V>(ph, pk, pv, hn);
    122                             }
    123                             // 这里设置和hashMap类似,在相应点上设置节点即可
    124                             setTabAt(nextTab, i, ln);
    125                             setTabAt(nextTab, i + n, hn);
    126                             // 在旧的链表位置上设置占位符,标记已迁移完成
    127                             setTabAt(tab, i, fwd);
    128                             advance = true;
    129                         }
    130                         /**
    131                          * 结点是树的情况
    132                          * 和链表相同,分成两颗树,根据hash&n为0的放在低位树,为1的放在高位树
    133                          */
    134                         else if (f instanceof TreeBin) {
    135                             TreeBin<K,V> t = (TreeBin<K,V>)f;
    136                             TreeNode<K,V> lo = null, loTail = null;
    137                             TreeNode<K,V> hi = null, hiTail = null;
    138                             int lc = 0, hc = 0;
    139                             // 遍历整棵树,根据hash&n是否为0进行划分
    140                             for (Node<K,V> e = t.first; e != null; e = e.next) {
    141                                 int h = e.hash;
    142                                 TreeNode<K,V> p = new TreeNode<K,V>
    143                                     (h, e.key, e.val, null, null);
    144                                 if ((h & n) == 0) {
    145                                     if ((p.prev = loTail) == null)
    146                                         lo = p;
    147                                     else
    148                                         loTail.next = p;
    149                                     loTail = p;
    150                                     ++lc;
    151                                 }
    152                                 else {
    153                                     if ((p.prev = hiTail) == null)
    154                                         hi = p;
    155                                     else
    156                                         hiTail.next = p;
    157                                     hiTail = p;
    158                                     ++hc;
    159                                 }
    160                             }
    161                             // 复制完树结点之后,如果树的节点小于等于6时,就转回链表
    162                             ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
    163                                 (hc != 0) ? new TreeBin<K,V>(lo) : t;
    164                             hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
    165                                 (lc != 0) ? new TreeBin<K,V>(hi) : t;
    166                             // 低位树的位置不变
    167                             setTabAt(nextTab, i, ln);
    168                             // 高位树的位置在原来位置上加n
    169                             setTabAt(nextTab, i + n, hn);
    170                             // 标记该位置已经进迁移
    171                             setTabAt(tab, i, fwd);
    172                             // 继续循环,执行--i操作
    173                             advance = true;
    174                         }
    175                     }
    176                 }
    177             }
    178         }
    179     }

    分析:

    扩容函数中对于中间有段求i的值不是特别明白,其他流程还是比较清楚的,和HashMap的扩容有点类似,链表分成两段进行处理,通过hash&n是否等于0进行划分,迁移是从靠后的桶开始的(具体就在中间那段求i的值处),在迁移过程中锁住了当前桶,还是采用了分段锁的思想。需注意:#1.针对树节点,如果扩容后树节点上的元素总数小于等于6,则会退化成链表;#2.在链表拆分后进行组合时并不一定是倒序

    在put操作中还有一个帮助扩容的函数:helpTransfer

     1  // 线程添加元素时发现正在扩容且当前元素所在的桶已经迁移完成,则协助迁移其他桶的元素
     2     final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
     3         Node<K,V>[] nextTab; int sc;
     4         // 如果桶数组不为空,并且当前桶第一个元素为fwd类型,且nexttable不为空
     5         // 说明当前桶已经迁移完毕,可以去帮助迁移其他的桶的元素了
     6         if (tab != null && (f instanceof ForwardingNode) &&
     7             (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
     8             int rs = resizeStamp(tab.length);
     9            // sizeCtl<0,说明正在扩容
    10             while (nextTab == nextTable && table == tab &&
    11                    (sc = sizeCtl) < 0) {
    12                 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
    13                     sc == rs + MAX_RESIZERS || transferIndex <= 0)
    14                     break;
    15                 // 扩容线程数加1
    16                 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
    17                     // 当前线程帮忙迁移元素
    18                     transfer(tab, nextTab);
    19                     break;
    20                 }
    21             }
    22             return nextTab;
    23         }
    24         return table;
    25     }

    分析:

    只有当前桶元素迁移完成了才能去协助迁移其他桶的元素。

    接下来看addCount函数,该函数在put操作后会判断是否需要扩容,如果达到扩容门槛,则进行扩容或协助扩容。

     1  private final void addCount(long x, int check) {
     2         CounterCell[] as; long b, s;
     3         // 如果计数盒子不为空,或者修改baseCount失败
     4         if ((as = counterCells) != null ||
     5             !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
     6             CounterCell a; long v; int m;
     7             boolean uncontended = true;
     8             // 如果as为空,或者长度为0,或者当前线程所在的段为null,或者在当前线程的段上加数量失败
     9             if (as == null || (m = as.length - 1) < 0 ||
    10                 (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
    11                 !(uncontended =
    12                   U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
    13                 // 这里对counterCells扩容,减少多线程hash到同一个段的频率
    14                 fullAddCount(x, uncontended);
    15                 return;
    16             }
    17             if (check <= 1)
    18                 return;
    19             // 计算元素个数
    20             s = sumCount();
    21         }
    22         if (check >= 0) {
    23             Node<K,V>[] tab, nt; int n, sc;
    24             // 如果元素个数达到了扩容门槛,则进行扩容
    25             // sizeCtl即为扩容门槛,它为容量的0.75倍
    26             while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
    27                    (n = tab.length) < MAXIMUM_CAPACITY) {
    28                 // rs是扩容的一个邮戳标识
    29                 int rs = resizeStamp(n);
    30                 // sc小于0,表明正在扩容
    31                 if (sc < 0) {
    32                     if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
    33                         sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
    34                         transferIndex <= 0)
    35                         // 扩容完成,退出循环
    36                         break;
    37                     // 扩容未完成,将当前线程加入迁移元素中,并把扩容线程数加1
    38                     if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
    39                         transfer(tab, nt);
    40                 }
    41                 else if (U.compareAndSwapInt(this, SIZECTL, sc,
    42                                              (rs << RESIZE_STAMP_SHIFT) + 2))
    43                     // 进行元素迁移
    44                     transfer(tab, null);
    45                 // 重新计算元素个数
    46                 s = sumCount();
    47             }
    48         }
    49     }

    分析:

    该函数的主要作用就是将元素个数加1,并且判断是否需要进行扩容。目前对该函数的详细逻辑不是特别清楚,后续再来进行分析。

    一个put操作涉及的内容太多了,还需深入理解,下面来看get操作:

     1 public V get(Object key) {
     2         Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
     3         // 计算hash
     4         int h = spread(key.hashCode());
     5         // 如果对应位置上有元素
     6         if ((tab = table) != null && (n = tab.length) > 0 &&
     7             (e = tabAt(tab, (n - 1) & h)) != null) {
     8             // 如果第一个元素就是要找的元素,则直接返回
     9             if ((eh = e.hash) == h) {
    10                 if ((ek = e.key) == key || (ek != null && key.equals(ek)))
    11                     return e.val;
    12             }
    13             // 如果hash小于0,则说明是树或正在扩容,则使用find寻找元素,find根据Node的不同子类实现方式不同
    14             else if (eh < 0)
    15                 return (p = e.find(h, key)) != null ? p.val : null;
    16             // 遍历整个链表寻找元素
    17             while ((e = e.next) != null) {
    18                 if (e.hash == h &&
    19                     ((ek = e.key) == key || (ek != null && key.equals(ek))))
    20                     return e.val;
    21             }
    22         }
    23         return null;
    24     }

    分析:

    get操作整体来说逻辑清楚明了,与HashMap类似,但是要注意hash值小于0的时候,其寻找元素的方式有所不同,并且整个获取元素的过程是没有加锁的。

    接下来看remove操作:

     1 final V replaceNode(Object key, V value, Object cv) {
     2         // 计算hash值
     3         int hash = spread(key.hashCode());
     4         // 进行自旋操作
     5         for (Node<K,V>[] tab = table;;) {
     6             Node<K,V> f; int n, i, fh;
     7             // 如果tab为空,或者key所在的位置上没有元素,则直接终止自旋
     8             if (tab == null || (n = tab.length) == 0 ||
     9                 (f = tabAt(tab, i = (n - 1) & hash)) == null)
    10                 break;
    11             // 正在扩容,则协助其扩容
    12             else if ((fh = f.hash) == MOVED)
    13                 tab = helpTransfer(tab, f);
    14             else {
    15                 V oldVal = null;
    16                 // 标记是否处理过
    17                 boolean validated = false;
    18                 // 加锁
    19                 synchronized (f) {
    20                     // 再次验证当前位置上的元素是否被修改过
    21                     if (tabAt(tab, i) == f) {
    22                         // 链表
    23                         if (fh >= 0) {
    24                             validated = true;
    25                             // 遍历链表,寻找节点
    26                             for (Node<K,V> e = f, pred = null;;) {
    27                                 K ek;
    28                                 if (e.hash == hash &&
    29                                     ((ek = e.key) == key ||
    30                                      (ek != null && key.equals(ek)))) {
    31                                     // 找到目标元素
    32                                     V ev = e.val;
    33                                     if (cv == null || cv == ev ||
    34                                         (ev != null && cv.equals(ev))) {
    35                                         oldVal = ev;
    36                                         // 如果value不为空,则替换旧值
    37                                         if (value != null)
    38                                             e.val = value;
    39                                         else if (pred != null)
    40                                             // 前置节点不为空,删除当前节点
    41                                             pred.next = e.next;
    42                                         else
    43                                             // 如果前置节点为空,则说明是桶中第一个元素,则删除即可
    44                                             setTabAt(tab, i, e.next);
    45                                     }
    46                                     break;
    47                                 }
    48                                 // 更新前置节点
    49                                 pred = e;
    50                                 // 遍历到链表尾还未找打元素,则跳出循环
    51                                 if ((e = e.next) == null)
    52                                     break;
    53                             }
    54                         }
    55                         // 节点是树
    56                         else if (f instanceof TreeBin) {
    57                             validated = true;
    58                             TreeBin<K,V> t = (TreeBin<K,V>)f;
    59                             TreeNode<K,V> r, p;
    60                             // 遍历树找到目标节点
    61                             if ((r = t.root) != null &&
    62                                 (p = r.findTreeNode(hash, key, null)) != null) {
    63                                 V pv = p.val;
    64                                 if (cv == null || cv == pv ||
    65                                     (pv != null && cv.equals(pv))) {
    66                                     oldVal = pv;
    67                                     if (value != null)
    68                                         // 替换旧值
    69                                         p.val = value;
    70                                     else if (t.removeTreeNode(p))
    71                                         // 当removeTreeNode返回true表示树的元素个数较少,则退化成链表
    72                                         setTabAt(tab, i, untreeify(t.first));
    73                                 }
    74                             }
    75                         }
    76                     }
    77                 }
    78                 // 如果处理过
    79                 if (validated) {
    80                     // 找到了元素,返回其旧值
    81                     if (oldVal != null) {
    82                         // 如果要替换的值为空,则将元素个数减1
    83                         if (value == null)
    84                             addCount(-1L, -1);
    85                         return oldVal;
    86                     }
    87                     break;
    88                 }
    89             }
    90         }
    91         return null;
    92     }

    分析:

    利用自旋删除元素,整体流程清晰,根据链表或树进行相应操作,注意如果删除过程中正在进行扩容,需要协助其扩容后再进行删除。

    size函数:获取元素个数

     1 public int size() {
     2     // 调用sumCount计算元素个数
     3     long n = sumCount();
     4     return ((n < 0L) ? 0 :
     5             (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
     6             (int)n);
     7 }
     8 
     9    final long sumCount() {
    10     // 计算CounterCell所有段以及baseCount的数量之和
    11     CounterCell[] as = counterCells; CounterCell a;
    12     long sum = baseCount;
    13     if (as != null) {
    14         for (int i = 0; i < as.length; ++i) {
    15             if ((a = as[i]) != null)
    16                 sum += a.value;
    17         }
    18     }
    19     return sum;
    20 }

    分析:

    元素的个数会计算CounterCell所有段和baseCount之和,并且该函数是没有加锁的。

    3.总结

    ConcurrentHashMap的源码分析真不容易,代码量非常的大,其实有的地方目前还没弄懂,需后续反复阅读。

    #1.ConcurrentHashMap是HashMap的线程安全版本。

    #2.ConcurrentHashMap底层数据结构为数组+链表+红黑树,默认容量为16,不允许[key,value]为null。

    #3.ConcurrentHashMap内部采用的锁有synchronized、CAS、自旋锁、分段锁、volatile。

    #4.通过sizeCtl变量来控制扩容、初始化等操作。

    #5.查询操作不加锁,因此ConcurrentHashMap不是强一致性

    ConcurrentHashMap未完待续!!!


    by Shawn Chen,2019.09.18日,下午。

  • 相关阅读:
    HBase with MapReduce (MultiTable Read)
    HBase with MapReduce (SummaryToFile)
    HBase with MapReduce (Summary)
    HBase with MapReduce (Read and Write)
    HBase with MapReduce (Only Read)
    Hbase中的BloomFilter(布隆过滤器)
    HBase的快照技术
    How To Use Hbase Bulk Loading
    Cloudera-Manager修改集群的IP
    Java中的HashSet和TreeSet
  • 原文地址:https://www.cnblogs.com/developer_chan/p/11527120.html
Copyright © 2011-2022 走看看