zoukankan      html  css  js  c++  java
  • ConcurrentHashMap1.8源码解析

    深入并发包 ConcurrentHashMap

    概述

    JDK1.8的实现已经摒弃了Segment的概念,而是直接用Node数组+链表+红黑树的数据结构来实现,并发控制使用Synchronized和CAS来操作,整个看起来就像是优化过且线程安全的HashMap,虽然在JDK1.8中还能看到Segment的数据结构,但是已经简化了属性,只是为了兼容旧版本。

    结构

    基本属性

    // node数组最大容量:2^30=1073741824
    private static final int MAXIMUM_CAPACITY = 1 << 30;
    // 默认初始值,必须是2的幕数
    private static final int DEFAULT_CAPACITY = 16;
    //数组可能最大值,需要与toArray()相关方法关联
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    //并发级别,遗留下来的,为兼容以前的版本
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    // 负载因子
    private static final float LOAD_FACTOR = 0.75f;
    // 链表转红黑树阀值,> 8 链表转换为红黑树
    static final int TREEIFY_THRESHOLD = 8;
    //树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
    static final int UNTREEIFY_THRESHOLD = 6;
    //转红黑树要Node数组大小的阈值 static final int MIN_TREEIFY_CAPACITY = 64; private static final int MIN_TRANSFER_STRIDE = 16; private static int RESIZE_STAMP_BITS = 16; // 2^15-1,help resize的最大线程数 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 32-16=16,sizeCtl中记录size大小的偏移量 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // forwarding nodes的hash值 static final int MOVED = -1; // 树根节点的hash值 static final int TREEBIN = -2; // ReservationNode的hash值 static final int RESERVED = -3; // 可用处理器数量 static final int NCPU = Runtime.getRuntime().availableProcessors(); //存放node的数组 transient volatile Node<K,V>[] table; /*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义 *当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容 *当为0时:代表当时的table还没有被初始化 *当为正数时:表示初始化或者下一次进行扩容的大小 private transient volatile int sizeCtl;

    Node

    Node是ConcurrentHashMap存储结构的基本单元,继承于HashMap中的Entry,用于存储数据,源代码如下

    static class Node<K,V> implements Map.Entry<K,V> {
        //链表的数据结构
        final int hash;
        final K key;
        //val和next都会在扩容时发生变化,所以加上volatile来保持可见性和禁止重排序
        volatile V val;
        volatile Node<K,V> next;
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
        //不允许更新value  
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }
        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }
        //用于map中的get()方法,子类重写
        Node<K,V> find(int h, Object k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }

    Node数据结构很简单,从上可知,就是一个链表,但是只允许对数据进行查找,不允许进行修改。

    TreeNode

    TreeNode继承与Node,但是数据结构换成了二叉树结构,它是红黑树的数据的存储结构,用于红黑树中存储数据,当链表的节点数大于8时(并且Node数组大于64)会转换成红黑树的结构,他就是通过TreeNode作为存储结构代替Node来转换成黑红树源代码如下。

    static final class TreeNode<K,V> extends Node<K,V> {
        //树形结构的属性定义
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red; //标志红黑树的红节点
        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }
        Node<K,V> find(int h, Object k) {
            return findTreeNode(h, k, null);
        }
        //根据key查找 从根节点开始找出相应的TreeNode,
        final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
            if (k != null) {
                TreeNode<K,V> p = this;
                do  {
                    int ph, dir; K pk; TreeNode<K,V> q;
                    TreeNode<K,V> pl = p.left, pr = p.right;
                    if ((ph = p.hash) > h)
                        p = pl;
                    else if (ph < h)
                        p = pr;
                    else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                        return p;
                    else if (pl == null)
                        p = pr;
                    else if (pr == null)
                        p = pl;
                    else if ((kc != null ||
                              (kc = comparableClassFor(k)) != null) &&
                             (dir = compareComparables(kc, k, pk)) != 0)
                        p = (dir < 0) ? pl : pr;
                    else if ((q = pr.findTreeNode(h, k, kc)) != null)
                        return q;
                    else
                        p = pl;
                } while (p != null);
            }
            return null;
        }
    }

    方法解析

    put

    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode()); //两次hash,减少hash冲突,可以均匀分布
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) { //对这个table进行迭代
            Node<K,V> f; int n, i, fh;
            //这里就是上面构造方法没有进行初始化,在这里进行判断,为null就调用initTable进行初始化,属于懒汉模式初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果i位置没有数据,就直接无锁插入
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)//如果在进行扩容,则先进行扩容操作
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //如果以上条件都不满足,那就要进行加锁操作,也就是存在hash冲突,锁住链表或者红黑树的头结点
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) { //表示该节点是链表结构
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //这里涉及到相同的key进行put就会覆盖原先的value
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {  //插入链表尾部
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {//红黑树结构
                            Node<K,V> p;
                            binCount = 2;
                            //红黑树结构旋转插入
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) { //如果链表的长度大于8时就会进行红黑树的转换
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);//统计size,并且检查是否需要扩容
        return null;
    }
    put

    这个put的过程很清晰,对当前的table进行无条件自循环直到put成功,可以分成以下六步流程来概述。

    1. 如果没有初始化就先调用initTable()方法来进行初始化过程
    2. 如果没有hash冲突就直接CAS插入
    3. 如果还在进行扩容操作就先进行扩容
    4. 如果存在hash冲突,就加锁来保证线程安全,这里有两种情况,一种是链表形式就直接遍历到尾端插入,一种是红黑树就按照红黑树结构插入,
    5. 最后一个如果该链表的数量大于阈值8,就要先转换成黑红树的结构,break再一次进入循环
    6. 如果添加成功就调用addCount()方法统计size,并且检查是否需要扩容

    get

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode()); //计算两次hash
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {//读取首节点的Node元素
            if ((eh = e.hash) == h) { //如果该节点就是首节点就返回
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            //hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTable来
            //查找,查找到就返回
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {//既不是首节点也不是ForwardingNode,那就往下遍历
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
    get

    ConcurrentHashMap的get操作的流程很简单,也很清晰,可以分为三个步骤来描述

    1. 计算hash值,定位到该table索引位置,如果是首节点符合就返回
    2. 如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find方法,查找该节点,匹配就返回
    3. 以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null

    size

    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
    public long mappingCount() {
        long n = sumCount();
        return (n < 0L) ? 0L : n; // ignore transient negative values
    }
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a; //变化的数量
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
    size

    相比较 size 方法,mappingCount 方法的返回值是 long 类型。所以不必限制最大值必须是 Integer.MAX_VALUE。而 JDK 推荐使用这个方法。但这个返回值依然不一定绝对准确。

    在没有并发的情况下,使用一个 baseCount volatile 变量就足够了,当并发的时候,CAS 修改 baseCount 失败后,就会使用 CounterCell 类了,会创建一个这个对象,通常对象的 volatile value 属性是 1。在计算 size 的时候,会将 baseCount 和 CounterCell 数组中的元素的 value 累加,得到总的大小,但这个数字仍旧可能是不准确的。

    还有一个需要注意的地方就是,这个 CounterCell 类使用了 @sun.misc.Contended 注解标识,这个注解是防止伪共享的。是 1.8 新增的。使用时,需要加上 -XX:-RestrictContended 参数。

    addCount

    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        //更新baseCount,table的数量,counterCells表示元素个数的变化
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            //如果多个线程都在执行,则CAS失败,执行fullAddCount,全部加入count
            if (as == null || (m = as.length - 1) < 0 || 
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
         //check>=0表示需要进行扩容操作
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                //当前线程发起扩容操作,nextTable=null
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

    1.7 1.8比较

    基本差异

    JDK1.8版本的ConcurrentHashMap的数据结构已经接近HashMap,相对而言,ConcurrentHashMap只是增加了同步的操作来控制并发,从JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+红黑树。

    JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是HashEntry(首节点)

    JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为已经使用synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,由于粒度的降低,实现的复杂度也增加了

    JDK1.8使用红黑树来优化链表,基于长度很长的链表的遍历是一个很漫长的过程,而红黑树的遍历效率是很快的,代替一定阈值的链表,这样形成一个最佳拍档。

    synchronized代替ReentrantLock

    1.    因为粒度降低了,在相对而言的低粒度加锁方式,synchronized并不比ReentrantLock差,在粗粒度加锁中ReentrantLock可能通过Condition来控制各个低粒度的边界,更加的灵活,而在低粒度中,Condition的优势就没有了。synchronized只要线程可以在30到50次自旋里拿到锁,那么Synchronized就不会升级为重量级锁,而等待的线程也就不用被挂起,我们也就少了挂起和唤醒这个上下文切换的过程开销。

    但如果是ReentrantLock呢?它则只有在线程没有抢到锁,然后新建Node节点后再尝试一次而已,不会自旋,而是直接被挂起,这样一来,我们就很容易会多出线程上下文开销的代价.当然,你也可以使用tryLock(),但是这样又出现了一个问题,你怎么知道tryLock的时间呢?在时间范围里还好,假如超过了呢?

    所以,在锁被细化到如此程度上,使用Synchronized是最好的选择了.这里再补充一句,Synchronized和ReentrantLock他们的开销差距是在释放锁时唤醒线程的数量,Synchronized是唤醒锁池里所有的线程+刚好来访问的线程,而ReentrantLock则是当前线程后进来的第一个线程+刚好来访问的线程.

    如果是线程并发量不大的情况下,那么Synchronized因为自旋锁,偏向锁,轻量级锁的原因,不用将等待线程挂起,偏向锁甚至不用自旋,所以在这种情况下要比ReentrantLock高效。

    2.    JVM的开发团队从来都没有放弃synchronized,而且基于JVM的synchronized优化空间更大,使用内嵌的关键字比使用API更加自然。

    3.    在大量的数据操作下,对于JVM的内存压力,基于API的ReentrantLock会开销更多的内存,虽然不是瓶颈,但是也是一个选择依据。

  • 相关阅读:
    JavaSE学习笔记(二十六)—— IO流概述&字节流
    JavaSE学习笔记(二十五)—— File类
    JavaSE学习笔记(二十四)—— 异常
    JavaSE学习笔记(二十三)—— Collections工具类
    JavaSE学习笔记(二十二)—— Map集合
    JavaSE学习笔记(二十一)—— Set集合
    Gin框架
    Go gRPC Hello World
    Go
    Gin框架
  • 原文地址:https://www.cnblogs.com/fanguangdexiaoyuer/p/10733236.html
Copyright © 2011-2022 走看看