zoukankan      html  css  js  c++  java
  • 死磕 java集合之ConcurrentHashMap源码分析(一)

    开篇问题

    (1)ConcurrentHashMap与HashMap的数据结构是否一样?

    (2)HashMap在多线程环境下何时会出现并发安全问题?

    (3)ConcurrentHashMap是怎么解决并发安全问题的?

    (4)ConcurrentHashMap使用了哪些锁?

    (5)ConcurrentHashMap的扩容是怎么进行的?

    (6)ConcurrentHashMap是否是强一致性的?

    (7)ConcurrentHashMap不能解决哪些问题?

    (8)ConcurrentHashMap中有哪些不常见的技术值得学习?

    简介

    ConcurrentHashMap是HashMap的线程安全版本,内部也是使用(数组 + 链表 + 红黑树)的结构来存储元素。

    相比于同样线程安全的HashTable来说,效率等各方面都有极大地提高。

    各种锁简介

    这里先简单介绍一下各种锁,以便下文讲到相关概念时能有个印象。

    (1)synchronized

    java中的关键字,内部实现为监视器锁,主要是通过对象监视器在对象头中的字段来表明的。

    synchronized从旧版本到现在已经做了很多优化了,在运行时会有三种存在方式:偏向锁,轻量级锁,重量级锁。

    偏向锁,是指一段同步代码一直被一个线程访问,那么这个线程会自动获取锁,降低获取锁的代价。

    轻量级锁,是指当锁是偏向锁时,被另一个线程所访问,偏向锁会升级为轻量级锁,这个线程会通过自旋的方式尝试获取锁,不会阻塞,提高性能。

    重量级锁,是指当锁是轻量级锁时,当自旋的线程自旋了一定的次数后,还没有获取到锁,就会进入阻塞状态,该锁升级为重量级锁,重量级锁会使其他线程阻塞,性能降低。

    (2)CAS

    CAS,Compare And Swap,它是一种乐观锁,认为对于同一个数据的并发操作不一定会发生修改,在更新数据的时候,尝试去更新数据,如果失败就不断尝试。

    (3)volatile(非锁)

    java中的关键字,当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。(这里牵涉到java内存模型的知识,感兴趣的同学可以自己查查相关资料)

    volatile只保证可见性,不保证原子性,比如 volatile修改的变量 i,针对i++操作,不保证每次结果都正确,因为i++操作是两步操作,相当于 i = i +1,先读取,再加1,这种情况volatile是无法保证的。

    (4)自旋锁

    自旋锁,是指尝试获取锁的线程不会阻塞,而是循环的方式不断尝试,这样的好处是减少线程的上下文切换带来的开锁,提高性能,缺点是循环会消耗CPU。

    (5)分段锁

    分段锁,是一种锁的设计思路,它细化了锁的粒度,主要运用在ConcurrentHashMap中,实现高效的并发操作,当操作不需要更新整个数组时,就只锁数组中的一项就可以了。

    (5)ReentrantLock

    可重入锁,是指一个线程获取锁之后再尝试获取锁时会自动获取锁,可重入锁的优点是避免死锁。

    其实,synchronized也是可重入锁。

    源码分析

    构造方法

    public ConcurrentHashMap() {
    }
    
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                MAXIMUM_CAPACITY :
                tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
    
    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }
    
    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }
    
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
                MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }
    

    构造方法与HashMap对比可以发现,没有了HashMap中的threshold和loadFactor,而是改用了sizeCtl来控制,而且只存储了容量在里面,那么它是怎么用的呢?官方给出的解释如下:

    (1)-1,表示有线程正在进行初始化操作

    (2)-(1 + nThreads),表示有n个线程正在一起扩容

    (3)0,默认值,后续在真正初始化的时候使用默认容量

    (4)> 0,初始化或扩容完成后下一次的扩容门槛

    至于,官方这个解释对不对我们后面再讨论。

    添加元素

    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // key和value都不能为null
        if (key == null || value == null) throw new NullPointerException();
        // 计算hash值
        int hash = spread(key.hashCode());
        // 要插入的元素所在桶的元素个数
        int binCount = 0;
        // 死循环,结合CAS使用(如果CAS失败,则会重新取整个桶进行下面的流程)
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                // 如果桶未初始化或者桶个数为0,则初始化桶
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 如果要插入的元素所在的桶还没有元素,则把这个元素插入到这个桶中
                if (casTabAt(tab, i, null,
                        new Node<K,V>(hash, key, value, null)))
                    // 如果使用CAS插入元素时,发现已经有元素了,则进入下一次循环,重新操作
                    // 如果使用CAS插入元素成功,则break跳出循环,流程结束
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                // 如果要插入的元素所在的桶的第一个元素的hash是MOVED,则当前线程帮忙一起迁移元素
                tab = helpTransfer(tab, f);
            else {
                // 如果这个桶不为空且不在迁移元素,则锁住这个桶(分段锁)
                // 并查找要插入的元素是否在这个桶中
                // 存在,则替换值(onlyIfAbsent=false)
                // 不存在,则插入到链表结尾或插入树中
                V oldVal = null;
                synchronized (f) {
                    // 再次检测第一个元素是否有变化,如果有变化则进入下一次循环,从头来过
                    if (tabAt(tab, i) == f) {
                        // 如果第一个元素的hash值大于等于0(说明不是在迁移,也不是树)
                        // 那就是桶中的元素使用的是链表方式存储
                        if (fh >= 0) {
                            // 桶中元素个数赋值为1
                            binCount = 1;
                            // 遍历整个桶,每次结束binCount加1
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                                (ek != null && key.equals(ek)))) {
                                    // 如果找到了这个元素,则赋值了新值(onlyIfAbsent=false)
                                    // 并退出循环
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    // 如果到链表尾部还没有找到元素
                                    // 就把它插入到链表结尾并退出循环
                                    pred.next = new Node<K,V>(hash, key,
                                            value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            // 如果第一个元素是树节点
                            Node<K,V> p;
                            // 桶中元素个数赋值为2
                            binCount = 2;
                            // 调用红黑树的插入方法插入元素
                            // 如果成功插入则返回null
                            // 否则返回寻找到的节点
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                    value)) != null) {
                                // 如果找到了这个元素,则赋值了新值(onlyIfAbsent=false)
                                // 并退出循环
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 如果binCount不为0,说明成功插入了元素或者寻找到了元素
                if (binCount != 0) {
                    // 如果链表元素个数达到了8,则尝试树化
                    // 因为上面把元素插入到树中时,binCount只赋值了2,并没有计算整个树中元素的个数
                    // 所以不会重复树化
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    // 如果要插入的元素已经存在,则返回旧值
                    if (oldVal != null)
                        return oldVal;
                    // 退出外层大循环,流程结束
                    break;
                }
            }
            }
            // 成功插入元素,元素个数加1(是否要扩容在这个里面)
            addCount(1L, binCount);
            // 成功插入元素返回null
            return null;
        }
    

    整体流程跟HashMap比较类似,大致是以下几步:

    (1)如果桶数组未初始化,则初始化;

    (2)如果待插入的元素所在的桶为空,则尝试把此元素直接插入到桶的第一个位置;

    (3)如果正在扩容,则当前线程一起加入到扩容的过程中;

    (4)如果待插入的元素所在的桶不为空且不在迁移元素,则锁住这个桶(分段锁);

    (5)如果当前桶中元素以链表方式存储,则在链表中寻找该元素或者插入元素;

    (6)如果当前桶中元素以红黑树方式存储,则在红黑树中寻找该元素或者插入元素;

    (7)如果元素存在,则返回旧值;

    (8)如果元素不存在,整个Map的元素个数加1,并检查是否需要扩容;

    添加元素操作中使用的锁主要有(自旋锁 + CAS + synchronized + 分段锁)。

    为什么使用synchronized而不是ReentrantLock?

    因为synchronized已经得到了极大地优化,在特定情况下并不比ReentrantLock差。


    未完待续~~


    qrcode

  • 相关阅读:
    Comet OJ CCPC-Wannafly Winter Camp Day1 (Div2, online mirror) F.爬爬爬山-最短路(Dijkstra)(两个板子)+思维(mdzz...) zhixincode
    Codeforces 1104 D. Game with modulo-交互题-二分-woshizhizhang(Codeforces Round #534 (Div. 2))
    POJ 1655.Balancing Act-树的重心(DFS) 模板(vector存图)
    Codeforces gym102058 J. Rising Sun-简单的计算几何+二分 (2018-2019 XIX Open Cup, Grand Prix of Korea (Division 2))
    BZOJ 3261: 最大异或和位置-贪心+可持久化01Trie树
    51nod 1295 XOR key-区间异或最大值-可持久化01Trie树(模板)
    BZOJ 2588: Spoj 10628. Count on a tree-可持久化线段树+LCA(点权)(树上的操作) 无语(为什么我的LCA的板子不对)
    Codeforces 1099 D. Sum in the tree-构造最小点权和有根树 贪心+DFS(Codeforces Round #530 (Div. 2))
    Codeforces 586D. Phillip and Trains 搜索
    Codeforces 734E. Anton and Tree 搜索
  • 原文地址:https://www.cnblogs.com/tong-yuan/p/10674283.html
Copyright © 2011-2022 走看看