zoukankan      html  css  js  c++  java
  • 探索jdk8之ConcurrentHashMap 的实现机制

    在介绍ConcurrentHashMap源码之前,很有必要复习下java并发编程中的一些基础知识,比如内存模型等。
    存储模型
    并发编程中的三个概念
    1、原子性
    2、可见性
    3、重排序
    对HashMap在jdk8有所了解
    对CAS有所了解
    对内置锁和显示锁等有所了解

    jdk8对ConcurrentHashMap做了很大的调整,首先因为HashMap在jdk8已经做了数据结构上的优化,增加了红黑树,详情可以参考我之前的博客。所以,jdk7针对ConcurrentHashMap的改进,主要是增加了分段锁Segment对HashEntity的控制,完美的解决了HashMap的安全问题,在JMM中有个名称叫安全发布,已经不适用了。那么,在jdk8如果保持性能的情况下对其进行修改了?它到底做了那些事情呢?

    因为ConcurrentHashMap涉及的内容太多,jdk8有六千多行代码,jdk7才一两千行吧。所以我在想怎么一步步的对其进行剖解,最后我还是觉得按照程序的思路来吧,首先我们跑个ConcurrentHashMap的程序,然后进行调试,来一步步展开。

    public static void main(String[] args) {
        Map<String, String> cm = new ConcurrentHashMap<String, String>();
        for (int i = 0; i < 14; i++) {
            cm.put("key_" + i, "huaizuo_" + i);
        }
    }
    

    首先初始化一个ConcurrentHashMap,因为我们是用默认构造函数,我们来看下初始化的一些重要的字段,去掉英文注释。

    /**
     * races. Updated via CAS.
     * 记录容器的容量大小,通过CAS更新
     */
    private transient volatile long baseCount;
    
    /**
     * 这个sizeCtl是volatile的,那么他是线程可见的,一个思考:它是所有修改都在CAS中进行,但是sizeCtl为什么不设计成LongAdder(jdk8出现的)类型呢?
     * 或者设计成AtomicLong(在高并发的情况下比LongAdder低效),这样就能减少自己操作CAS了。
     *
     * 来看下注释,当sizeCtl小于0说明有多个线程正则等待扩容结果,参考transfer函数
     *
     * sizeCtl等于0是默认值,大于0是扩容的阀值
     */
    private transient volatile int sizeCtl;
    
    /**
     *  自旋锁 (锁定通过 CAS) 在调整大小和/或创建 CounterCells 时使用。 在CounterCell类更新value中会使用,功能类似显示锁和内置锁,性能更好
     *  在Striped64类也有应用
     */
    private transient volatile int cellsBusy;
    

    还有最重要的节点类Node,注意val和next是volatile类型

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
    
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
    

    接下来我们要把元素put到ConcurrentHashMap中了,那么我们来看下putVal的源码吧

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        //这边加了一个循环,就是不断的尝试,因为在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject
        //因为如果其他线程正在修改tab,那么尝试就会失败,所以这边要加一个for循环,不断的尝试
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//1
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))//2
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)// a
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //这个地方设计非常的巧妙,内置锁synchronized锁住了f,因为f是指定特定的tab[i]的,
                // 所以就锁住了整行链表,这个设计跟分段锁有异曲同工之妙,只是其他读取操作需要用cas来保证
                synchronized (f) {
                    if (tabAt(tab, i) == f) {//3
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {//
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);//转化为红黑树
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
    

    我们看到代码注释中的1、2、3我特定标注的,因为这些操作都是按照CAS的,其中关键部分已经做了注释,要正确取到真实数据需要知道变量所在的内存偏移量。

    @SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    
    /*
     *但是这边为什么i要等于((long)i << ASHIFT) + ABASE呢,计算偏移量
     *ASHIFT是指tab[i]中第i个元素在相对于数组第一个元素的偏移量,而ABASE就算第一数组的内存素的偏移地址
     *所以呢,((long)i << ASHIFT) + ABASE就算i最后的地址
     * 那么compareAndSwapObject的作用就算tab[i]和c比较,如果相等就tab[i]=v否则tab[i]=c;
    */
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }
    

    关于sun.misc.Unsafe

    // Unsafe mechanics
    private static final sun.misc.Unsafe U;
    private static final long SIZECTL;
    private static final long TRANSFERINDEX;
    private static final long BASECOUNT;
    private static final long CELLSBUSY;
    private static final long CELLVALUE;
    private static final long ABASE;
    private static final int ASHIFT;
    
    static {
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentHashMap.class;
            //获取ConcurrentHashMap这个对象字段sizeCtl在内存中的偏移量
            SIZECTL = U.objectFieldOffset
                (k.getDeclaredField("sizeCtl"));
            TRANSFERINDEX = U.objectFieldOffset
                (k.getDeclaredField("transferIndex"));
            BASECOUNT = U.objectFieldOffset
                (k.getDeclaredField("baseCount"));
            CELLSBUSY = U.objectFieldOffset
                (k.getDeclaredField("cellsBusy"));
            Class<?> ck = CounterCell.class;
            CELLVALUE = U.objectFieldOffset
                (ck.getDeclaredField("value"));
            Class<?> ak = Node[].class;
            //可以获取数组第一个元素的偏移地址
            ABASE = U.arrayBaseOffset(ak);
            //arrayIndexScale可以获取数组的转换因子,也就是数组中元素的增量地址
            //将arrayBaseOffset与arrayIndexScale配合使用,可以定位数组中每个元素在内存中的位置。
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    

    还是继续看put源码,看到//a注释,当(fh = f.hash) == MOVED,说明f.hash值为-1(MOVED为-1的final),那么如果hash什么时候回等于-1呢?为什么会有-1这种情况呢?这要涉及到ForwardingNode<K,V>类

    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            //MOVED 位-1,说明ForwardNode的节点的hash值为-1
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
    

    这个类是继承Node类的,他在初始化的时候hash值传了MOVED,我们知道ConcurrentHashMap在的数据结构是Table[]和链表组成,所以如果Table节点是ForwardNode节点的话那么Hash的值就等于-1,那么什么时候Node会变成ForwardNode呢?就是在扩容的时候,旧的Table的节点会临时用ForwardNode代替。待会会介绍。

    我们还是继续一步步看代码,看inputVal的注释a,这个方法helpTransfer,如果线程进入到这边说明已经有其他线程正在做扩容操作,这个是一个辅助方法

    /**
     * Helps transfer if a resize is in progress.
     */
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                //下面几种情况和addCount的方法一样,请参考addCount的备注
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
    

    这边如果table的某一个节点对应的链表超过一定的长度之后,就要把链表转化为红黑树的操作我就不详细的在这边文章介绍了,对于转化的操作其实和HashMap是一样的,但是这里涉及到并发,它其实也是通过synchronized和CAS来控制并发的。好了,当我们的putVal执行到addCount的时候

    /**
     * Adds to count, and if table is too small and not already
     * resizing, initiates transfer. If already resizing, helps
     * perform transfer if work is available.  Rechecks occupancy
     * after a transfer to see if another resize is already needed
     * because resizings are lagging additions.
     *
     * @param x the count to add
     * @param check if <0, don't check resize, if <= 1 only check if uncontended
     */
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
    
        //U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 每次竟来都baseCount都加1因为x=1
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//1
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                //多线程CAS发生失败的时候执行
                fullAddCount(x, uncontended);//2
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            //当条件满足开始扩容
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {//如果小于0说明已经有线程在进行扩容操作了
                    //一下的情况说明已经有在扩容或者多线程进行了扩容,其他线程直接break不要进入扩容操作
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))//如果相等说明扩容已经完成,可以继续扩容
                        transfer(tab, nt);
                }
                //这个时候sizeCtl已经等于(rs << RESIZE_STAMP_SHIFT) + 2等于一个大的负数,这边加上2很巧妙,因为transfer后面对sizeCtl--操作的时候,最多只能减两次就结束
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
    

    看上面注释1,每次都会对baseCount 加1,如果并发竞争太大,那么可能导致U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 失败,那么为了提高高并发的时候baseCount可见性失败的问题,又避免一直重试,这样性能会有很大的影响,那么在jdk8的时候是有引入一个类Striped64,其中LongAdder和DoubleAdder就是对这个类的实现。这两个方法都是为解决高并发场景而生的,是AtomicLong的加强版,AtomicLong在高并发场景性能会比LongAdder差。但是LongAdder的空间复杂度会高点。

    // See LongAdder version for explanation
    private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        //获取当前线程的probe值作为hash值,如果0则强制初始化当前线程的Probe值,初始化的probe值不为0
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;//设置未竞争标记为true
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            if ((as = counterCells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {            // Try to attach new Cell如果当前没有CounterCell就创建一个
                        CounterCell r = new CounterCell(x); // Optimistic create
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {//这边加上cellsBusy锁
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;//释放cellsBusy锁,让其他线程可以进来
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail wasUncontended为false说明已经发生了竞争,重置为true重新执行上面代码
                    wasUncontended = true;      // Continue after rehash
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))//对cell的value值进行累计x(1)
                    break;
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale 表明as已经过时,说明cells已经初始化完成,看下面,重置collide为false表明已经存在竞争
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {// Expand table unless stale 下面的代码主要是给counterCells扩容,尽可能避免冲突
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = ThreadLocalRandom.advanceProbe(h);
            }
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {//表明counterCells还没初始化,则初始化,这边用cellsBusy加锁
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) {
                        CounterCell[] rs = new CounterCell[2];
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))//最终如果上面的都失败就把x累计到baseCount
                break;                          // Fall back on using base
        }
    }
    

    源码注释写着See LongAdder version for explanation。我上面已经做了注释了,就不做更多解释了。
    回到addCount来,我们每次竟来都对baseCount进行加1当达到一定的容量时,就需要对table进行扩容。扩容方法就是transfer,这个方法稍微复杂一点,大部分的代码我都做了注释

    /**
     * Moves and/or copies the nodes in each bin to new table. See
     * above for explanation.
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        //构建一个连节点的指针,用于标识位
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        //循环的关键变量,判断是否已经扩容完成,完成就return,退出循环
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            //循环的关键i,i--操作保证了倒序遍历数组
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {//nextIndex=transferIndex=n=tab.length(默认16)
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //i<0说明已经遍历完旧的数组tab;i>=n什么时候有可能呢?在下面看到i=n,所以目前i最大应该是n吧。
            //i+n>=nextn,nextn=nextTab.length,所以如果满足i+n>=nextn说明已经扩容完成
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {// a
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                //利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作,参考sizeCtl的注释
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    //如果有多个线程进行扩容,那么这个值在第二个线程以后就不会相等,因为sizeCtl已经被减1了,所以后面的线程就只能直接返回,始终保证只有一个线程执行了 a(上面注释a)
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;//finishing和advance保证线程已经扩容完成了可以退出循环
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)//如果tab[i]为null,那么就把fwd插入到tab[i],表明这个节点已经处理过了
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)//那么如果f.hash=-1的话说明该节点为ForwardingNode,说明该节点已经处理过了
                advance = true; // already processed
            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            //这边还对链表进行遍历,这边的的算法和hashmap的算法又不一样了,这班是有点对半拆分的感觉
                            //把链表分表拆分为,hash&n等于0和不等于0的,然后分别放在新表的i和i+n位置
                            //次方法同hashmap的resize
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            //把已经替换的节点的旧tab的i的位置用fwd替换,fwd包含nextTab
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }//下面红黑树基本和链表差不多
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            //判断扩容后是否还需要红黑树结构
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
    

    值得细细品味的是,transfer的for循环是倒叙的,说明对table的遍历是从table.length-1开始到0的。我觉得这段代码写得太牛逼了,特别是

    //利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作,参考sizeCtl的注释
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        //如果有多个线程进行扩容,那么这个值在第二个线程以后就不会相等,因为sizeCtl已经被减1了,所以后面的线程就只能直接返回,始终保证只有一个线程执行了 a(上面注释a)
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
        finishing = advance = true;//finishing和advance保证线程已经扩容完成了可以退出循环
        i = n; // recheck before commit
    }
    

    反正很多地方值得细细精读。

    那么我想我已经把ConcurrentHashMap的一部分内容讲完,包括添加元素putVal,扩容transfer等。那么现在我们来看下get方法吧

    /**
     * Returns the value to which the specified key is mapped,
     * or {@code null} if this map contains no mapping for the key.
     *
     * <p>More formally, if this map contains a mapping from a key
     * {@code k} to a value {@code v} such that {@code key.equals(k)},
     * then this method returns {@code v}; otherwise it returns
     * {@code null}.  (There can be at most one such mapping.)
     *
     * @throws NullPointerException if the specified key is null
     */
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)//如果eh=-1就说明e节点为ForWordingNode,这说明什么,说明这个节点已经不存在了,被另一个线程正则扩容
            //所以要查找key对应的值的话,直接到新newtable找
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
    

    这个get请求,我们需要cas来保证变量的原子性。如果tab[i]正被锁住,那么CAS就会失败,失败之后就会不断的重试。这也保证了get在高并发情况下不会出错。
    我们来分析下到底有多少种情况会导致get在并发的情况下可能取不到值。1、一个线程在get的时候,另一个线程在对同一个key的node进行remove操作;2、一个线程在get的时候,另一个线程正则重排table。可能导致旧table取不到值。
    那么本质是,我在get的时候,有其他线程在对同一桶的链表或树进行修改。那么get是怎么保证同步性的呢?我们看到e = tabAt(tab, (n - 1) & h)) != null,在看下tablAt到底是干嘛的:

    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    

    它是对tab[i]进行原子性的读取,因为我们知道putVal等对table的桶操作是有加锁的,那么一般情况下我们对桶的读也是要加锁的,但是我们这边为什么不需要加锁呢?因为我们用了Unsafe的getObjectVolatile,因为table是volatile类型,所以对tab[i]的原子请求也是可见的。因为如果同步正确的情况下,根据happens-before原则,对volatile域的写入操作happens-before于每一个后续对同一域的读操作。所以不管其他线程对table链表或树的修改,都对get读取可见。用一张图说明,协调读-写线程可见示意图:

    那么好奇的我翻看了下jdk7的get方法是怎么处理的,因为我们知道jdk7是没有用到CAS操作和Unsafe类的,下面是jdk7的get方法

    V get(Object key, int hash) { 
                if(count != 0) {       // 首先读 count 变量
                    HashEntry<K,V> e = getFirst(hash); 
                    while(e != null) { 
                        if(e.hash == hash && key.equals(e.key)) { 
                            V v = e.value; 
                            if(v != null)            
                                return v; 
                            // 如果读到 value 域为 null,说明发生了重排序,加锁后重新读取
                            return readValueUnderLock(e); 
                        } 
                        e = e.next; 
                    } 
                } 
                return null; 
            }
    

    为什么我们在get的时候需要判断count不等于0呢?如果是在HashMap的源码中是没有这个判断的,不用判断不是也是可以的吗?这个就是用到线程安全发布情况下happens-before原则之volatile变量法则:对volatile域的写入操作happens-before于每一个后续对同一域的读操作,看下面的示意图:

    如果有读java并发编程实战这本书第16的话,我们知道要有时候我们需要“驾驭”在同步之上。
    为了满足happens-before,这个需要结合“程序次序法则”与另外一种次序法则(通常是“监视器锁法则或“volatile变量法则”)来对访问变量的操作进行排序,否则就用锁来保护它
    对于上图A操作happens-before 于B,C操作happens-before于D。因为count是volatile,所以对count的写要happens-before于读操作。所以B操作happens-before于C。
    根据传递性,连接上面三个 happens-before 关系得到:A appens-before 于 B; B appens-before C;C happens-before D。也就是说:写线程 M 对链表做的结构性修改,在读线程 N 读取了同一个 volatile 变量后,对线程 N 也是可见的了。
    虽然线程 N 是在未加锁的情况下访问链表。Java 的内存模型可以保证:只要之前对链表做结构性修改操作的写线程 M 在退出写方法前写 volatile 型变量 count,读线程 N 在读取这个 volatile 型变量 count 后,就一定能“看到”这些修改。

    这个特性和前面介绍的 HashEntry 对象的不变性相结合,使得在 ConcurrentHashMap 中,读线程在读取散列表时,基本不需要加锁就能成功获得需要的值。这两个特性相配合,不仅减少了请求同一个锁的频率(读操作一般不需要加锁就能够成功获得值),也减少了持有同一个锁的时间(只有读到 value 域的值为 null 时 , 读线程才需要加锁后重读)。

    这个设计非常精彩,要对JMM非常熟悉。跟jdk8的处理手法有异曲同工之妙。

    其他的revove修改的操作跟putVal操作类似这里就不做分析了。

    参考
    《java并发编程实战》
    http://ifeve.com/atomiclong-and-longadder/
    http://brokendreams.iteye.com/blog/2259857
    http://blog.csdn.net/u010723709/article/details/48007881
    http://www.cnblogs.com/daxin/p/3366606.html
    http://www.ibm.com/developerworks/cn/java/java-lo-concurrenthashmap/
    https://en.wikipedia.org/wiki/Compare-and-swap
    http://www.cnblogs.com/Mainz/p/3546347.html
    http://coolshell.cn/articles/9703.html
    http://coolshell.cn/articles/8239.html

  • 相关阅读:
    委托和事件的区别
    委托小练习
    线程安全小练习
    线程等待练习
    线程练习
    C# Parallel用法
    Winform 跨线程更新UI控件常用方法汇总
    世界顶级思维,收藏终身受用!
    Win10预览版怎么关闭自动更新?怎么更改更新设置
    win7删除桌面文件后手动刷新才会消失的解决方法
  • 原文地址:https://www.cnblogs.com/huaizuo/p/5413069.html
Copyright © 2011-2022 走看看