zoukankan      html  css  js  c++  java
  • ConcurrentHashMap原理笔记

    一.简介

    1.1 结构

    图片来源(https://blog.csdn.net/v123411739/article/details/78996181)

    1.2 成员变量

    /** 上图中table数组 */

    transient volatile Node<K,V>[] table;

    // 在扩容时会新建一个容量为 2 * n 的数组nextTable

    private transient volatile Node<K,V>[] nextTable;

    // HashMap总结点数由baseCount 和 counterCells 一起记录

    private transient volatile long baseCount;

    // // 在初始化table时,sizeCtl = -1

    // 初始化完成后或者扩容完成后,sizeCtl = 0.75 * n

    // 在扩容时 sizeCtl = (rs << RESIZE_STAMP_SHIFT) + 2

    private transient volatile int sizeCtl;

    // 用于协助扩容的参数

    private transient volatile int transferIndex;

    // 记录counterCells是否存在竞争

    private transient volatile int cellsBusy;

    // 协助记录HashMap节点数的数组

    private transient volatile CounterCell[] counterCells;

    二.方法

    2.1 put方法

    2.1.1 简要流程图

    2.1.2 put方法源码

    public V put(K key, V value) {

      return putVal(key, value, false);

    }

    /** Implementation for put and putIfAbsent */

    final V putVal(K key, V value, boolean onlyIfAbsent) {

    // key,value不能为空

    if (key == null || value == null) throw new NullPointerException();

    int hash = spread(key.hashCode()); // 计算key的hash值

    int binCount = 0;

    for (Node<K,V>[] tab = table;;) {

      Node<K,V> f; int n, i, fh;

      if (tab == null || (n = tab.length) == 0)

        tab = initTable(); // table为空需要初始化

        // (n - 1) & hash 算出下标i

        // 如果table[i]节点为空,那么新建链表类型节点CAS赋值,赋值成功的直接退出循环           // 赋值失败的化进入下一轮循环,使用CAS能提高效率

      else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {

        if (casTabAt(tab, i, null,

          new Node<K,V>(hash, key, value, null)))

          break; // no lock when adding to empty bin

      }

      // table不为空且在扩容,

      // 当table[i].hash = -1 时,表示当前有线程正在进行扩容操作,那么协助扩容

      else if ((fh = f.hash) == MOVED)

        tab = helpTransfer(tab, f);

      else { // table不为空且未扩容

        V oldVal = null;

        // 锁住table[i] 节点,相当于锁住链表或者红黑树

        synchronized (f) {

          if (tabAt(tab, i) == f) {

          //遍历链表进行覆盖或者插入

            if (fh >= 0) { //如果fh>=0 证明这是一个Node节点

              binCount = 1;

                       //在这里遍历链表所有的结点

              for (Node<K,V> e = f;; ++binCount) {

                K ek;

            //如果hash值和key值相同  则修改对应结点的value值

                if (e.hash == hash &&

                  ((ek = e.key) == key ||

                  (ek != null && key.equals(ek)))) {

                    oldVal = e.val;

                    if (!onlyIfAbsent)

                      e.val = value;

                    break;

                }

            // key或hash值不同则依次向后遍历,直到链表尾插入这个结点。

                Node<K,V> pred = e;

                if ((e = e.next) == null) {

                  pred.next = new Node<K,V>(hash, key,

                  value, null);

                    break;

                }

              }

            }

          // 如果table[i] 红黑树类型节点,那么调用红黑树putTreeVal方法插入

            else if (f instanceof TreeBin) {

              Node<K,V> p;

              binCount = 2;

              if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,

                value)) != null) {

                oldVal = p.val;

                if (!onlyIfAbsent)

                  p.val = value;

              }

            }

          }

        }

      // 如果前面节点插入成功

        if (binCount != 0) {

        // 如果链表节点长度>=8 那么扩容或者转换成红黑树

          if (binCount >= TREEIFY_THRESHOLD)

            treeifyBin(tab, i);

          if (oldVal != null)

            return oldVal;

          break;

        }

      }

    }

    // 总结点数(元素数量)加一,并且需要检查扩容

    addCount(1L, binCount);

    return null;

    }

    可以发现JDK8中的实现也是锁分离的思想,只是锁住的是一个Node,而不是JDK7中的Segment。

    2.1.3 initTable()初始化table

    private final Node<K,V>[] initTable() {

      Node<K,V>[] tab; int sc;

      while ((tab = table) == null || tab.length == 0) {

    // sizeCtl 小于0,说明当前正在初始化或者扩容 ,那么提示让出线程

    if ((sc = sizeCtl) < 0)

      Thread.yield(); // lost initialization race; just spin

         // 将 sizeCtl 设置成 -1

    else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {

      try {

              // 新建Node类型数组 nt ,第一次创建默认容量为16,将nt 赋值给table变量

        if ((tab = table) == null || tab.length == 0) {

          int n = (sc > 0) ? sc : DEFAULT_CAPACITY;

          @SuppressWarnings("unchecked")

          Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];

          table = tab = nt;

          sc = n - (n >>> 2); // 移位运算后,sc = 0.75 * n,(n-1/4(n))

        }

      } finally {

        sizeCtl = sc; // 将sc 赋值给sizeCtl

      }

      break;

    }

    }

    return tab;

    }

    插入链表节点和红黑树节点和HashMap差异不大。

    2.1.4 treeifyBin

    这个方法用于将过长的链表转换为TreeBin对象。

    但是它并不是直接转换,而是进行一次容量判断,如果容量没有达到转换的要求,直接进行扩容操作并返回;如果满足条件才链表的结构抓换为TreeBin 。

    这与HashMap不同的是,它并没有把TreeNode直接放入红黑树,而是利用了TreeBin这个小容器来封装所有的TreeNode.

    private final void treeifyBin(Node<K,V>[] tab, int index) {

    Node<K,V> b; int n, sc;

    if (tab != null) {

    // 如果table.length 小于64 ,那么尝试扩容

    if ((n = tab.length) < MIN_TREEIFY_CAPACITY)

    tryPresize(n << 1);

        // 否则,转换成红黑树

    else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {

      synchronized (b) {

        if (tabAt(tab, index) == b) {

          TreeNode<K,V> hd = null, tl = null;

      // 遍历链表

          for (Node<K,V> e = b; e != null; e = e.next) {

        // 新建红黑树类型节点

            TreeNode<K,V> p =

              new TreeNode<K,V>(e.hash, e.key, e.val,

            null, null);

        // 红黑树内同时保留链表结构

          if ((p.prev = tl) == null)

            hd = p;

          else

            tl.next = p;

          tl = p;

          }

        // 设置table[index] 为当前红黑树头节点

          setTabAt(tab, index, new TreeBin<K,V>(hd));

        }

      }

    }

    }

    }

    2.1.5 size() 总结点数计算 (重要)

    ConcurrentHashMap中的总节点数通过 baseCount 字段和 counterCells 数组字段表示,如下:

    public int size() {

      long n = sumCount();

      return ((n < 0L) ? 0 :

        (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :

        (int)n);

    }

    private transient volatile long baseCount;

    private transient volatile CounterCell[] counterCells;

    @sun.misc.Contended static final class CounterCell {

      volatile long value;

      CounterCell(long x) { value = x; }

    }

    final long sumCount() {

      CounterCell[] as = counterCells; CounterCell a;

      long sum = baseCount;

    if (as != null) {

    for (int i = 0; i < as.length; ++i) {

      if ((a = as[i]) != null)

        sum += a.value;

    }

    }

    return sum;

    }

    通过sumCount方法可以知道,ConcurrentHashMap总结点等于baseCount加上counterCells数组中所有有值CounterCell 对象中的value总和

    如下图所示,总结点数等于10 + 10 + 2。

    为什么要这么设计呢?

    想象一下,如果不使用counterCells数组,直接用baseCount方式来记录总结点数,那么每次put或者remove操作后,每条线程都要自旋修改baseCount,如果是并发冲突比较多的情况下,很多线程都处于自旋等待修改baseCount状态,效率会比较低,增加了counterCells数组记录后,线程通过CAS修改baseCount失败后不是自旋等待,而是立刻尝试随机选中counterCells中的一个下标的记录进行赋值或者修改,如果仍然修改失败才进入自旋去修改baseCount 或者 counterCells,效率有了质的提高。

    2.1.6 addCount() 增添总结点数 (重要)

    private final void addCount(long x, int check) {

    CounterCell[] as; long b, s;

    // CAS对baseCount做加x操作,成功则不执行if代码块内容

    if ((as = counterCells) != null ||

    !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {

      CounterCell a; long v; int m;

      boolean uncontended = true; //默认无竞争

      // ThreadLocalRandom.getProbe() 生成线程随机数

      // ThreadLocalRandom.getProbe() & m 定位一个下标

      // 如果数组该下标处CounterCell对象不为空,通过CAS对该对象的value赋值为当前value + x

    if (as == null || (m = as.length - 1) < 0 ||

      (a = as[ThreadLocalRandom.getProbe() & m]) == null ||

      !(uncontended =

      U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {

        // 如果if条件成立,那么进入fullAddCount方法自旋增加节点数

        fullAddCount(x, uncontended);

        return;

    }

    if (check <= 1) //check表示是否需要检查扩容

      return;

    s = sumCount();

    }

    //需要扩容,这部分逻辑在下面扩容小节中分析

    if (check >= 0) {

      ...

    }

    }

    增添总结点数主要逻辑:

    1.CAS 直接把baseCount 设置成baseCount + x,如果失败则操作counterCells数组

    2.随机定位counterCells的一个下标位置,取得该下标位置对应的CounterCell对象a

    3.CAS把a对象的value设置成a + x,如果失败进入fullAddCount方法自旋增加

    ​如果step1 2 3都失败,说明对baseCount 和 counterCells的并发访问冲突存在,那么就自旋修改节点数,fullAddCount 是通过自旋的方式修改总结点数,这个方法用到一个字段叫cellsBusy,cellsBusy是被volatile关键字修饰的,保证它的可见性,当它被设置成1时,只有当前设置cellsBusy为1的线程能对counterCells进行操作,相当于锁的作用。

    2.1.7 fullAddCount方法

    // See LongAdder version for explanation

    private final void fullAddCount(long x, boolean wasUncontended) {

    int h;

    // 如果分配随机数失败则重新初始化

    if ((h = ThreadLocalRandom.getProbe()) == 0) {

      ThreadLocalRandom.localInit(); // force initialization

      h = ThreadLocalRandom.getProbe();

      wasUncontended = true;

    }

        // collide 为true时会对counterCells数组进行扩容

    boolean collide = false; // True if last slot nonempty

    for (;;) {

      CounterCell[] as; CounterCell a; int n; long v;

      if ((as = counterCells) != null && (n = as.length) > 0) {

           // h随机定位counterCells的一个下标,获取该下标对应的CounterCell对象a

        // 如果对象a为空

        if ((a = as[(n - 1) & h]) == null) {

        // 如果cellsBusy等于0,表示当前counterCells数组没有被别的线程使用

          if (cellsBusy == 0) { // Try to attach new Cell

        // 新建CounterCell对象,value设置成x

          CounterCell r = new CounterCell(x); // Optimistic create

        // CAS 设置cellsBusy字段值为1

          if (cellsBusy == 0 &&

          U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {

            boolean created = false;

    try { // Recheck under lock

    // 重新校验获取该下标对应的CounterCell对象是否为空

    CounterCell[] rs; int m, j;

    if ((rs = counterCells) != null &&

      (m = rs.length) > 0 &&

      rs[j = (m - 1) & h] == null) {

    // 将新建的CounterCell对象r赋值给counterCells[j]

        rs[j] = r;

        created = true; // 创建成功

    }

    } finally {

      cellsBusy = 0; //退出时设置成0

    }

           if (created)

             break;

           continue; // Slot is now non-empty

          }

        }

        collide = false; //非冲突

      }

    // 当addCount方法调用此方法进来的时候,参数wasUncontended=false

    // 说明在addCount方法中CAS设置counterCells时失败

    // 应该直接把wasUncontended设置成true,然后直接走到最后

    // ThreadLocalRandom.advanceProbe(h)重新生成h值 ,进入下一轮循环

    else if (!wasUncontended) // CAS already known to fail

      wasUncontended = true; // Continue after rehash

            // 如果a对象即 as[(n - 1) & h]) 不为空,那么CAS赋值a的value为v+x

    else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))

      break;

    // counterCells != as 说明有其他线程扩容了counterCells

    // n >= NCPU (cpu 核数)

    // 满足上述两个条件那把collide设置成false

    else if (counterCells != as || n >= NCPU)

      collide = false; // At max size or stale

    else if (!collide)

      collide = true; // 如果collide为false,那么把collide为true

    // 如果 collide 为true,说明并发访问比较大,对counterCells 数组进行扩容

    // 先CAS把cellsBusy 设置成1

    else if (cellsBusy == 0 &&

          U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {

    try {

    // 以双倍容量新建CounterCell类型数组rs,迁移旧数组数据到新数组

    // 最后把这个新数组赋值给counterCells字段

    if (counterCells == as) {// Expand table unless stale

    CounterCell[] rs = new CounterCell[n << 1];

    for (int i = 0; i < n; ++i)

      rs[i] = as[i];

    counterCells = rs;

    }

    } finally {

      cellsBusy = 0;

    }

        collide = false;

        continue; // Retry with expanded table

      }

      h = ThreadLocalRandom.advanceProbe(h); // 重新生成h值,进入下个循环

    }

    // 如果counterCells为空,那么初始化counterCells数组

    else if (cellsBusy == 0 && counterCells == as &&

      U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {

    boolean init = false;

    try { // Initialize table

    if (counterCells == as) {

      CounterCell[] rs = new CounterCell[2];

      rs[h & 1] = new CounterCell(x);

      counterCells = rs;

      init = true;

    }

    } finally {

      cellsBusy = 0;

    }

        if (init)

          break;

    }

    // 直接通过CAS修改baseCount的指为baseCount + x

    else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))

      break; // Fall back on using base

    }

    }

    2.2 扩容

    private transient volatile int sizeCtl;

    sizeCtl是一个全局控制变量;

    • 初始值是0表示哈希表尚未初始化
    • 如果是-1表示正在初始化,只允许一个线程进入初始化代码块
    • 初始化或者reSize成功后,sizeCtl=loadFactor * tableSize也就是触发再次扩容的阈值,是一个正整数
    • 在扩容过程中,sizeCtrl是一个负整数,其高16位是与当前的tableSize关联的邮戳resizeStamp,其低16位是当前从事搬运工作的线程数加1

    addCount方法中后面有扩容的逻辑.

    private final void addCount(long x, int check) {

    CounterCell[] as; long b, s;

    .....

    if (check >= 0) {

      Node<K,V>[] tab, nt; int n, sc;

    // 满足扩容条件,s是总结点数,sizeCtl是扩容阈值, s > sizeCtl 时触发扩容

    // table 为空,或者table长度已经达到最大时不触发扩容

    // while 循环保证同一条线程可以多次参与扩容

      while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&

        (n = tab.length) < MAXIMUM_CAPACITY) {

          int rs = resizeStamp(n);

      if (sc < 0) {

    // 扩容结束的判断

        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||

          sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||

          transferIndex <= 0)

            break;

    // 非第一次扩容操作, 每次把sizeCtl 设置成 sc + 1

        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))

          transfer(tab, nt);

      }

        // 第一次扩容操作,sizeCtl 设置成rs << RESIZE_STAMP_SHIFT) + 2

          else if (U.compareAndSwapInt(this, SIZECTL, sc,

            (rs << RESIZE_STAMP_SHIFT) + 2))

              transfer(tab, null);

          s = sumCount();

      }

    }

    }

    2.2.1 resizeStamp 计算操作

    static final int resizeStamp(int n) {

    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));

    }

    ​ n表示table.length; Integer.numberOfLeadingZeros(n) 方法是计算出n的二进制高位中有多少个连续的0,比如3的二进制中高位有30个0 。

    RESIZE_STAMP_BITS 在类中赋值为16 , 那么1 << (RESIZE_STAMP_BITS - 1) 后得到的二进制为 00000000000000001000000000000000,那么resizeStamp计算得出的二进制第16位为1,除了低五位和第16位可能为1外,其余位都为0,而且n的值不同计算出的值也不同。

    ​ sizeCtl 的值在初始化过程时为-1, 初始化成功后为 0.75 * n,在一开始进入到扩容步骤时,sizeCtl 为 0.75 * n,此时sc = sizeCtl > 0, 在第一条线程进行扩容操作时,会把sizeCtl 设置成rs << RESIZE_STAMP_SHIFT) + 2,由于rs第16位为1,除了低五位可能为1外,其余位都为0,那么rs << RESIZE_STAMP_SHIFT) + 2运算后,最高位为1,那么sizeCtl 绝对是一个负数

    ​ 所以当sc<0时,表示正在扩容,当sc >0 时,表示第一条线程进行扩容操作,把sizeCtl 设置成rs << RESIZE_STAMP_SHIFT) + 2,非第一条线程进行扩容操作的话每次把sizeCtl 设置成 sc + 1

    2.2.2 扩容结束的判断

    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||

    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||

    transferIndex<= 0)

    break;

    ​当正在进行扩容扩容时sc >>> RESIZE_STAMP_SHIFT可以还原出resizeStamp(n)的值;

    如果扩容已经结束了,那么sizeCtl会被设置成 新的 n * 0.75,此时sc >>> RESIZE_STAMP_SHIFT 跟resizeStamp(int n)计算出的值不会相等;

    ​map中是通过transferIndex这个字段来控制线程负责的桶数的,如果是第一条线程进行扩容操作,那么transferIndex = n,并且当前线程会把transferIndex 设置成( transferIndex - stride), stride 是代码计算出来每个线程应该负责的桶数,当前线程就负责下标为 ( transferIndex - stride)transferIndex - 1的桶, 如果不是第一次扩容,那么不用赋值n,直接用transferIndex 计算当前线程应该负责的桶区间;

    ​假设 n = 64 ,stride 为16

    线程1最先进行扩容操作,此时 transferIndex = 64,线程1负责扩容table下标48 -> 63的16个桶,并且设置transferIndex 为 48 ;

    线程2为第二条进行扩容操作的线程,此时transferIndex = 48,当前负责的线程负责扩容table下标32 - >47的16个桶,并且设置transferIndex 为 32 ;

    … 以此类推

    2.2.3 transfer扩容操作

    /** 一个过渡的table表  只有在扩容的时候才会使用 */

    private transient volatile Node<K,V>[] nextTable;

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {

    int n = tab.length, stride;

    // 计算每个线程负责的桶数,这里根据CPU核数计算

    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)

      stride = MIN_TRANSFER_STRIDE; // subdivide range

    // 如果nextTab为空,表示第一条进入的线程进行扩容操作

    // 新建容量为 2*n 的Node数组nextTab

    if (nextTab == null) { // initiating

    try {

      @SuppressWarnings("unchecked")

      Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];

      nextTab = nt; ////构造一个nextTable对象,容量是原来的两倍,赋值给nextTab

    } catch (Throwable ex) { // try to cope with OOME

      sizeCtl = Integer.MAX_VALUE;

      return;

    }

      nextTable = nextTab;

      transferIndex = n; // transferIndex 赋值为n

    }

    int nextn = nextTab.length; // 新数组的长度(2*n)

    // //构造一个连节点指针 用于标志位 ,ForwardingNode 类对象的hash值为-1

    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);

    boolean advance = true; //并发扩容的关键属性 如果true 说明这个节点已经处理过

    // 表示扩容是否结束

    boolean finishing = false; // to ensure sweep before committing nextTab

    for (int i = 0, bound = 0;;) {

      Node<K,V> f; int fh;

      //这个while循环体的作用就是在控制i--  通过i--可以依次遍历原hash表中的节点

      while (advance) { // 每次for 循环开始advance=true

      int nextIndex, nextBound;

    // 如果当前线程正在处理,或者扩容已经结束

      if (--i >= bound || finishing)

        advance = false;

    // transferIndex <0 表示扩容结束

      else if ((nextIndex = transferIndex) <= 0) {

        i = -1;

        advance = false;

      }

    // transferIndex 设置成nextIndex(即transferIndex) - stride

      else if (U.compareAndSwapInt

        (this, TRANSFERINDEX, nextIndex,

        nextBound = (nextIndex > stride ?

        nextIndex - stride : 0))) {

      // 当前线程负责的就是下标 bound 到 nextIndex - 1 的桶

          bound = nextBound;

          i = nextIndex - 1;

          advance = false;

      }

      }

    // 如果扩容结束

      if (i < 0 || i >= n || i + n >= nextn) {

        int sc;

      // 如果finishing为true,表示所有桶处理完成,扩容结束,就把nextTable赋值给table 清空临时对象nextTable,并且sizeCtl = 新 n * 0.75

        if (finishing) {

          nextTable = null;

          table = nextTab;

      //扩容阈值设置为原来容量的1.5倍  依然相当于现在容量的0.75倍

          sizeCtl = (n << 1) - (n >>> 1);

          return;

        }

      //CAS方法更新这个扩容阈值,sizectl-1,说明新加入一个线程参与到扩容操作

        if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {

      // 那么表示扩容还未结束,并非最后一个退出的线程

          if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)

            return;

          finishing = advance = true; // 设置扩容结束标志

        //异常巧妙的设计,最后一个线程退出前将i回退到最高位,等于是强制做最后一次的全表扫描;程序直接执行后续的else if代码,看有没有哪个槽位漏掉了,或者说是否全部是forwardingNode标记;

        //可以视为抄底逻辑,虽然检测到漏掉槽位的概率基本是0

          i = n; // recheck before commit

        }

      }

    // 如果当前桶为空,那么把 table[i] 放入ForwardingNode指针

    // 那么下次线程调用put 或者 remove方法时如果发现table[i] 的 hash < 0 就协助扩容

    else if ((f = tabAt(tab, i)) == null)

    advance = casTabAt(tab, i, null, fwd);

    // 如果遍历到ForwardingNode节点  说明这个点已经被处理过了 直接跳过  这里是控制并发扩容的核心

    else if ((fh = f.hash) == MOVED) // 当前桶已经被扩容过

      advance = true; // already processed

    else { // 锁住当前桶(链表或者红黑树,进行操作)

    synchronized (f) {

    if (tabAt(tab, i) == f) {

    Node<K,V> ln, hn;

    // fh>=0 证明这是一个Node节点

    if (fh >= 0) {

    //此处省略链表搬运代码: 将链表拆成两份,搬运到nextTable的i和i+n槽位

    setTabAt(nextTab, i, ln); //在nextTable的i位置上插入一个链表

    setTabAt(nextTab, i + n, hn); //在nextTable的i+n的位置上插入另一个链表

    // table[i] 设置成fwd, 那么下次线程调用put 或者 remove方法时如果发现table[i] 的hash <0 就协助扩容

    setTabAt(tab, i, fwd);

    //设置advance为true 返回到上面的while循环中 就可以执行i--操作

    advance = true;

    }

    else if (f instanceof TreeBin) { // 红黑树

    // 此处省略红黑树搬运代码: 将红黑树拆成两份,搬运到nextTable的i和i+n槽位,如果满足红黑树的退化条件,顺便将其退化为链表

    ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :

    (hc != 0) ? new TreeBin<K,V>(lo) : t;

    hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :

    (lc != 0) ? new TreeBin<K,V>(hi) : t;

    setTabAt(nextTab, i, ln);

    setTabAt(nextTab, i + n, hn);

    //设置旧表对应槽位的头节点为forwardingNode , 那么下次线程调用put 或者 remove方法时如果发现table[i] 的hash <0 就协助扩容

    setTabAt(tab, i, fwd);

    advance = true;

    }

    }

    }

    }

    }

    }

    多个线程并发搬运时,如果是首个搬运线程,负责nextTable的初始化工作;

    然后借助于全局的transferIndex变量从当前table的n-1槽位开始依次向低位扫描搬运,通过对transferIndex的CAS操作一次获取一个区段(默认是16),当transferIndex达到最低位时,不再能够获取到新的区段,线程开始退出,退出时会在sizeCtl上将总的线程数减一,最后一个退出的线程将扫描坐标i回退到最高位,强迫做一次抄底的全局扫描。

    其中对链表和红黑树的扩容拆分操作和HashMap类似。

    2.3 put方法小结 (重点)

    Node + CAS + Synchronized;

    put方法依然沿用HashMap的put方法的思想,根据hash值计算这个新插入的点在table中的位置i,如果i位置是空的,直接放进去,否则进行判断,如果i位置是树节点,按照树的方式插入新的节点,否则把i插入到链表的末尾。

    ConcurrentHashMap中依然沿用这个思想,有一个最重要的不同点就是不允许key或value为null值。另外由于涉及到多线程,put方法 在多线程中可能有以下两个情况

    1.如果一个或多个线程正在对ConcurrentHashMap进行扩容操作,当前线程也要进入扩容的操作中。这个扩容的操作之所以能被检测到,是因为transfer方法中在空结点上插入forward节点,如果检测到需要插入的位置被forward节点占有,就帮助进行扩容;

    2.如果检测到要插入的节点是非空且不是forward节点,就对这个节点加锁,这样就保证了线程安全。尽管这个有一些影响效率,但是还是会比hashTable的synchronized要好得多。

    整体流程就是:

    首先定义不允许key或value为null的情况放入 ;

    对于每一个放入的值,首先利用spread方法对key的hashcode进行一次hash计算,由此来确定这个值在table中的位置。

    如果这个位置是空的,那么直接放入,而且不需要加锁操作。

    如果这个位置存在结点,说明发生了hash碰撞,首先判断这个节点的类型。

    如果是链表节点(fh>0),则得到的结点就是hash值相同的节点组成的链表的头节点。需要依次向后遍历确定这个新加入的值所在位置。如果遇到hash值与key值都与新加入节点是一致的情况,则只需要更新value值即可。否则依次向后遍历,直到链表尾插入这个结点。如果加入这个节点以后链表长度大于8,就把这个链表转换成红黑树。

    如果这个节点的类型已经是树节点的话,直接调用树节点的插入方法进行插入新的值。

    2.4 get方法

    2.4.1 get操作为什么不需要加锁?

    首先需要明确的是,ConcurrentHashMap的读操作一般是不加锁的(TreeBin的读写锁除外),而读操作与写操作有可能并行;可以保证的是,因为ConcurrentHashMap的写操作都要获取bin头部的syncronized互斥锁,能保证最多只有一个线程在做更新,这其实是一个单线程写、多线程读的并发安全性的问题。

    2.4.2 volatile

    volatile关键字作用:

    Java提供了volatile关键字来保证可见性、有序性。但不保证原子性。

    普通的共享变量不能保证可见性,因为普通共享变量被修改之后,什么时候被写入主存是不确定的,当其他线程去读取时,此时内存中可能还是原来的旧值,因此无法保证可见性。

    • volatile关键字对于基本类型的修改可以在随后对多个线程的读保持一致,但是对于引用类型如数组,实体bean,仅仅保证引用的可见性,但并不保证引用内容的可见性。。
    • 禁止进行指令重排序。

    背景:

    为了提高处理速度,CPU不直接和内存进行通信,而是先将系统内存的数据读到CPU内部缓存(L1,L2或其他)后再进行操作,但操作完不知道何时会写到内存。

    • 如果对声明了volatile的变量进行写操作,JVM就会向处理器发送一条指令,将这个变量所在缓存行的数据写回到系统内存。但是,就算写回到内存,如果其他处理器缓存的值还是旧的,再执行计算操作就会有问题。
    • 在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议,当某个CPU在写数据时,如果发现操作的变量是共享变量,则会通知其他CPU告知该变量的缓存行是无效的,因此其他CPU在读取该变量时,发现其无效会重新从主存中加载数据。

    总结下来:

    第一:使用volatile关键字会强制将修改的值立即写入主存;

    第二:使用volatile关键字的话,当线程2进行修改时,会导致线程1的工作内存中缓存变量的缓存行无效(反映到硬件层的话,就是CPU的L1或者L2缓存中对应的缓存行无效);

    第三:由于线程1的工作内存中缓存变量的缓存行无效,所以线程1再次读取变量的值时会去主存读取。

    2.4.3 volatile修饰Node

    get操作可以无锁是由于Node的元素val和指针next是用volatile修饰的,在多线程环境下线程A修改结点的val或者新增节点的时候是对线程B可见的; 和Node[] table 数组用volatile修饰没有关系。

    另外,Node[] table 数组用volatile修饰主要是保证在数组扩容的时候保证可见性。

    static class Node<K,V> implements Map.Entry<K,V> {

    final int hash;

    final K key;

    volatile V val;

    volatile Node<K,V> next;

    ...

    }

    2.4.4 get源码

    public V get(Object key) {

    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;

    int h = spread(key.hashCode()); //计算hash值

    //根据hash值确定节点位置

    if ((tab = table) != null && (n = tab.length) > 0 &&

    (e = tabAt(tab, (n - 1) & h)) != null) {

    //如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点

    if ((eh = e.hash) == h) {

      if ((ek = e.key) == key || (ek != null && key.equals(ek)))

      return e.val;

    }

    //e节点hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTable来

    //eh= -1,说明该节点是一个ForwardingNode,正在迁移,此时调用ForwardingNode的find方法去nextTable里找。

    //eh= -2,说明该节点是一个TreeBin,此时调用TreeBin的find方法遍历红黑树,由于红黑树有可能正在旋转变色,所以find里会有读写锁

    //eh>=0,说明该节点下挂的是一个链表,直接遍历该链表即可。

      else if (eh < 0)

        return (p = e.find(h, key)) != null ? p.val : null;

      //否则遍历链表 找到对应的值并返回

      while ((e = e.next) != null) {

        if (e.hash == h &&

        ((ek = e.key) == key || (ek != null && key.equals(ek))))

          return e.val;

      }

    }

    return null;

    }

    1. 如果读取的bin是一个链表

    说明头节点是个普通Node,

    (1)如果正在发生链表向红黑树的treeify工作,因为treeify本身并不破坏旧的链表bin的结构,只是在全部treeify完成后将头节点一次性替换为新创建的TreeBin,可以放心读取。

    (2)如果正在发生resize且当前bin正在被transfer,因为transfer本身并不破坏旧的链表bin的结构,只是在全部transfer完成后将头节点一次性替换为ForwardingNode,可以放心读取。

    (3)如果其它线程正在操作链表,在当前线程遍历链表的任意一个时间点,都有可能同时在发生add/replace/remove操作。

    • 如果是add操作,因为链表的节点新增从JDK8以后都采用了后入式,无非是多遍历或者少遍历一个tailNode。
    • 如果是remove操作,存在遍历到某个Node时,正好有其它线程将其remove,导致其孤立于整个链表之外;但因为其next引用未发生变更,整个链表并没有断开,还是可以照常遍历链表直到tailNode。
    • 如果是replace操作,链表的结构未变,只是某个Node的value发生了变化,没有安全问题。

    结论:

    对于链表这种线性数据结构,单线程写且插入操作保证是后入式的前提下,并发读取是安全的;不会存在误读、链表断开导致的漏读、读到环状链表等问题。

    2.如果读取的bin是一个红黑树

    说明头节点是个TreeBin节点。

    (1)如果正在发生红黑树向链表的untreeify操作,因为untreeify本身并不破坏旧的红黑树结构,只是在全部untreeify完成后将头节点一次性替换为新创建的普通Node,可以放心读取。

    (2)如果正在发生resize且当前bin正在被transfer,因为transfer本身并不破坏旧的红黑树结构,只是在全部transfer完成后将头节点一次性替换为ForwardingNode,可以放心读取。

    (3)如果其他线程在操作红黑树,在当前线程遍历红黑树的任意一个时间点,都可能有单个的其它线程发生add/replace/remove/红黑树的翻转等操作,参考下面的红黑树的读写锁实现。

    3.TreeBin中的读写锁实现 (重要)

    static final class TreeBin<K,V> extends Node<K,V> {

    TreeNode<K,V> root;

    volatile TreeNode<K,V> first;

    volatile Thread waiter;

    volatile int lockState;

    // values for lockState

    static final int WRITER = 1; // set while holding write lock

    static final int WAITER = 2; // set when waiting for write lock

    static final int READER = 4; // increment value for setting read lock

    ......

    private final void lockRoot() {

        //如果一次性获取写锁失败,进入contendedLock循环体,循环获取写锁或者休眠等待

    if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))

    contendedLock(); // offload to separate method

    }

    private final void unlockRoot() {

    lockState = 0;

    }

    /** 红黑树加互斥锁,也就是写锁

    * Possibly blocks awaiting root lock. */

    private final void contendedLock() {

    boolean waiting = false;

    for (int s;;) {

    //1.如果lockState除了第二位外其它位上都为0,表示红黑树当前既没有上读锁,又没有上写锁,仅有可能存在waiter,可以尝试直接获取写锁

    if (((s = lockState) & ~WAITER) == 0) {

    if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {

    if (waiting)

    waiter = null;

    return;

    }

    }

    //2.如果lockState第二位是0,表示当前没有线程在等待写锁

    else if ((s & WAITER) == 0) {

    //将lockState的第二位设置为1,相当于打上了waiter的标记,表示有线程在等待写锁

    if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {

    waiting = true;

    waiter = Thread.currentThread();

    }

    }

    //3.休眠当前线程

    else if (waiting)

    LockSupport.park(this);

    }

    }

    //查找红黑树中的某个节点

    final Node<K,V>find(int h, Object k) {

    if (k != null) {

    for (Node<K,V>e = first; e != null; ) {

      int s; K ek;

    //如果当前有waiter有写锁,走线性检索,因为红黑树虽然替代了链表,但其内部依然保留了链表的结构,虽然链表的查询性能一般,但根据先前的分析其读取的安全性有保证

    //发现有写锁改走线性检索,是为了避免等待写锁释放花去太久时间; 而发现有waiter改走线性检索,是为了避免读锁叠加的太多,导致写锁线程需要等待太长的时间; 本质上都是为了减少读写碰撞

    //线性遍历的过程中,每遍历到下一个节点都做一次判断,一旦发现锁竞争的可能性减少就改走tree检索以提高性能

    if (((s = lockState) & (WAITER|WRITER)) != 0) {

    if (e.hash == h &&

    ((ek = e.key) == k || (ek != null && k.equals(ek))))

        return e;

      e = e.next;

    }

    //对红黑树加共享锁,也就是读锁,CAS一次性增加4,也就是增加的只是3~32位

    else if (U.compareAndSwapInt(this, LOCKSTATE, s,

    s + READER)) {

      TreeNode<K,V> r, p;

    try {

      p = ((r = root) == null ? null :

      r.findTreeNode(h, k, null));

    } finally {

      Thread w;

    //释放读锁,如果释放完毕且有waiter,则将其唤醒

      if (U.getAndAddInt(this, LOCKSTATE, -READER) ==

        (READER|WAITER) && (w = waiter) != null)

          LockSupport.unpark(w);

    }

      return p;

    }

    }

    }

      return null;

    }

    /** 更新红黑树中的某个节点 */

    final TreeNode<K,V> putTreeVal(int h, K k, V v) {

    Class<?> kc = null;

    boolean searched = false;

    for (TreeNode<K,V> p = root;;) {

      int dir, ph; K pk;

          //...省略处理红黑树数据结构的代码若干

    else {

      lockRoot(); //写操作前加互斥锁

    try {

      root = balanceInsertion(root, x);

    } finally {

      unlockRoot(); //释放互斥锁

    }

    }

    break;

    }

    }

    assert checkInvariants(root);

    return null;

    }

    }

    红黑树内置了一套读写锁的逻辑,其内部定义了32位的int型变量lockState,第1位是写锁标志位,第2位是写锁等待标志位,从3~32位则是共享锁标志位。

    图片来源: https://mp.weixin.qq.com/s/4sz6sTPvBigR_1g8piFxug

    读写操作是互斥的,允许多个线程同时读取,但不允许读写操作并行,同一时刻只允许一个线程进行写操作;这样任意时间点读取的都是一个合法的红黑树,整体上是安全的。

    TreeBin在find读操作检索时,在linearSearch(线性检索)和treeSearch(树检索)间做了折衷,前者性能差但并发安全,后者性能佳但要做并发控制,可能导致锁竞争;

    设计者使用线性检索来尽量避免读写碰撞导致的锁竞争,但评估到race condition已消失时,又立即趋向于改用树检索来提高性能,在安全和性能之间做到了极佳的平衡。具体的折衷策略请参考find方法及注释。

    由于有线性检索这样一个抄底方案,以及入口处bin头节点的synchornized机制,保证了进入到TreeBin整体代码块的写线程只有一个;TreeBin中读写锁的整体设计与ReentrantReadWriteLock相比还是简单了不少,比如并未定义用于存放待唤醒线程的threadQueue,以及读线程仅会自旋而不会阻塞等等, 可以看做是特定条件下ReadWriteLock的简化版本。

    4.如果读取的bin是一个ForwardingNode

    说明当前bin已迁移,调用其find方法到nextTable读取数据。

    static final class ForwardingNode<K,V> extends Node<K,V> {

    final Node<K,V>[] nextTable;

    ForwardingNode(Node<K,V>[] tab) {

      super(MOVED, null, null, null);

      this.nextTable = tab;

      }

    //递归检索哈希表链

    Node<K,V> find(int h, Object k) {

    // loop to avoid arbitrarily deep recursion on forwarding nodes

    outer: for (Node<K,V>[] tab = nextTable;;) {

    Node<K,V> e; int n;

    if (k == null || tab == null || (n = tab.length) == 0 ||

    (e = tabAt(tab, (n - 1) & h)) == null)

      return null;

    for (;;) {

      int eh; K ek;

      if ((eh = e.hash) == h &&

        ((ek = e.key) == k || (ek != null && k.equals(ek))))

        return e;

      if (eh < 0) {

    if (e instanceof ForwardingNode) {

      tab = ((ForwardingNode<K,V>)e).nextTable;

      continue outer;

    }

      else

        return e.find(h, k);

      }

    if ((e = e.next) == null)

      return null;

    }

    }

      }

    }

    ForwardingNode中保存了nextTable的引用,会转向下一个哈希表进行检索,但并不能保证nextTable就一定是currentTable,因为在高并发插入的情况下,极短时间内就可以导致哈希表的多次扩容,内存中极有可能驻留一条哈希表链,彼此以bin的头节点上的ForwardingNode相连,线程刚读取时拿到的是table1,遍历时却有可能经历了哈希表的链条。

    eh<0有三种情况:

    如果是ForwardingNode继续遍历下一个哈希表。

    如果是TreeBin,调用其find方法进入TreeBin读写锁的保护区读取数据。

    如果是ReserveNode,说明当前有compute计算中,整条bin还是一个空结构,直接返回null。

    5.如果读取的bin是一个ReserveNode

    ReserveNode用于compute/computeIfAbsent原子计算的方法,在BIN的头节点为null且计算尚未完成时,先在bin的头节点打上一个ReserveNode的占位标记

    读操作发现ReserveNode直接返回null,

    写操作会因为争夺ReserveNode的互斥锁而进入阻塞态,在compute完成后被唤醒后循环重试。

    2.5 delete方法

    final V replaceNode(Object key, V value, Object cv) {

    int hash = spread(key.hashCode());

    for (Node<K,V>[] tab = table;;) {

      Node<K,V> f; int n, i, fh;

      // 如果table 为空,或者table[i] 为空,那么直接返回

      if (tab == null || (n = tab.length) == 0 ||

        (f = tabAt(tab, i = (n - 1) & hash)) == null)

        break;

      // 如果table[i] 的hash 为 -1,那么协助扩容

      else if ((fh = f.hash) == MOVED)

        tab = helpTransfer(tab, f);

      else {

    V oldVal = null;

    boolean validated = false;

    synchronized (f) {

    // 链表删除

    if (tabAt(tab, i) == f) {

      if (fh >= 0) {

      validated = true;

      for (Node<K,V> e = f, pred = null;;) {

      K ek;

      if (e.hash == hash &&

        ((ek = e.key) == key ||

        (ek != null && key.equals(ek)))) {

      V ev = e.val;

      if (cv == null || cv == ev ||

        (ev != null && cv.equals(ev))) {

          oldVal = ev;

        if (value != null)

          e.val = value;

        else if (pred != null)

          pred.next = e.next;

        else

          setTabAt(tab, i, e.next);

      }

        break;

      }

      pred = e;

      if ((e = e.next) == null)

        break;

      }

      }

    // 红黑树删除

      else if (f instanceof TreeBin) {

    validated = true;

    TreeBin<K,V> t = (TreeBin<K,V>)f;

    TreeNode<K,V> r, p;

      if ((r = t.root) != null &&

        (p = r.findTreeNode(hash, key, null)) != null) {

      V pv = p.val;

      if (cv == null || cv == pv ||

        (pv != null && cv.equals(pv))) {

          oldVal = pv;

        if (value != null)

          p.val = value;

        else if (t.removeTreeNode(p))

          setTabAt(tab, i, untreeify(t.first));

    }

    }

    }

    }

    }

    // 总结点数减一,不检查扩容

    if (validated) {

    if (oldVal != null) {

      if (value == null)

      addCount(-1L, -1);

      return oldVal;

    }

      break;

    }

    }

    }

    return null;

    }

    三.问题

    参考:

    https://blog.csdn.net/cx897459376/article/details/106427587

    https://blog.csdn.net/caoxiaohong1005/article/details/78083655

    https://www.cnblogs.com/keeya/p/9632958.html

    https://mp.weixin.qq.com/s/4sz6sTPvBigR_1g8piFxug

  • 相关阅读:
    skywalking学习
    logstash使用
    AIO编程
    NIO编程
    NIO入门之BIO
    Akka Cluster之集群分片
    Akka Stream之Graph
    【Swift学习笔记-《PRODUCT》读书记录-实现自定义转场动画】
    CoreData 数据模型文件导出NSManagedObject时重复问题
    iOS图片压缩
  • 原文地址:https://www.cnblogs.com/coloz/p/14447184.html
Copyright © 2011-2022 走看看