zoukankan      html  css  js  c++  java
  • 16 JDK8的concurrenthashmap的原理介绍

    一 ConcurrentHashMap原理(JDK1.8的实现)

    1-1 重要属性与内部类

     sizeCtl;      // size control:懒惰 初始化,
    
    1-1-1 四个内部类

    1)内部类Node(静态内部类)

    • 主要包含了键,值,hash码,next指针。
        static class Node<K,V> implements Map.Entry<K,V> {
            final int hash;
            final K key;
            volatile V val;
            volatile Node<K,V> next;  
            ......
    

    2)内部类ForwardingNode

    • 扩容时用于标记该数组位置的元素已经处理
    • get是告诉线程不应该去旧的hash表找,应该去新的hash表获取值。
     /* ---------------- Special Nodes -------------- 
        /**
         * A node inserted at head of bins during transfer operations.
         */
        static final class ForwardingNode<K,V> extends Node<K,V> {
            final Node<K,V>[] nextTable;
            ForwardingNode(Node<K,V>[] tab) {
                super(MOVED, null, null, null);
                this.nextTable = tab;
            }
    

    3,4)内部类 TreeBin<K,V> (头节点)和TreeNode<K,V>用于构建红黑树

    • 提高查找效率
    • 一定程度上避免DOS攻击(构造hash值相同的对象攻击链表)

    1-1-2 什么时候链表转化为红黑树,什么时候红黑树转化为链表?

    链表->红黑树:容量已经超过64,并且单个链表达到8。(容量小可以通过扩容降低冲突rehash

    红黑树->链表:当红黑树节点数目小于6的时候,则转化为链表。

    1-1-3 重要的属性的定义(关注volatile修饰的变量)
    • 采用volatile修饰数组对象的引用保证hash表以及扩容后的hash表的可见性
    • 采用transient让序列化的过程中忽视数组的数组的引用
    • volatile修饰的sizeCtl在扩容中会采用CAS机制进行操作
    / 默认为 0
    // 当初始化时, 为 -1
    // 当扩容时, 为 -(1 + 扩容线程数)
    // 当初始化或扩容完成后,为 下一次的扩容的阈值大小
    private transient volatile int sizeCtl;
    // 整个 ConcurrentHashMap 就是一个 Node[]
    static class Node<K,V> implements Map.Entry<K,V> {}   //
    // hash 表
    transient volatile Node<K,V>[] table;
    // 扩容时的 新 hash 表
    private transient volatile Node<K,V>[] nextTable;
    // 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点
    static final class ForwardingNode<K,V> extends Node<K,V> {}
    // 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node
    static final class ReservationNode<K,V> extends Node<K,V> {}
    // 作为 treebin 的头节点, 存储 root 和 first
    static final class TreeBin<K,V> extends Node<K,V> {}
    // 作为 treebin 的节点, 存储 parent, left, right
    static final class TreeNode<K,V> extends Node<K,V> {}
    
    1-1-4 Node数组的重要的操作方法
    • 给新的桶中添加头节点的采用的是CAS操作( casTabAt)
    // 获取 Node[] 中第 i 个 Node
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)
    // cas 修改 Node[] 中第 i 个 Node 的值, c 为旧值, v 为新值
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)
    // 直接修改 Node[] 中第 i 个 Node 的值, v 为新值
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)
    

    1-2 构造器分析

    注意:真实容量会设置为最为接近的2的n次幂。

       /*
          initialCapacity:初始容量(注意:这里的容量并非) loadFactor:装填因子   concurrencyLevel:并发度
       */
       public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
            if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
                throw new IllegalArgumentException();
            /*确保初始容量大于等于并发度*/
            if (initialCapacity < concurrencyLevel)   // Use at least as many bins
                initialCapacity = concurrencyLevel;   // as estimated threads
            /*计算size = 1+(设定的初始容量/负载因子)*/
            long size = (long)(1.0 + (long)initialCapacity / loadFactor);
           /*将真实容量调整为最为接近的2^n*/
            int cap = (size >= (long)MAXIMUM_CAPACITY) ?
                MAXIMUM_CAPACITY : tableSizeFor((int)size);
            this.sizeCtl = cap;
        }
    

    注意点

    懒惰初始化:JDK8中数组的初始化是懒惰初始化,在构造函数中仅仅计算出数组的size大小,只有用到的时候才会初始化数组。

    • 相比较JDK7这是一个改进,JDK7中hashmap的数组并非懒惰初始化,对内存占用比较大

    数组大小:真实数组大小是2^n的数值:可以采用位运算替代%运算,还可以让扩容后的元素分布更加均匀。

    实例:设initialCapacity为8,loadFactor为0.75,那么 size = 1.0+(8/0.75) = 11.67 , 则 sizeCtl = 16.
    

    HashMap为何喜欢2的倍数扩容

    1-3 get函数分析(重要)

    • 通过hash值的正负区分头节点的类型(链表头节点,树的头节点,扩容中的节点)
        public V get(Object key) {
            Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
            /* spread:将计算出的hashcode变为正数*/
            /* h:需要查找的hash值*/
            int h = spread(key.hashCode());
            /*e = tabAt(tab, (n - 1) & h): 获取 tab 中第 (n - 1) & h 个 Node*/
            if ((tab = table) != null && (n = tab.length) > 0 &&
                (e = tabAt(tab, (n - 1) & h)) != null) {
                /*情况1:查询的节点是头节点,当前节点hash值eh等于计算出的h,即根据查询的key计算出的hash值*/
                if ((eh = e.hash) == h) {
                    /*通过==和equals判断是否为同一个key*/
                    if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                        return e.val;
                }
                /*情况2:当前节点hash值eh < 0, 这个时候可能是由于扩容,也有可能是由于当前的桶不是链表结构。
                  1)扩容中,说明当前的bin是ForwardingNode,调用find方法去扩容后的数组中查找元素。
                  2)是treebin,同样的采用红黑树的find方法查找。
                */
                else if (eh < 0)
                    return (p = e.find(h, key)) != null ? p.val : null;
                /*
                    情况3:查询的节点不是链表的头节点,遍历节点
                */
                while ((e = e.next) != null) {
                    if (e.hash == h &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek))))
                        return e.val;
                }
            }
            return null;
        }
    

    注意点

    • 并发的hashmap的get方法并没有利用到锁,因此效率非常高
    • 查找的时候需要考虑三种情况:链表节点,红黑树节点,当前是否处于扩容状态

    1-4 put函数源码分析

    要点

    • 通过头节点的hash值区分节点的类型

    • 碰撞时使用synchronized锁住了当前桶下标头节点对象

    • 没有节点采用CAS机制创建新的节点

    JDK1.8源码

    
    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    ==================================================================================================
     /** Implementation for put and putIfAbsent */
        /* 注意点1:
           onlyIfAbsent为true:第一次放入这个key的时候才会将键值对存入到map中,之后如果有相同的key进入将不会进行任何操作。
           onlyIfAbsent为false:有相同的key放入会修改key锁对应的value。
        */
        final V putVal(K key, V value, boolean onlyIfAbsent) {
        /*
           注意点2:concurrenthashmap不允许键值对为null,而普通的hashmap允许键值对为null。
        */
            if (key == null || value == null) throw new NullPointerException();
            int hash = spread(key.hashCode());
            int binCount = 0;
            // 进入死循环
            for (Node<K,V>[] tab = table;;) {
                // f:链表头节点 fn:头节点的hash i:链表在table中的下标
                Node<K,V> f; int n, i, fh; 
                /*注意点3:hash表的初始化时懒惰初始化的,利用CAS机制确保创建的线程安全性*/
                /*死循环分支1:查看hash表是否已经初始化,初始化后,进入下一次循环*/
                if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
                /*死循环分支2:查看当前hashcode所对应的桶是否为空,如果为空,则创建头节点并利用CAS机制插入到链表中*/
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    if (casTabAt(tab, i, null,
                                 new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
                /*死循环分支3:查看头节点的值是否为-1,如果是负值,则说明当前map正在扩容中,则该线程帮助扩容。
                  为什么hash码是负数就代表是在扩容?
                  原因:hash表在扩容的时候是以链表为单位的,每完成一个链表的扩容,就会将该链表的头部设置为forwardingNode,
                  forwardingNode的hash值就是一个负数。MOVED值为-1.
                
                */
                else if ((fh = f.hash) == MOVED)
                    /*锁住当前链表,帮忙去扩容*/
                    tab = helpTransfer(tab, f);
                /*死循环分支4:发生桶下标的冲突,此时会对当前桶下标对应的链表的头节点进行加锁,*/
                else {
                    V oldVal = null;
                    synchronized (f) {
                        if (tabAt(tab, i) == f) {
                            // 判断当前的桶中节点是否为普通节点(hash值大于0)
                            if (fh >= 0) {
                                binCount = 1;
                                // 遍历链表节点
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    // 如果hash值,key值相等,则更新value的值
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    // 如果是最后一个节点,新增Node,追加Node
                                    if ((e = e.next) == null) {
                                        pred.next = new Node<K,V>(hash, key,
                                                                  value, null);
                                        break;
                                    }
                                }
                            } // 当前桶下标存放的是红黑树节点,调用红黑树的方法插入节点
                            else if (f instanceof TreeBin) {
                                Node<K,V> p;
                                binCount = 2;
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                    // bincount是链表的长度,不为0,
                    if (binCount != 0) {
                        // 如果链表的长度达到阈值,进行转换
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            //采用类似longAdder进行节点个数的计数
            // 1L就是表示long类型的值是1,
            addCount(1L, binCount);
            return null;
        }
    
    put方法中hash表的初始化方法
    /*
        hash表初始化的代码:可以看到采用了CAS机制确保多个线程环境下初始化的安全性
    */
        private final Node<K,V>[] initTable() {
            Node<K,V>[] tab; int sc;
            // 循环操作,确保多线程环境下一个线程安全初始化hash表。
            while ((tab = table) == null || tab.length == 0) {
                if ((sc = sizeCtl) < 0)  // sizeCtl为负数表示初始化,或者扩容
                    Thread.yield(); // lost initialization race; just spin
                /* 
                   这里使用SIZECTL作为CAS机制的比较值,尝试设为-1表示正在创建hash表
                */
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if ((tab = table) == null || tab.length == 0) {
                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = tab = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                    break;
                }
            }
            return tab;
        }
    
    put方法关联的方法addCount(用于累加)

    总结addCount实现了两个功能

    1)维护hashmap的sets的数量的计数。
    2)如果整个set的计数已经超过了阈值的时候,进行一个扩容操作
    

    注意:数量的累加利用longAdder类似的思想,可以实现并发的累加汇总求和

    longAdder的原理

    ==============对当前桶下标下节点个数的累加================================================
          // x:需要累加的数值x,  check是桶中节点的个数
          private final void addCount(long x, int check) {
            CounterCell[] as; long b, s;
            if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
                CounterCell a; long v; int m;
                boolean uncontended = true;
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended =
                      U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                    fullAddCount(x, uncontended);
                    return;
                }
                if (check <= 1)
                    return;
                s = sumCount();   
            }
            if (check >= 0) {
                Node<K,V>[] tab, nt; int n, sc;
                while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                       (n = tab.length) < MAXIMUM_CAPACITY) {
                    int rs = resizeStamp(n);
                    if (sc < 0) {
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                            break;
                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                            transfer(tab, nt);
                    }     // 需要扩容,设置sc为负数,newtable为
                    else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                                 (rs << RESIZE_STAMP_SHIFT) + 2))
                        transfer(tab, null);
                    s = sumCount();
                }
            }
        }
    

    1-5 concurrenthashmap的size方法

    总结:大小计算的总结

    1)size的累加计算是在put与remove操作过程动态计算的。
    2)多线程下size的计算通过多个累加单元分别计算,最后汇总
    具体情况如下:
      a.没有竞争发生时,只需要baseCount累加即可
      b.有竞争发生时,创建counterCells,向其中一个cell累加计数。
            counterCells处理有2个cell单元。
            竞争比较积累的话,会创建多个cell。
    

    JDK1.8源码分析

        public int size() {
            long n = sumCount();                // 获取计数值
            return ((n < 0L) ? 0 :
                    (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                    (int)n);
        }
    
        /* ---------------- Counter support -------------- */
    
        /**
         * A padded cell for distributing counts.  Adapted from LongAdder
         * and Striped64.  See their internal docs for explanation.
         */
        @sun.misc.Contended static final class CounterCell {
            volatile long value;
            CounterCell(long x) { value = x; }
        }
    
        final long sumCount() {
            CounterCell[] as = counterCells; CounterCell a;
            long sum = baseCount;
            if (as != null) {                 // 计数值通过累加各个单元的数值实现
                for (int i = 0; i < as.length; ++i) {
                    if ((a = as[i]) != null)
                        sum += a.value;
                }
            }
            return sum;
        }
    

    1-6 concurrentHashmap的面试总结(重要,结合源码理解)

    基本思想:使用CAS机制以及synchronized去保证读的安全性,使用volatile保证写的可见性

    要点:1)写的线程安全性保证。 2 )读的时候可见性的保证。 3) 扩容机制的线程安全性的保证(主要是在读/写的时候的需要考虑当前是否处于扩容状态)。

    如何保证多个线程的修改对其他线程的可见性?

    • 使用volatile修饰了数组的引用以及sizectrl变量
    // hash 表
    transient volatile Node<K,V>[] table;
    // 扩容时的 新 hash 表
    private transient volatile Node<K,V>[] nextTable;
    private transient volatile int sizeCtl;  
    
    写安全性的保证:put方法(使用CAS机制与synchronized共同保证安全性)
    step1:计算hash值
    step2:死循环
    1)当没有创建数组,则创建hash数组(通过CAS机制修改用SIZECTL为-1告诉其他线程正在创建hash表)。
    2)当检查hash值对应的桶为空,为空则采用CAS机制(casTabAt)创建对应的头节点。
    3)当检查hash值对应的桶不为空,则根据头节点的hash值正负分情况讨论,
      情况1:当hash值为负数的时候 表明当前有其他线程进行扩容,这个时候线程会帮助去扩容。
      情况2:当hash值为正数,此时需要遍历桶中的节点进行节点的添加或者修改,这个时候会通过synchronized对头节点进行加锁操作,确保线程安全性。  
    
    step3:put方法死循环之外的一些操作:
     1)如果发现当前链表长度达到阈值则进行转换。
     2)统计hash表的大小(利用了类似longAdder的原理)。 
    
    读安全性的保证:get方法的思想(没有加锁)
    step1:计算完hash值后
    step2:
        情况1:判断hash数组引用对象是否为空以及hash值对应的桶下标是否为空,为空则返回null。
        情况2:对应的桶不为空,则再判断是否为桶中头节点的hash是否为正
        	   A:为正则头节点是链表节点,此时在链表中查找到目标节点返回即可
               B:为负则头节点是树的节点或者当前hash表正在扩容中,如果是树节点,则按照树的方法查找,如果正在扩容中,
                 则查找到的节点是forwardNode,则需要到扩容后的新的hash表中查找。
    

    参考资料

    并发基础课程

  • 相关阅读:
    unity基础开发----Unity获取PC,Ios系统的mac地址等信息
    Web UI设计师需要了解的用栅格化系统指导网页设计
    设计网页,常见的宽度是多少像素?
    C#常用类库简介(二)
    将本地代码上传到gitLab
    删除git 分支
    git 新建分支
    将子分支代码merge到主分支master分支
    dev分支代码覆盖master分支代码
    使用flex的同时设置超出喜爱是省略号,
  • 原文地址:https://www.cnblogs.com/kfcuj/p/14687793.html
Copyright © 2011-2022 走看看