zoukankan      html  css  js  c++  java
  • ConcurrentHashMap原理笔记

    一.简介

    1.1 结构

    图片来源(https://blog.csdn.net/v123411739/article/details/78996181)

    1.2 成员变量

    /** 上图中table数组 */

    transient volatile Node<K,V>[] table;

    // 在扩容时会新建一个容量为 2 * n 的数组nextTable

    private transient volatile Node<K,V>[] nextTable;

    // HashMap总结点数由baseCount 和 counterCells 一起记录

    private transient volatile long baseCount;

    // // 在初始化table时,sizeCtl = -1

    // 初始化完成后或者扩容完成后,sizeCtl = 0.75 * n

    // 在扩容时 sizeCtl = (rs << RESIZE_STAMP_SHIFT) + 2

    private transient volatile int sizeCtl;

    // 用于协助扩容的参数

    private transient volatile int transferIndex;

    // 记录counterCells是否存在竞争

    private transient volatile int cellsBusy;

    // 协助记录HashMap节点数的数组

    private transient volatile CounterCell[] counterCells;

    二.方法

    2.1 put方法

    2.1.1 简要流程图

    2.1.2 put方法源码

    public V put(K key, V value) {

      return putVal(key, value, false);

    }

    /** Implementation for put and putIfAbsent */

    final V putVal(K key, V value, boolean onlyIfAbsent) {

    // key,value不能为空

    if (key == null || value == null) throw new NullPointerException();

    int hash = spread(key.hashCode()); // 计算key的hash值

    int binCount = 0;

    for (Node<K,V>[] tab = table;;) {

      Node<K,V> f; int n, i, fh;

      if (tab == null || (n = tab.length) == 0)

        tab = initTable(); // table为空需要初始化

        // (n - 1) & hash 算出下标i

        // 如果table[i]节点为空,那么新建链表类型节点CAS赋值,赋值成功的直接退出循环           // 赋值失败的化进入下一轮循环,使用CAS能提高效率

      else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {

        if (casTabAt(tab, i, null,

          new Node<K,V>(hash, key, value, null)))

          break; // no lock when adding to empty bin

      }

      // table不为空且在扩容,

      // 当table[i].hash = -1 时,表示当前有线程正在进行扩容操作,那么协助扩容

      else if ((fh = f.hash) == MOVED)

        tab = helpTransfer(tab, f);

      else { // table不为空且未扩容

        V oldVal = null;

        // 锁住table[i] 节点,相当于锁住链表或者红黑树

        synchronized (f) {

          if (tabAt(tab, i) == f) {

          //遍历链表进行覆盖或者插入

            if (fh >= 0) { //如果fh>=0 证明这是一个Node节点

              binCount = 1;

                       //在这里遍历链表所有的结点

              for (Node<K,V> e = f;; ++binCount) {

                K ek;

            //如果hash值和key值相同  则修改对应结点的value值

                if (e.hash == hash &&

                  ((ek = e.key) == key ||

                  (ek != null && key.equals(ek)))) {

                    oldVal = e.val;

                    if (!onlyIfAbsent)

                      e.val = value;

                    break;

                }

            // key或hash值不同则依次向后遍历,直到链表尾插入这个结点。

                Node<K,V> pred = e;

                if ((e = e.next) == null) {

                  pred.next = new Node<K,V>(hash, key,

                  value, null);

                    break;

                }

              }

            }

          // 如果table[i] 红黑树类型节点,那么调用红黑树putTreeVal方法插入

            else if (f instanceof TreeBin) {

              Node<K,V> p;

              binCount = 2;

              if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,

                value)) != null) {

                oldVal = p.val;

                if (!onlyIfAbsent)

                  p.val = value;

              }

            }

          }

        }

      // 如果前面节点插入成功

        if (binCount != 0) {

        // 如果链表节点长度>=8 那么扩容或者转换成红黑树

          if (binCount >= TREEIFY_THRESHOLD)

            treeifyBin(tab, i);

          if (oldVal != null)

            return oldVal;

          break;

        }

      }

    }

    // 总结点数(元素数量)加一,并且需要检查扩容

    addCount(1L, binCount);

    return null;

    }

    可以发现JDK8中的实现也是锁分离的思想,只是锁住的是一个Node,而不是JDK7中的Segment。

    2.1.3 initTable()初始化table

    private final Node<K,V>[] initTable() {

      Node<K,V>[] tab; int sc;

      while ((tab = table) == null || tab.length == 0) {

    // sizeCtl 小于0,说明当前正在初始化或者扩容 ,那么提示让出线程

    if ((sc = sizeCtl) < 0)

      Thread.yield(); // lost initialization race; just spin

         // 将 sizeCtl 设置成 -1

    else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {

      try {

              // 新建Node类型数组 nt ,第一次创建默认容量为16,将nt 赋值给table变量

        if ((tab = table) == null || tab.length == 0) {

          int n = (sc > 0) ? sc : DEFAULT_CAPACITY;

          @SuppressWarnings("unchecked")

          Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];

          table = tab = nt;

          sc = n - (n >>> 2); // 移位运算后,sc = 0.75 * n,(n-1/4(n))

        }

      } finally {

        sizeCtl = sc; // 将sc 赋值给sizeCtl

      }

      break;

    }

    }

    return tab;

    }

    插入链表节点和红黑树节点和HashMap差异不大。

    2.1.4 treeifyBin

    这个方法用于将过长的链表转换为TreeBin对象。

    但是它并不是直接转换,而是进行一次容量判断,如果容量没有达到转换的要求,直接进行扩容操作并返回;如果满足条件才链表的结构抓换为TreeBin 。

    这与HashMap不同的是,它并没有把TreeNode直接放入红黑树,而是利用了TreeBin这个小容器来封装所有的TreeNode.

    private final void treeifyBin(Node<K,V>[] tab, int index) {

    Node<K,V> b; int n, sc;

    if (tab != null) {

    // 如果table.length 小于64 ,那么尝试扩容

    if ((n = tab.length) < MIN_TREEIFY_CAPACITY)

    tryPresize(n << 1);

        // 否则,转换成红黑树

    else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {

      synchronized (b) {

        if (tabAt(tab, index) == b) {

          TreeNode<K,V> hd = null, tl = null;

      // 遍历链表

          for (Node<K,V> e = b; e != null; e = e.next) {

        // 新建红黑树类型节点

            TreeNode<K,V> p =

              new TreeNode<K,V>(e.hash, e.key, e.val,

            null, null);

        // 红黑树内同时保留链表结构

          if ((p.prev = tl) == null)

            hd = p;

          else

            tl.next = p;

          tl = p;

          }

        // 设置table[index] 为当前红黑树头节点

          setTabAt(tab, index, new TreeBin<K,V>(hd));

        }

      }

    }

    }

    }

    2.1.5 size() 总结点数计算 (重要)

    ConcurrentHashMap中的总节点数通过 baseCount 字段和 counterCells 数组字段表示,如下:

    public int size() {

      long n = sumCount();

      return ((n < 0L) ? 0 :

        (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :

        (int)n);

    }

    private transient volatile long baseCount;

    private transient volatile CounterCell[] counterCells;

    @sun.misc.Contended static final class CounterCell {

      volatile long value;

      CounterCell(long x) { value = x; }

    }

    final long sumCount() {

      CounterCell[] as = counterCells; CounterCell a;

      long sum = baseCount;

    if (as != null) {

    for (int i = 0; i < as.length; ++i) {

      if ((a = as[i]) != null)

        sum += a.value;

    }

    }

    return sum;

    }

    通过sumCount方法可以知道,ConcurrentHashMap总结点等于baseCount加上counterCells数组中所有有值CounterCell 对象中的value总和

    如下图所示,总结点数等于10 + 10 + 2。

    为什么要这么设计呢?

    想象一下,如果不使用counterCells数组,直接用baseCount方式来记录总结点数,那么每次put或者remove操作后,每条线程都要自旋修改baseCount,如果是并发冲突比较多的情况下,很多线程都处于自旋等待修改baseCount状态,效率会比较低,增加了counterCells数组记录后,线程通过CAS修改baseCount失败后不是自旋等待,而是立刻尝试随机选中counterCells中的一个下标的记录进行赋值或者修改,如果仍然修改失败才进入自旋去修改baseCount 或者 counterCells,效率有了质的提高。

    2.1.6 addCount() 增添总结点数 (重要)

    private final void addCount(long x, int check) {

    CounterCell[] as; long b, s;

    // CAS对baseCount做加x操作,成功则不执行if代码块内容

    if ((as = counterCells) != null ||

    !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {

      CounterCell a; long v; int m;

      boolean uncontended = true; //默认无竞争

      // ThreadLocalRandom.getProbe() 生成线程随机数

      // ThreadLocalRandom.getProbe() & m 定位一个下标

      // 如果数组该下标处CounterCell对象不为空,通过CAS对该对象的value赋值为当前value + x

    if (as == null || (m = as.length - 1) < 0 ||

      (a = as[ThreadLocalRandom.getProbe() & m]) == null ||

      !(uncontended =

      U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {

        // 如果if条件成立,那么进入fullAddCount方法自旋增加节点数

        fullAddCount(x, uncontended);

        return;

    }

    if (check <= 1) //check表示是否需要检查扩容

      return;

    s = sumCount();

    }

    //需要扩容,这部分逻辑在下面扩容小节中分析

    if (check >= 0) {

      ...

    }

    }

    增添总结点数主要逻辑:

    1.CAS 直接把baseCount 设置成baseCount + x,如果失败则操作counterCells数组

    2.随机定位counterCells的一个下标位置,取得该下标位置对应的CounterCell对象a

    3.CAS把a对象的value设置成a + x,如果失败进入fullAddCount方法自旋增加

    ​如果step1 2 3都失败,说明对baseCount 和 counterCells的并发访问冲突存在,那么就自旋修改节点数,fullAddCount 是通过自旋的方式修改总结点数,这个方法用到一个字段叫cellsBusy,cellsBusy是被volatile关键字修饰的,保证它的可见性,当它被设置成1时,只有当前设置cellsBusy为1的线程能对counterCells进行操作,相当于锁的作用。

    2.1.7 fullAddCount方法

    // See LongAdder version for explanation

    private final void fullAddCount(long x, boolean wasUncontended) {

    int h;

    // 如果分配随机数失败则重新初始化

    if ((h = ThreadLocalRandom.getProbe()) == 0) {

      ThreadLocalRandom.localInit(); // force initialization

      h = ThreadLocalRandom.getProbe();

      wasUncontended = true;

    }

        // collide 为true时会对counterCells数组进行扩容

    boolean collide = false; // True if last slot nonempty

    for (;;) {

      CounterCell[] as; CounterCell a; int n; long v;

      if ((as = counterCells) != null && (n = as.length) > 0) {

           // h随机定位counterCells的一个下标,获取该下标对应的CounterCell对象a

        // 如果对象a为空

        if ((a = as[(n - 1) & h]) == null) {

        // 如果cellsBusy等于0,表示当前counterCells数组没有被别的线程使用

          if (cellsBusy == 0) { // Try to attach new Cell

        // 新建CounterCell对象,value设置成x

          CounterCell r = new CounterCell(x); // Optimistic create

        // CAS 设置cellsBusy字段值为1

          if (cellsBusy == 0 &&

          U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {

            boolean created = false;

    try { // Recheck under lock

    // 重新校验获取该下标对应的CounterCell对象是否为空

    CounterCell[] rs; int m, j;

    if ((rs = counterCells) != null &&

      (m = rs.length) > 0 &&

      rs[j = (m - 1) & h] == null) {

    // 将新建的CounterCell对象r赋值给counterCells[j]

        rs[j] = r;

        created = true; // 创建成功

    }

    } finally {

      cellsBusy = 0; //退出时设置成0

    }

           if (created)

             break;

           continue; // Slot is now non-empty

          }

        }

        collide = false; //非冲突

      }

    // 当addCount方法调用此方法进来的时候,参数wasUncontended=false

    // 说明在addCount方法中CAS设置counterCells时失败

    // 应该直接把wasUncontended设置成true,然后直接走到最后

    // ThreadLocalRandom.advanceProbe(h)重新生成h值 ,进入下一轮循环

    else if (!wasUncontended) // CAS already known to fail

      wasUncontended = true; // Continue after rehash

            // 如果a对象即 as[(n - 1) & h]) 不为空,那么CAS赋值a的value为v+x

    else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))

      break;

    // counterCells != as 说明有其他线程扩容了counterCells

    // n >= NCPU (cpu 核数)

    // 满足上述两个条件那把collide设置成false

    else if (counterCells != as || n >= NCPU)

      collide = false; // At max size or stale

    else if (!collide)

      collide = true; // 如果collide为false,那么把collide为true

    // 如果 collide 为true,说明并发访问比较大,对counterCells 数组进行扩容

    // 先CAS把cellsBusy 设置成1

    else if (cellsBusy == 0 &&

          U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {

    try {

    // 以双倍容量新建CounterCell类型数组rs,迁移旧数组数据到新数组

    // 最后把这个新数组赋值给counterCells字段

    if (counterCells == as) {// Expand table unless stale

    CounterCell[] rs = new CounterCell[n << 1];

    for (int i = 0; i < n; ++i)

      rs[i] = as[i];

    counterCells = rs;

    }

    } finally {

      cellsBusy = 0;

    }

        collide = false;

        continue; // Retry with expanded table

      }

      h = ThreadLocalRandom.advanceProbe(h); // 重新生成h值,进入下个循环

    }

    // 如果counterCells为空,那么初始化counterCells数组

    else if (cellsBusy == 0 && counterCells == as &&

      U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {

    boolean init = false;

    try { // Initialize table

    if (counterCells == as) {

      CounterCell[] rs = new CounterCell[2];

      rs[h & 1] = new CounterCell(x);

      counterCells = rs;

      init = true;

    }

    } finally {

      cellsBusy = 0;

    }

        if (init)

          break;

    }

    // 直接通过CAS修改baseCount的指为baseCount + x

    else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))

      break; // Fall back on using base

    }

    }

    2.2 扩容

    private transient volatile int sizeCtl;

    sizeCtl是一个全局控制变量;

    • 初始值是0表示哈希表尚未初始化
    • 如果是-1表示正在初始化,只允许一个线程进入初始化代码块
    • 初始化或者reSize成功后,sizeCtl=loadFactor * tableSize也就是触发再次扩容的阈值,是一个正整数
    • 在扩容过程中,sizeCtrl是一个负整数,其高16位是与当前的tableSize关联的邮戳resizeStamp,其低16位是当前从事搬运工作的线程数加1

    addCount方法中后面有扩容的逻辑.

    private final void addCount(long x, int check) {

    CounterCell[] as; long b, s;

    .....

    if (check >= 0) {

      Node<K,V>[] tab, nt; int n, sc;

    // 满足扩容条件,s是总结点数,sizeCtl是扩容阈值, s > sizeCtl 时触发扩容

    // table 为空,或者table长度已经达到最大时不触发扩容

    // while 循环保证同一条线程可以多次参与扩容

      while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&

        (n = tab.length) < MAXIMUM_CAPACITY) {

          int rs = resizeStamp(n);

      if (sc < 0) {

    // 扩容结束的判断

        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||

          sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||

          transferIndex <= 0)

            break;

    // 非第一次扩容操作, 每次把sizeCtl 设置成 sc + 1

        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))

          transfer(tab, nt);

      }

        // 第一次扩容操作,sizeCtl 设置成rs << RESIZE_STAMP_SHIFT) + 2

          else if (U.compareAndSwapInt(this, SIZECTL, sc,

            (rs << RESIZE_STAMP_SHIFT) + 2))

              transfer(tab, null);

          s = sumCount();

      }

    }

    }

    2.2.1 resizeStamp 计算操作

    static final int resizeStamp(int n) {

    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));

    }

    ​ n表示table.length; Integer.numberOfLeadingZeros(n) 方法是计算出n的二进制高位中有多少个连续的0,比如3的二进制中高位有30个0 。

    RESIZE_STAMP_BITS 在类中赋值为16 , 那么1 << (RESIZE_STAMP_BITS - 1) 后得到的二进制为 00000000000000001000000000000000,那么resizeStamp计算得出的二进制第16位为1,除了低五位和第16位可能为1外,其余位都为0,而且n的值不同计算出的值也不同。

    ​ sizeCtl 的值在初始化过程时为-1, 初始化成功后为 0.75 * n,在一开始进入到扩容步骤时,sizeCtl 为 0.75 * n,此时sc = sizeCtl > 0, 在第一条线程进行扩容操作时,会把sizeCtl 设置成rs << RESIZE_STAMP_SHIFT) + 2,由于rs第16位为1,除了低五位可能为1外,其余位都为0,那么rs << RESIZE_STAMP_SHIFT) + 2运算后,最高位为1,那么sizeCtl 绝对是一个负数

    ​ 所以当sc<0时,表示正在扩容,当sc >0 时,表示第一条线程进行扩容操作,把sizeCtl 设置成rs << RESIZE_STAMP_SHIFT) + 2,非第一条线程进行扩容操作的话每次把sizeCtl 设置成 sc + 1

    2.2.2 扩容结束的判断

    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||

    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||

    transferIndex<= 0)

    break;

    ​当正在进行扩容扩容时sc >>> RESIZE_STAMP_SHIFT可以还原出resizeStamp(n)的值;

    如果扩容已经结束了,那么sizeCtl会被设置成 新的 n * 0.75,此时sc >>> RESIZE_STAMP_SHIFT 跟resizeStamp(int n)计算出的值不会相等;

    ​map中是通过transferIndex这个字段来控制线程负责的桶数的,如果是第一条线程进行扩容操作,那么transferIndex = n,并且当前线程会把transferIndex 设置成( transferIndex - stride), stride 是代码计算出来每个线程应该负责的桶数,当前线程就负责下标为 ( transferIndex - stride)transferIndex - 1的桶, 如果不是第一次扩容,那么不用赋值n,直接用transferIndex 计算当前线程应该负责的桶区间;

    ​假设 n = 64 ,stride 为16

    线程1最先进行扩容操作,此时 transferIndex = 64,线程1负责扩容table下标48 -> 63的16个桶,并且设置transferIndex 为 48 ;

    线程2为第二条进行扩容操作的线程,此时transferIndex = 48,当前负责的线程负责扩容table下标32 - >47的16个桶,并且设置transferIndex 为 32 ;

    … 以此类推

    2.2.3 transfer扩容操作

    /** 一个过渡的table表  只有在扩容的时候才会使用 */

    private transient volatile Node<K,V>[] nextTable;

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {

    int n = tab.length, stride;

    // 计算每个线程负责的桶数,这里根据CPU核数计算

    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)

      stride = MIN_TRANSFER_STRIDE; // subdivide range

    // 如果nextTab为空,表示第一条进入的线程进行扩容操作

    // 新建容量为 2*n 的Node数组nextTab

    if (nextTab == null) { // initiating

    try {

      @SuppressWarnings("unchecked")

      Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];

      nextTab = nt; ////构造一个nextTable对象,容量是原来的两倍,赋值给nextTab

    } catch (Throwable ex) { // try to cope with OOME

      sizeCtl = Integer.MAX_VALUE;

      return;

    }

      nextTable = nextTab;

      transferIndex = n; // transferIndex 赋值为n

    }

    int nextn = nextTab.length; // 新数组的长度(2*n)

    // //构造一个连节点指针 用于标志位 ,ForwardingNode 类对象的hash值为-1

    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);

    boolean advance = true; //并发扩容的关键属性 如果true 说明这个节点已经处理过

    // 表示扩容是否结束

    boolean finishing = false; // to ensure sweep before committing nextTab

    for (int i = 0, bound = 0;;) {

      Node<K,V> f; int fh;

      //这个while循环体的作用就是在控制i--  通过i--可以依次遍历原hash表中的节点

      while (advance) { // 每次for 循环开始advance=true

      int nextIndex, nextBound;

    // 如果当前线程正在处理,或者扩容已经结束

      if (--i >= bound || finishing)

        advance = false;

    // transferIndex <0 表示扩容结束

      else if ((nextIndex = transferIndex) <= 0) {

        i = -1;

        advance = false;

      }

    // transferIndex 设置成nextIndex(即transferIndex) - stride

      else if (U.compareAndSwapInt

        (this, TRANSFERINDEX, nextIndex,

        nextBound = (nextIndex > stride ?

        nextIndex - stride : 0))) {

      // 当前线程负责的就是下标 bound 到 nextIndex - 1 的桶

          bound = nextBound;

          i = nextIndex - 1;

          advance = false;

      }

      }

    // 如果扩容结束

      if (i < 0 || i >= n || i + n >= nextn) {

        int sc;

      // 如果finishing为true,表示所有桶处理完成,扩容结束,就把nextTable赋值给table 清空临时对象nextTable,并且sizeCtl = 新 n * 0.75

        if (finishing) {

          nextTable = null;

          table = nextTab;

      //扩容阈值设置为原来容量的1.5倍  依然相当于现在容量的0.75倍

          sizeCtl = (n << 1) - (n >>> 1);

          return;

        }

      //CAS方法更新这个扩容阈值,sizectl-1,说明新加入一个线程参与到扩容操作

        if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {

      // 那么表示扩容还未结束,并非最后一个退出的线程

          if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)

            return;

          finishing = advance = true; // 设置扩容结束标志

        //异常巧妙的设计,最后一个线程退出前将i回退到最高位,等于是强制做最后一次的全表扫描;程序直接执行后续的else if代码,看有没有哪个槽位漏掉了,或者说是否全部是forwardingNode标记;

        //可以视为抄底逻辑,虽然检测到漏掉槽位的概率基本是0

          i = n; // recheck before commit

        }

      }

    // 如果当前桶为空,那么把 table[i] 放入ForwardingNode指针

    // 那么下次线程调用put 或者 remove方法时如果发现table[i] 的 hash < 0 就协助扩容

    else if ((f = tabAt(tab, i)) == null)

    advance = casTabAt(tab, i, null, fwd);

    // 如果遍历到ForwardingNode节点  说明这个点已经被处理过了 直接跳过  这里是控制并发扩容的核心

    else if ((fh = f.hash) == MOVED) // 当前桶已经被扩容过

      advance = true; // already processed

    else { // 锁住当前桶(链表或者红黑树,进行操作)

    synchronized (f) {

    if (tabAt(tab, i) == f) {

    Node<K,V> ln, hn;

    // fh>=0 证明这是一个Node节点

    if (fh >= 0) {

    //此处省略链表搬运代码: 将链表拆成两份,搬运到nextTable的i和i+n槽位

    setTabAt(nextTab, i, ln); //在nextTable的i位置上插入一个链表

    setTabAt(nextTab, i + n, hn); //在nextTable的i+n的位置上插入另一个链表

    // table[i] 设置成fwd, 那么下次线程调用put 或者 remove方法时如果发现table[i] 的hash <0 就协助扩容

    setTabAt(tab, i, fwd);

    //设置advance为true 返回到上面的while循环中 就可以执行i--操作

    advance = true;

    }

    else if (f instanceof TreeBin) { // 红黑树

    // 此处省略红黑树搬运代码: 将红黑树拆成两份,搬运到nextTable的i和i+n槽位,如果满足红黑树的退化条件,顺便将其退化为链表

    ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :

    (hc != 0) ? new TreeBin<K,V>(lo) : t;

    hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :

    (lc != 0) ? new TreeBin<K,V>(hi) : t;

    setTabAt(nextTab, i, ln);

    setTabAt(nextTab, i + n, hn);

    //设置旧表对应槽位的头节点为forwardingNode , 那么下次线程调用put 或者 remove方法时如果发现table[i] 的hash <0 就协助扩容

    setTabAt(tab, i, fwd);

    advance = true;

    }

    }

    }

    }

    }

    }

    多个线程并发搬运时,如果是首个搬运线程,负责nextTable的初始化工作;

    然后借助于全局的transferIndex变量从当前table的n-1槽位开始依次向低位扫描搬运,通过对transferIndex的CAS操作一次获取一个区段(默认是16),当transferIndex达到最低位时,不再能够获取到新的区段,线程开始退出,退出时会在sizeCtl上将总的线程数减一,最后一个退出的线程将扫描坐标i回退到最高位,强迫做一次抄底的全局扫描。

    其中对链表和红黑树的扩容拆分操作和HashMap类似。

    2.3 put方法小结 (重点)

    Node + CAS + Synchronized;

    put方法依然沿用HashMap的put方法的思想,根据hash值计算这个新插入的点在table中的位置i,如果i位置是空的,直接放进去,否则进行判断,如果i位置是树节点,按照树的方式插入新的节点,否则把i插入到链表的末尾。

    ConcurrentHashMap中依然沿用这个思想,有一个最重要的不同点就是不允许key或value为null值。另外由于涉及到多线程,put方法 在多线程中可能有以下两个情况

    1.如果一个或多个线程正在对ConcurrentHashMap进行扩容操作,当前线程也要进入扩容的操作中。这个扩容的操作之所以能被检测到,是因为transfer方法中在空结点上插入forward节点,如果检测到需要插入的位置被forward节点占有,就帮助进行扩容;

    2.如果检测到要插入的节点是非空且不是forward节点,就对这个节点加锁,这样就保证了线程安全。尽管这个有一些影响效率,但是还是会比hashTable的synchronized要好得多。

    整体流程就是:

    首先定义不允许key或value为null的情况放入 ;

    对于每一个放入的值,首先利用spread方法对key的hashcode进行一次hash计算,由此来确定这个值在table中的位置。

    如果这个位置是空的,那么直接放入,而且不需要加锁操作。

    如果这个位置存在结点,说明发生了hash碰撞,首先判断这个节点的类型。

    如果是链表节点(fh>0),则得到的结点就是hash值相同的节点组成的链表的头节点。需要依次向后遍历确定这个新加入的值所在位置。如果遇到hash值与key值都与新加入节点是一致的情况,则只需要更新value值即可。否则依次向后遍历,直到链表尾插入这个结点。如果加入这个节点以后链表长度大于8,就把这个链表转换成红黑树。

    如果这个节点的类型已经是树节点的话,直接调用树节点的插入方法进行插入新的值。

    2.4 get方法

    2.4.1 get操作为什么不需要加锁?

    首先需要明确的是,ConcurrentHashMap的读操作一般是不加锁的(TreeBin的读写锁除外),而读操作与写操作有可能并行;可以保证的是,因为ConcurrentHashMap的写操作都要获取bin头部的syncronized互斥锁,能保证最多只有一个线程在做更新,这其实是一个单线程写、多线程读的并发安全性的问题。

    2.4.2 volatile

    volatile关键字作用:

    Java提供了volatile关键字来保证可见性、有序性。但不保证原子性。

    普通的共享变量不能保证可见性,因为普通共享变量被修改之后,什么时候被写入主存是不确定的,当其他线程去读取时,此时内存中可能还是原来的旧值,因此无法保证可见性。

    • volatile关键字对于基本类型的修改可以在随后对多个线程的读保持一致,但是对于引用类型如数组,实体bean,仅仅保证引用的可见性,但并不保证引用内容的可见性。。
    • 禁止进行指令重排序。

    背景:

    为了提高处理速度,CPU不直接和内存进行通信,而是先将系统内存的数据读到CPU内部缓存(L1,L2或其他)后再进行操作,但操作完不知道何时会写到内存。

    • 如果对声明了volatile的变量进行写操作,JVM就会向处理器发送一条指令,将这个变量所在缓存行的数据写回到系统内存。但是,就算写回到内存,如果其他处理器缓存的值还是旧的,再执行计算操作就会有问题。
    • 在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议,当某个CPU在写数据时,如果发现操作的变量是共享变量,则会通知其他CPU告知该变量的缓存行是无效的,因此其他CPU在读取该变量时,发现其无效会重新从主存中加载数据。

    总结下来:

    第一:使用volatile关键字会强制将修改的值立即写入主存;

    第二:使用volatile关键字的话,当线程2进行修改时,会导致线程1的工作内存中缓存变量的缓存行无效(反映到硬件层的话,就是CPU的L1或者L2缓存中对应的缓存行无效);

    第三:由于线程1的工作内存中缓存变量的缓存行无效,所以线程1再次读取变量的值时会去主存读取。

    2.4.3 volatile修饰Node

    get操作可以无锁是由于Node的元素val和指针next是用volatile修饰的,在多线程环境下线程A修改结点的val或者新增节点的时候是对线程B可见的; 和Node[] table 数组用volatile修饰没有关系。

    另外,Node[] table 数组用volatile修饰主要是保证在数组扩容的时候保证可见性。

    static class Node<K,V> implements Map.Entry<K,V> {

    final int hash;

    final K key;

    volatile V val;

    volatile Node<K,V> next;

    ...

    }

    2.4.4 get源码

    public V get(Object key) {

    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;

    int h = spread(key.hashCode()); //计算hash值

    //根据hash值确定节点位置

    if ((tab = table) != null && (n = tab.length) > 0 &&

    (e = tabAt(tab, (n - 1) & h)) != null) {

    //如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点

    if ((eh = e.hash) == h) {

      if ((ek = e.key) == key || (ek != null && key.equals(ek)))

      return e.val;

    }

    //e节点hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTable来

    //eh= -1,说明该节点是一个ForwardingNode,正在迁移,此时调用ForwardingNode的find方法去nextTable里找。

    //eh= -2,说明该节点是一个TreeBin,此时调用TreeBin的find方法遍历红黑树,由于红黑树有可能正在旋转变色,所以find里会有读写锁

    //eh>=0,说明该节点下挂的是一个链表,直接遍历该链表即可。

      else if (eh < 0)

        return (p = e.find(h, key)) != null ? p.val : null;

      //否则遍历链表 找到对应的值并返回

      while ((e = e.next) != null) {

        if (e.hash == h &&

        ((ek = e.key) == key || (ek != null && key.equals(ek))))

          return e.val;

      }

    }

    return null;

    }

    1. 如果读取的bin是一个链表

    说明头节点是个普通Node,

    (1)如果正在发生链表向红黑树的treeify工作,因为treeify本身并不破坏旧的链表bin的结构,只是在全部treeify完成后将头节点一次性替换为新创建的TreeBin,可以放心读取。

    (2)如果正在发生resize且当前bin正在被transfer,因为transfer本身并不破坏旧的链表bin的结构,只是在全部transfer完成后将头节点一次性替换为ForwardingNode,可以放心读取。

    (3)如果其它线程正在操作链表,在当前线程遍历链表的任意一个时间点,都有可能同时在发生add/replace/remove操作。

    • 如果是add操作,因为链表的节点新增从JDK8以后都采用了后入式,无非是多遍历或者少遍历一个tailNode。
    • 如果是remove操作,存在遍历到某个Node时,正好有其它线程将其remove,导致其孤立于整个链表之外;但因为其next引用未发生变更,整个链表并没有断开,还是可以照常遍历链表直到tailNode。
    • 如果是replace操作,链表的结构未变,只是某个Node的value发生了变化,没有安全问题。

    结论:

    对于链表这种线性数据结构,单线程写且插入操作保证是后入式的前提下,并发读取是安全的;不会存在误读、链表断开导致的漏读、读到环状链表等问题。

    2.如果读取的bin是一个红黑树

    说明头节点是个TreeBin节点。

    (1)如果正在发生红黑树向链表的untreeify操作,因为untreeify本身并不破坏旧的红黑树结构,只是在全部untreeify完成后将头节点一次性替换为新创建的普通Node,可以放心读取。

    (2)如果正在发生resize且当前bin正在被transfer,因为transfer本身并不破坏旧的红黑树结构,只是在全部transfer完成后将头节点一次性替换为ForwardingNode,可以放心读取。

    (3)如果其他线程在操作红黑树,在当前线程遍历红黑树的任意一个时间点,都可能有单个的其它线程发生add/replace/remove/红黑树的翻转等操作,参考下面的红黑树的读写锁实现。

    3.TreeBin中的读写锁实现 (重要)

    static final class TreeBin<K,V> extends Node<K,V> {

    TreeNode<K,V> root;

    volatile TreeNode<K,V> first;

    volatile Thread waiter;

    volatile int lockState;

    // values for lockState

    static final int WRITER = 1; // set while holding write lock

    static final int WAITER = 2; // set when waiting for write lock

    static final int READER = 4; // increment value for setting read lock

    ......

    private final void lockRoot() {

        //如果一次性获取写锁失败,进入contendedLock循环体,循环获取写锁或者休眠等待

    if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))

    contendedLock(); // offload to separate method

    }

    private final void unlockRoot() {

    lockState = 0;

    }

    /** 红黑树加互斥锁,也就是写锁

    * Possibly blocks awaiting root lock. */

    private final void contendedLock() {

    boolean waiting = false;

    for (int s;;) {

    //1.如果lockState除了第二位外其它位上都为0,表示红黑树当前既没有上读锁,又没有上写锁,仅有可能存在waiter,可以尝试直接获取写锁

    if (((s = lockState) & ~WAITER) == 0) {

    if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {

    if (waiting)

    waiter = null;

    return;

    }

    }

    //2.如果lockState第二位是0,表示当前没有线程在等待写锁

    else if ((s & WAITER) == 0) {

    //将lockState的第二位设置为1,相当于打上了waiter的标记,表示有线程在等待写锁

    if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {

    waiting = true;

    waiter = Thread.currentThread();

    }

    }

    //3.休眠当前线程

    else if (waiting)

    LockSupport.park(this);

    }

    }

    //查找红黑树中的某个节点

    final Node<K,V>find(int h, Object k) {

    if (k != null) {

    for (Node<K,V>e = first; e != null; ) {

      int s; K ek;

    //如果当前有waiter有写锁,走线性检索,因为红黑树虽然替代了链表,但其内部依然保留了链表的结构,虽然链表的查询性能一般,但根据先前的分析其读取的安全性有保证

    //发现有写锁改走线性检索,是为了避免等待写锁释放花去太久时间; 而发现有waiter改走线性检索,是为了避免读锁叠加的太多,导致写锁线程需要等待太长的时间; 本质上都是为了减少读写碰撞

    //线性遍历的过程中,每遍历到下一个节点都做一次判断,一旦发现锁竞争的可能性减少就改走tree检索以提高性能

    if (((s = lockState) & (WAITER|WRITER)) != 0) {

    if (e.hash == h &&

    ((ek = e.key) == k || (ek != null && k.equals(ek))))

        return e;

      e = e.next;

    }

    //对红黑树加共享锁,也就是读锁,CAS一次性增加4,也就是增加的只是3~32位

    else if (U.compareAndSwapInt(this, LOCKSTATE, s,

    s + READER)) {

      TreeNode<K,V> r, p;

    try {

      p = ((r = root) == null ? null :

      r.findTreeNode(h, k, null));

    } finally {

      Thread w;

    //释放读锁,如果释放完毕且有waiter,则将其唤醒

      if (U.getAndAddInt(this, LOCKSTATE, -READER) ==

        (READER|WAITER) && (w = waiter) != null)

          LockSupport.unpark(w);

    }

      return p;

    }

    }

    }

      return null;

    }

    /** 更新红黑树中的某个节点 */

    final TreeNode<K,V> putTreeVal(int h, K k, V v) {

    Class<?> kc = null;

    boolean searched = false;

    for (TreeNode<K,V> p = root;;) {

      int dir, ph; K pk;

          //...省略处理红黑树数据结构的代码若干

    else {

      lockRoot(); //写操作前加互斥锁

    try {

      root = balanceInsertion(root, x);

    } finally {

      unlockRoot(); //释放互斥锁

    }

    }

    break;

    }

    }

    assert checkInvariants(root);

    return null;

    }

    }

    红黑树内置了一套读写锁的逻辑,其内部定义了32位的int型变量lockState,第1位是写锁标志位,第2位是写锁等待标志位,从3~32位则是共享锁标志位。

    图片来源: https://mp.weixin.qq.com/s/4sz6sTPvBigR_1g8piFxug

    读写操作是互斥的,允许多个线程同时读取,但不允许读写操作并行,同一时刻只允许一个线程进行写操作;这样任意时间点读取的都是一个合法的红黑树,整体上是安全的。

    TreeBin在find读操作检索时,在linearSearch(线性检索)和treeSearch(树检索)间做了折衷,前者性能差但并发安全,后者性能佳但要做并发控制,可能导致锁竞争;

    设计者使用线性检索来尽量避免读写碰撞导致的锁竞争,但评估到race condition已消失时,又立即趋向于改用树检索来提高性能,在安全和性能之间做到了极佳的平衡。具体的折衷策略请参考find方法及注释。

    由于有线性检索这样一个抄底方案,以及入口处bin头节点的synchornized机制,保证了进入到TreeBin整体代码块的写线程只有一个;TreeBin中读写锁的整体设计与ReentrantReadWriteLock相比还是简单了不少,比如并未定义用于存放待唤醒线程的threadQueue,以及读线程仅会自旋而不会阻塞等等, 可以看做是特定条件下ReadWriteLock的简化版本。

    4.如果读取的bin是一个ForwardingNode

    说明当前bin已迁移,调用其find方法到nextTable读取数据。

    static final class ForwardingNode<K,V> extends Node<K,V> {

    final Node<K,V>[] nextTable;

    ForwardingNode(Node<K,V>[] tab) {

      super(MOVED, null, null, null);

      this.nextTable = tab;

      }

    //递归检索哈希表链

    Node<K,V> find(int h, Object k) {

    // loop to avoid arbitrarily deep recursion on forwarding nodes

    outer: for (Node<K,V>[] tab = nextTable;;) {

    Node<K,V> e; int n;

    if (k == null || tab == null || (n = tab.length) == 0 ||

    (e = tabAt(tab, (n - 1) & h)) == null)

      return null;

    for (;;) {

      int eh; K ek;

      if ((eh = e.hash) == h &&

        ((ek = e.key) == k || (ek != null && k.equals(ek))))

        return e;

      if (eh < 0) {

    if (e instanceof ForwardingNode) {

      tab = ((ForwardingNode<K,V>)e).nextTable;

      continue outer;

    }

      else

        return e.find(h, k);

      }

    if ((e = e.next) == null)

      return null;

    }

    }

      }

    }

    ForwardingNode中保存了nextTable的引用,会转向下一个哈希表进行检索,但并不能保证nextTable就一定是currentTable,因为在高并发插入的情况下,极短时间内就可以导致哈希表的多次扩容,内存中极有可能驻留一条哈希表链,彼此以bin的头节点上的ForwardingNode相连,线程刚读取时拿到的是table1,遍历时却有可能经历了哈希表的链条。

    eh<0有三种情况:

    如果是ForwardingNode继续遍历下一个哈希表。

    如果是TreeBin,调用其find方法进入TreeBin读写锁的保护区读取数据。

    如果是ReserveNode,说明当前有compute计算中,整条bin还是一个空结构,直接返回null。

    5.如果读取的bin是一个ReserveNode

    ReserveNode用于compute/computeIfAbsent原子计算的方法,在BIN的头节点为null且计算尚未完成时,先在bin的头节点打上一个ReserveNode的占位标记

    读操作发现ReserveNode直接返回null,

    写操作会因为争夺ReserveNode的互斥锁而进入阻塞态,在compute完成后被唤醒后循环重试。

    2.5 delete方法

    final V replaceNode(Object key, V value, Object cv) {

    int hash = spread(key.hashCode());

    for (Node<K,V>[] tab = table;;) {

      Node<K,V> f; int n, i, fh;

      // 如果table 为空,或者table[i] 为空,那么直接返回

      if (tab == null || (n = tab.length) == 0 ||

        (f = tabAt(tab, i = (n - 1) & hash)) == null)

        break;

      // 如果table[i] 的hash 为 -1,那么协助扩容

      else if ((fh = f.hash) == MOVED)

        tab = helpTransfer(tab, f);

      else {

    V oldVal = null;

    boolean validated = false;

    synchronized (f) {

    // 链表删除

    if (tabAt(tab, i) == f) {

      if (fh >= 0) {

      validated = true;

      for (Node<K,V> e = f, pred = null;;) {

      K ek;

      if (e.hash == hash &&

        ((ek = e.key) == key ||

        (ek != null && key.equals(ek)))) {

      V ev = e.val;

      if (cv == null || cv == ev ||

        (ev != null && cv.equals(ev))) {

          oldVal = ev;

        if (value != null)

          e.val = value;

        else if (pred != null)

          pred.next = e.next;

        else

          setTabAt(tab, i, e.next);

      }

        break;

      }

      pred = e;

      if ((e = e.next) == null)

        break;

      }

      }

    // 红黑树删除

      else if (f instanceof TreeBin) {

    validated = true;

    TreeBin<K,V> t = (TreeBin<K,V>)f;

    TreeNode<K,V> r, p;

      if ((r = t.root) != null &&

        (p = r.findTreeNode(hash, key, null)) != null) {

      V pv = p.val;

      if (cv == null || cv == pv ||

        (pv != null && cv.equals(pv))) {

          oldVal = pv;

        if (value != null)

          p.val = value;

        else if (t.removeTreeNode(p))

          setTabAt(tab, i, untreeify(t.first));

    }

    }

    }

    }

    }

    // 总结点数减一,不检查扩容

    if (validated) {

    if (oldVal != null) {

      if (value == null)

      addCount(-1L, -1);

      return oldVal;

    }

      break;

    }

    }

    }

    return null;

    }

    三.问题

    参考:

    https://blog.csdn.net/cx897459376/article/details/106427587

    https://blog.csdn.net/caoxiaohong1005/article/details/78083655

    https://www.cnblogs.com/keeya/p/9632958.html

    https://mp.weixin.qq.com/s/4sz6sTPvBigR_1g8piFxug

  • 相关阅读:
    第一节:SpringMVC概述
    SpringMVC【目录】
    Windows 系统快速查看文件MD5
    (error) ERR wrong number of arguments for 'hmset' command
    hive使用遇到的问题 cannot recognize input
    Overleaf支持的部分中文字体预览
    Understanding and Improving Fast Adversarial Training
    Django2实战示例 第十三章 上线
    Django2实战示例 第十二章 创建API
    Django2实战示例 第十一章 渲染和缓存课程内容
  • 原文地址:https://www.cnblogs.com/coloz/p/14447184.html
Copyright © 2011-2022 走看看