zoukankan      html  css  js  c++  java
  • ConcurrentHashMap实现原理及源码分析

    并发环境下为什么使用ConcurrentHashMap

    1. HashMap在高并发的环境下,执行put操作会导致HashMap的Entry链表形成环形数据结构,从而导致Entry的next节点始终不为空,因此产生死循环获取Entry

    2. HashTable虽然是线程安全的,但是效率低下,当一个线程访问HashTable的同步方法时,其他线程如果也访问HashTable的同步方法,那么会进入阻塞或者轮训状态。

    3. 在jdk1.6中ConcurrentHashMap使用锁分段技术提高并发访问效率。首先将数据分成一段一段地存储,然后给每一段数据配一个锁,当一个线程占用锁访问其中一段数据时,其他段的数据也能被其他线程访问。然而在jdk1.8中的实现已经抛弃了Segment分段锁机制,利用CAS+Synchronized来保证并发更新的安全,底层依然采用数组+链表+红黑树的存储结构。

    问题:ConcurrentHashMap(JDK1.8)为什么要使用synchronized而不是可重入锁?

    我想从下面几个角度讨论这个问题:

    1. 锁的粒度 
      首先锁的粒度并没有变粗,甚至变得更细了。每当扩容一次,ConcurrentHashMap的并发度就扩大一倍。
    2. Hash冲突 
      JDK1.7中,ConcurrentHashMap从过二次hash的方式(Segment -> HashEntry)能够快速的找到查找的元素。在1.8中通过链表加红黑树的形式弥补了put、get时的性能差距。
    3. 扩容 
      JDK1.8中,在ConcurrentHashmap进行扩容时,其他线程可以通过检测数组中的节点决定是否对这条链表(红黑树)进行扩容,减小了扩容的粒度,提高了扩容的效率。

    为什么是synchronized,而不是可重入锁 
    1. 减少内存开销 
    假设使用可重入锁来获得同步支持,那么每个节点都需要通过继承AQS来获得同步支持。但并不是每个节点都需要获得同步支持的,只有链表的头节点(红黑树的根节点)需要同步,这无疑带来了巨大内存浪费。 
    2. 获得JVM的支持 
    可重入锁毕竟是API这个级别的,后续的性能优化空间很小。 
    synchronized则是JVM直接支持的,JVM能够在运行时作出相应的优化措施:锁粗化、锁消除、锁自旋等等。这就使得synchronized能够随着JDK版本的升级而不改动代码的前提下获得性能上的提升。

    JDK1.6分析

    ConcurrentHashMap采用 分段锁的机制,实现并发的更新操作,底层由Segment数组和HashEntry数组组成。Segment继承ReentrantLock用来充当锁的角色,每个 Segment 对象守护每个散列映射表的若干个桶。HashEntry 用来封装映射表的键 / 值对;每个桶是由若干个 HashEntry 对象链接起来的链表。一个 ConcurrentHashMap 实例中包含由若干个 Segment 对象组成的数组,下面我们通过一个图来演示一下 ConcurrentHashMap 的结构:

    这里写图片描述

    JDK1.8分析

    改进一:取消segments字段,直接采用transient volatile HashEntry<K,V> table保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。

    改进二:将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。对于hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。

    ConcurrentHashMap的重要属性

    /**
     * races. Updated via CAS.
     * 记录容器的容量大小,通过CAS更新
     */
     private static final long BASECOUNT;
    
    /**
     * 这个sizeCtl是volatile的,那么他是线程可见的,一个思考:它是所有修改都在CAS中进行,但是sizeCtl为什么不设计成LongAdder(jdk8出现的)类型呢?
     * 或者设计成AtomicLong(在高并发的情况下比LongAdder低效),这样就能减少自己操作CAS了。
     *
     * 默认为0,用来控制table的初始化和扩容操作,具体应用在后续会体现出来。
     * -1 代表table正在初始化
     * -N 表示有N-1个线程正在进行扩容操作
     * 其余情况:
     *1、如果table未初始化,表示table需要初始化的大小。
     *2、如果table初始化完成,表示table的容量,默认是table大小的0.75 倍,居然用这个公式算0.75(n - (n >>> 2))。
     **/
    private static final long SIZECTL;
    
    /**
     *  自旋锁 (锁定通过 CAS) 在调整大小和/或创建 CounterCells 时使用。 在CounterCell类更新value中会使用,功能类似显示锁和内置锁,性能更好
     *  在Striped64类也有应用
     */
     private static final long CELLSBUSY;

    Node:保存key,value及key的hash值的数据结构。其中value和next都用volatile修饰,保证并发的可见性。

        static class Node<K,V> implements Map.Entry<K,V> {
            final int hash;
            final K key;
            volatile V val;//volatile类型的
            volatile Node<K,V> next;//volatile类型的
    
    
            Node(int hash, K key, V val, Node<K,V> next) {
                this.hash = hash;
                this.key = key;
                this.val = val;
                this.next = next;
            }
            //省略部分代码
           }

    ForwardingNode:一个特殊的Node节点,hash值为-1,其中存储nextTable的引用。

        static final class ForwardingNode<K,V> extends Node<K,V> {
            final Node<K,V>[] nextTable;
            ForwardingNode(Node<K,V>[] tab) {
                super(MOVED, null, null, null);
                this.nextTable = tab;
            }
            //省略部分代码
            }

    ConcurrentHashMap的构造函数

        //默认的构造函数
        public ConcurrentHashMap(){}
    
        /**
        *initialCapacity 初始化容量
        **/
        public ConcurrentHashMap(int initialCapacity) {}
    
        /**
        *
        *创建与给定map具有相同映射的新map
        **/
        public ConcurrentHashMap(Map<? extends K, ? extends V> m){}
       /**
        *initialCapacity 初始容量
        *loadFactor 负载因子,当容量达到initialCapacity*loadFactor时,执行扩容
        *concurrencyLevel 预估的并发更新线程数
        **/
        public ConcurrentHashMap(int initialCapacity, float loadFactor) {}
    
        /**
        *initialCapacity 初始容量
        *loadFactor 负载因子
        *concurrencyLevel 预估的并发更新线程数
        **/
         public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel) {}

    接下来具体看看第四个构造函数的具体实现:

     public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel) {
            if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
                throw new IllegalArgumentException();
            if (initialCapacity < concurrencyLevel)   //至少使用尽可能多的bin
                initialCapacity = concurrencyLevel;   //作为估计线程
            long size = (long)(1.0 + (long)initialCapacity / loadFactor);
            int cap = (size >= (long)MAXIMUM_CAPACITY) ?
                MAXIMUM_CAPACITY : tableSizeFor((int)size);
            this.sizeCtl = cap;//初始化sizeCtl
        }
        /**
        *返回给定所需容量,table的大小总是2的幂次方
        **/
        private static final int tableSizeFor(int c) {
            int n = c - 1;
            n |= n >>> 1;
            n |= n >>> 2;
            n |= n >>> 4;
            n |= n >>> 8;
            n |= n >>> 16;
            return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
        }

    ConcurrentHashMap在构造函数中只会初始化sizeCtl值,并不会直接初始化table,而是延缓到第一次put操作

    put()方法的实现

        public V put(K key, V value) {
            return putVal(key, value, false);
        }
    
        final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());//对hashCode进行再散列,算法为(h ^ (h >>> 16)) & HASH_BITS
        int binCount = 0;
     //这边加了一个循环,就是不断的尝试,因为在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject
        //因为如果其他线程正在修改tab,那么尝试就会失败,所以这边要加一个for循环,不断的尝试
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 如果table为空,初始化;否则,根据hash值计算得到数组索引i,如果tab[i]为空,直接新建节点Node即可。注:tab[i]实质为链表或者红黑树的首节点。
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
    
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 如果tab[i]不为空并且hash值为MOVED(-1),说明该链表正在进行transfer操作,返回扩容完成后的table。
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                // 针对首个节点进行加锁操作,而不是segment,进一步减少线程冲突
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // 如果在链表中找到值为key的节点e,直接设置e.val = value即可。
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                // 如果没有找到值为key的节点,直接新建Node并加入链表即可。
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 如果首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操作。
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    // 如果节点数>=8,那么转换链表结构为红黑树结构。
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 计数增加1,有可能触发transfer操作(扩容)。
        addCount(1L, binCount);
        return null;
    }
    @SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    
    /*
     *但是这边为什么i要等于((long)i << ASHIFT) + ABASE呢,计算偏移量
     *ASHIFT是指tab[i]中第i个元素在相对于数组第一个元素的偏移量,而ABASE就算第一数组的内存素的偏移地址
     *所以呢,((long)i << ASHIFT) + ABASE就算i最后的地址
     * 那么compareAndSwapObject的作用就算tab[i]和c比较,如果相等就tab[i]=v否则tab[i]=c;
    */
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }

    我们还是继续一步步看代码,看inputVal的注释a,这个方法helpTransfer,如果线程进入到这边说明已经有其他线程正在做扩容操作,这个是一个辅助方法

    /**
     * Helps transfer if a resize is in progress.
     */
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                //下面几种情况和addCount的方法一样,请参考addCount的备注
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

    当我们的putVal执行到addCount的时候

    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
    
        //U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 每次竟来都baseCount都加1因为x=1
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//1
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                //多线程CAS发生失败的时候执行
                fullAddCount(x, uncontended);//2
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            //当条件满足开始扩容
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {//如果小于0说明已经有线程在进行扩容操作了
                    //一下的情况说明已经有在扩容或者多线程进行了扩容,其他线程直接break不要进入扩容操作
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))//如果相等说明扩容已经完成,可以继续扩容
                        transfer(tab, nt);
                }
                //这个时候sizeCtl已经等于(rs << RESIZE_STAMP_SHIFT) + 2等于一个大的负数,这边加上2很巧妙,因为transfer后面对sizeCtl--操作的时候,最多只能减两次就结束
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

    看上面注释1,每次都会对baseCount 加1,如果并发竞争太大,那么可能导致U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 失败,那么为了提高高并发的时候baseCount可见性失败的问题,又避免一直重试,这样性能会有很大的影响,那么在jdk8的时候是有引入一个类Striped64,其中LongAdder和DoubleAdder就是对这个类的实现。这两个方法都是为解决高并发场景而生的,是AtomicLong的加强版,AtomicLong在高并发场景性能会比LongAdder差。但是LongAdder的空间复杂度会高点。

    我们每次进来都对baseCount进行加1当达到一定的容量时,就需要对table进行扩容。扩容方法就是transfer,这个方法稍微复杂一点,大部分的代码我都做了注释

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        //构建一个连节点的指针,用于标识位
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        //循环的关键变量,判断是否已经扩容完成,完成就return,退出循环
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            //循环的关键i,i--操作保证了倒序遍历数组
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {//nextIndex=transferIndex=n=tab.length(默认16)
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //i<0说明已经遍历完旧的数组tab;i>=n什么时候有可能呢?在下面看到i=n,所以目前i最大应该是n吧。
            //i+n>=nextn,nextn=nextTab.length,所以如果满足i+n>=nextn说明已经扩容完成
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {// a
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                //利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作,参考sizeCtl的注释
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    //如果有多个线程进行扩容,那么这个值在第二个线程以后就不会相等,因为sizeCtl已经被减1了,所以后面的线程就只能直接返回,始终保证只有一个线程执行了 a(上面注释a)
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;//finishing和advance保证线程已经扩容完成了可以退出循环
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)//如果tab[i]为null,那么就把fwd插入到tab[i],表明这个节点已经处理过了
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)//那么如果f.hash=-1的话说明该节点为ForwardingNode,说明该节点已经处理过了
                advance = true; // already processed
            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            //这边还对链表进行遍历,这边的的算法和hashmap的算法又不一样了,这班是有点对半拆分的感觉
                            //把链表分表拆分为,hash&n等于0和不等于0的,然后分别放在新表的i和i+n位置
                            //次方法同hashmap的resize
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            //把已经替换的节点的旧tab的i的位置用fwd替换,fwd包含nextTab
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }//下面红黑树基本和链表差不多
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            //判断扩容后是否还需要红黑树结构
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

    值得细细品味的是,transfer的for循环是倒叙的,说明对table的遍历是从table.length-1开始到0的。我觉得这段代码写得太牛逼了,特别是

    //利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作,参考sizeCtl的注释
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        //如果有多个线程进行扩容,那么这个值在第二个线程以后就不会相等,因为sizeCtl已经被减1了,所以后面的线程就只能直接返回,始终保证只有一个线程执行了 a(上面注释a)
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
        finishing = advance = true;//finishing和advance保证线程已经扩容完成了可以退出循环
        i = n; // recheck before commit
    }

    注意:如果链表结构中元素超过TREEIFY_THRESHOLD阈值,默认为8个,则把链表转化为红黑树,提高遍历查询效率.接下来我们看看如何构造树结构,代码如下:

    private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                tryPresize(n << 1);
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }

    可以看出,生成树节点的代码块是同步的,进入同步代码块之后,再次验证table中index位置元素是否被修改过。 
    1、根据table中index位置Node链表,重新生成一个hd为头结点的TreeNode链表。 
    2、根据hd头结点,生成TreeBin树结构,并把树结构的root节点写到table的index位置的内存中,具体实现如下:

    TreeBin(TreeNode<K,V> b) {
        super(TREEBIN, null, null, null);
        this.first = b;
        TreeNode<K,V> r = null;
        for (TreeNode<K,V> x = b, next; x != null; x = next) {
            next = (TreeNode<K,V>)x.next;
            x.left = x.right = null;
            if (r == null) {
                x.parent = null;
                x.red = false;
                r = x;
            }
            else {
                K k = x.key;
                int h = x.hash;
                Class<?> kc = null;
                for (TreeNode<K,V> p = r;;) {
                    int dir, ph;
                    K pk = p.key;
                    if ((ph = p.hash) > h)
                        dir = -1;
                    else if (ph < h)
                        dir = 1;
                    else if ((kc == null &&
                              (kc = comparableClassFor(k)) == null) ||
                             (dir = compareComparables(kc, k, pk)) == 0)
                        dir = tieBreakOrder(k, pk);
                        TreeNode<K,V> xp = p;
                    if ((p = (dir <= 0) ? p.left : p.right) == null) {
                        x.parent = xp;
                        if (dir <= 0)
                            xp.left = x;
                        else
                            xp.right = x;
                        r = balanceInsertion(r, x);
                        break;
                    }
                }
            }
        }
        this.root = r;
        assert checkInvariants(root);
    }

    get()方法

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)//如果eh=-1就说明e节点为ForWordingNode,这说明什么,说明这个节点已经不存在了,被另一个线程正则扩容
            //所以要查找key对应的值的话,直接到新newtable找
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

    这个get请求,我们需要cas来保证变量的原子性。如果tab[i]正被锁住,那么CAS就会失败,失败之后就会不断的重试。这也保证了get在高并发情况下不会出错。 
    我们来分析下到底有多少种情况会导致get在并发的情况下可能取不到值。1、一个线程在get的时候,另一个线程在对同一个key的node进行remove操作;2、一个线程在get的时候,另一个线程正则重排table。可能导致旧table取不到值。 
    那么本质是,我在get的时候,有其他线程在对同一桶的链表或树进行修改。那么get是怎么保证同步性的呢?我们看到e = tabAt(tab, (n - 1) & h)) != null,在看下tablAt到底是干嘛的:

    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    它是对tab[i]进行原子性的读取,因为我们知道putVal等对table的桶操作是有加锁的,那么一般情况下我们对桶的读也是要加锁的,但是我们这边为什么不需要加锁呢?因为我们用了Unsafe的getObjectVolatile,因为table是volatile类型,所以对tab[i]的原子请求也是可见的。因为如果同步正确的情况下,根据happens-before原则,对volatile域的写入操作happens-before于每一个后续对同一域的读操作。所以不管其他线程对table链表或树的修改,都对get读取可见。

    参考

    深入浅出ConcurrentHashMap(1.8) 作者 占小狼

    探索jdk8之ConcurrentHashMap 的实现机制 作者 淮左

    Java并发编程总结4——ConcurrentHashMap在jdk1.8中的改进 作者 everSeeker

    以下是JDK1.6实现方式:

    ConcurrentHashMap是Java并发包中提供的一个线程安全且高效的HashMap实现(若对HashMap的实现原理还不甚了解,可参考我的另一篇文章HashMap实现原理及源码分析),ConcurrentHashMap在并发编程的场景中使用频率非常之高,本文就来分析下ConcurrentHashMap的实现原理,并对其实现原理进行分析(JDK1.7).

    ConcurrentHashMap实现原理

      众所周知,哈希表是中非常高效,复杂度为O(1)的数据结构,在Java开发中,我们最常见到最频繁使用的就是HashMap和HashTable,但是在线程竞争激烈的并发场景中使用都不够合理。

      HashMap :先说HashMap,HashMap是线程不安全的,在并发环境下,可能会形成环状链表(扩容时可能造成,具体原因自行百度google或查看源码分析),导致get操作时,cpu空转,所以,在并发环境中使用HashMap是非常危险的。

      HashTable : HashTable和HashMap的实现原理几乎一样,差别无非是1.HashTable不允许key和value为null;2.HashTable是线程安全的。但是HashTable线程安全的策略实现代价却太大了,简单粗暴,get/put所有相关操作都是synchronized的,这相当于给整个哈希表加了一把大锁,多线程访问时候,只要有一个线程访问或操作该对象,那其他线程只能阻塞,相当于将所有的操作串行化,在竞争激烈的并发场景中性能就会非常差。

      HashTable性能差主要是由于所有操作需要竞争同一把锁,而如果容器中有多把锁,每一把锁锁一段数据,这样在多线程访问时不同段的数据时,就不会存在锁竞争了,这样便可以有效地提高并发效率。这就是ConcurrentHashMap所采用的"分段锁"思想。

      

    ConcurrentHashMap源码分析   

    ConcurrentHashMap采用了非常精妙的"分段锁"策略,ConcurrentHashMap的主干是个Segment数组。

     final Segment<K,V>[] segments;

      Segment继承了ReentrantLock,所以它就是一种可重入锁(ReentrantLock)。在ConcurrentHashMap,一个Segment就是一个子哈希表,Segment里维护了一个HashEntry数组,并发环境下,对于不同Segment的数据进行操作是不用考虑锁竞争的。(就按默认的ConcurrentLeve为16来讲,理论上就允许16个线程并发执行,有木有很酷)

      所以,对于同一个Segment的操作才需考虑线程同步,不同的Segment则无需考虑。

    Segment类似于HashMap,一个Segment维护着一个HashEntry数组

     transient volatile HashEntry<K,V>[] table;

    HashEntry是目前我们提到的最小的逻辑处理单元了。一个ConcurrentHashMap维护一个Segment数组,一个Segment维护一个HashEntry数组。

    复制代码
     static final class HashEntry<K,V> {
            final int hash;
            final K key;
            volatile V value;
            volatile HashEntry<K,V> next;
            //其他省略
    }    
    复制代码

    我们说Segment类似哈希表,那么一些属性就跟我们之前提到的HashMap差不离,比如负载因子loadFactor,比如阈值threshold等等,看下Segment的构造方法

    Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
                this.loadFactor = lf;//负载因子
                this.threshold = threshold;//阈值
                this.table = tab;//主干数组即HashEntry数组
            }

    我们来看下ConcurrentHashMap的构造方法

    复制代码
     1  public ConcurrentHashMap(int initialCapacity,
     2                                float loadFactor, int concurrencyLevel) {
     3           if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
     4               throw new IllegalArgumentException();
     5           //MAX_SEGMENTS 为1<<16=65536,也就是最大并发数为65536
     6           if (concurrencyLevel > MAX_SEGMENTS)
     7               concurrencyLevel = MAX_SEGMENTS;
     8           //2的sshif次方等于ssize,例:ssize=16,sshift=4;ssize=32,sshif=5
     9          int sshift = 0;
    10          //ssize 为segments数组长度,根据concurrentLevel计算得出
    11          int ssize = 1;
    12          while (ssize < concurrencyLevel) {
    13              ++sshift;
    14              ssize <<= 1;
    15          }
    16          //segmentShift和segmentMask这两个变量在定位segment时会用到,后面会详细讲
    17          this.segmentShift = 32 - sshift;
    18          this.segmentMask = ssize - 1;
    19          if (initialCapacity > MAXIMUM_CAPACITY)
    20              initialCapacity = MAXIMUM_CAPACITY;
    21          //计算cap的大小,即Segment中HashEntry的数组长度,cap也一定为2的n次方.
    22          int c = initialCapacity / ssize;
    23          if (c * ssize < initialCapacity)
    24              ++c;
    25          int cap = MIN_SEGMENT_TABLE_CAPACITY;
    26          while (cap < c)
    27              cap <<= 1;
    28          //创建segments数组并初始化第一个Segment,其余的Segment延迟初始化
    29          Segment<K,V> s0 =
    30              new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
    31                               (HashEntry<K,V>[])new HashEntry[cap]);
    32          Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    33          UNSAFE.putOrderedObject(ss, SBASE, s0); 
    34          this.segments = ss;
    35      }
    复制代码

      初始化方法有三个参数,如果用户不指定则会使用默认值,initialCapacity为16,loadFactor为0.75(负载因子,扩容时需要参考),concurrentLevel为16。

      从上面的代码可以看出来,Segment数组的大小ssize是由concurrentLevel来决定的,但是却不一定等于concurrentLevel,ssize一定是大于或等于concurrentLevel的最小的2的次幂。比如:默认情况下concurrentLevel是16,则ssize为16;若concurrentLevel为14,ssize为16;若concurrentLevel为17,则ssize为32。为什么Segment的数组大小一定是2的次幂?其实主要是便于通过按位与的散列算法来定位Segment的index。至于更详细的原因,有兴趣的话可以参考我的另一篇文章《HashMap实现原理及源码分析》,其中对于数组长度为什么一定要是2的次幂有较为详细的分析。

      接下来,我们来看看put方法

    复制代码
     public V put(K key, V value) {
            Segment<K,V> s;
            //concurrentHashMap不允许key/value为空
            if (value == null)
                throw new NullPointerException();
            //hash函数对key的hashCode重新散列,避免差劲的不合理的hashcode,保证散列均匀
            int hash = hash(key);
            //返回的hash值无符号右移segmentShift位与段掩码进行位运算,定位segment
            int j = (hash >>> segmentShift) & segmentMask;
            if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
                 (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
                s = ensureSegment(j);
            return s.put(key, hash, value, false);
        }
    复制代码

     从源码看出,put的主要逻辑也就两步:1.定位segment并确保定位的Segment已初始化 2.调用Segment的put方法。

     关于segmentShift和segmentMask

      segmentShift和segmentMask这两个全局变量的主要作用是用来定位Segment,int j =(hash >>> segmentShift) & segmentMask。

      segmentMask:段掩码,假如segments数组长度为16,则段掩码为16-1=15;segments长度为32,段掩码为32-1=31。这样得到的所有bit位都为1,可以更好地保证散列的均匀性

      segmentShift:2的sshift次方等于ssize,segmentShift=32-sshift。若segments长度为16,segmentShift=32-4=28;若segments长度为32,segmentShift=32-5=27。而计算得出的hash值最大为32位,无符号右移segmentShift,则意味着只保留高几位(其余位是没用的),然后与段掩码segmentMask位运算来定位Segment。

      get/put方法

      get方法

    复制代码
     public V get(Object key) {
            Segment<K,V> s; 
            HashEntry<K,V>[] tab;
            int h = hash(key);
            long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    //先定位Segment,再定位HashEntry if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
    复制代码

      get方法无需加锁,由于其中涉及到的共享变量都使用volatile修饰,volatile可以保证内存可见性,所以不会读取到过期数据。

      来看下concurrentHashMap代理到Segment上的put方法,Segment中的put方法是要加锁的。只不过是锁粒度细了而已。

    复制代码
    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
                HashEntry<K,V> node = tryLock() ? null :
                    scanAndLockForPut(key, hash, value);//tryLock不成功时会遍历定位到的HashEnry位置的链表(遍历主要是为了使CPU缓存链表),若找不到,则创建HashEntry。tryLock一定次数后(MAX_SCAN_RETRIES变量决定),则lock。若遍历过程中,由于其他线程的操作导致链表头结点变化,则需要重新遍历。
                V oldValue;
                try {
                    HashEntry<K,V>[] tab = table;
                    int index = (tab.length - 1) & hash;//定位HashEntry,可以看到,这个hash值在定位Segment时和在Segment中定位HashEntry都会用到,只不过定位Segment时只用到高几位。
                    HashEntry<K,V> first = entryAt(tab, index);
                    for (HashEntry<K,V> e = first;;) {
                        if (e != null) {
                            K k;
                            if ((k = e.key) == key ||
                                (e.hash == hash && key.equals(k))) {
                                oldValue = e.value;
                                if (!onlyIfAbsent) {
                                    e.value = value;
                                    ++modCount;
                                }
                                break;
                            }
                            e = e.next;
                        }
                        else {
                            if (node != null)
                                node.setNext(first);
                            else
                                node = new HashEntry<K,V>(hash, key, value, first);
                            int c = count + 1;
                  //若c超出阈值threshold,需要扩容并rehash。扩容后的容量是当前容量的2倍。这样可以最大程度避免之前散列好的entry重新散列,具体在另一篇文章中有详细分析,不赘述。扩容并rehash的这个过程是比较消耗资源的。 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }
    复制代码

     总结

      ConcurrentHashMap作为一种线程安全且高效的哈希表的解决方案,尤其其中的"分段锁"的方案,相比HashTable的全表锁在性能上的提升非常之大。本文对ConcurrentHashMap的实现原理进行了详细分析,并解读了部分源码,希望能帮助到有需要的童鞋。

  • 相关阅读:
    第三章节 BJROBOT 角速度校正 【ROS全开源阿克曼转向智能网联无人驾驶车】
    第二章节 BJROBOT IMU 自动校正 【ROS全开源阿克曼转向智能网联无人驾驶车】
    【扩展】链式编程初识
    【扩展】随机数
    一、.Net基础【1.5】封装MessageBox
    一、.Net基础【1.4】不引入第三变量,交换两个变量的值
    一、.Net基础【1.3】AndAlso & OrElse Operators in C#短路运算符
    一、.Net基础【1.2】变量和数据类型
    一、.Net基础【1.0】入门
    ArcGIS Desktop 10.X 复习与提高【1.1】ArcGIS数据格式的介绍 Esri
  • 原文地址:https://www.cnblogs.com/barrywxx/p/8486060.html
Copyright © 2011-2022 走看看