zoukankan      html  css  js  c++  java
  • 《java.util.concurrent 包源码阅读》04 ConcurrentMap

    Java集合框架中的Map类型的数据结构是非线程安全,在多线程环境中使用时需要手动进行线程同步。因此在java.util.concurrent包中提供了一个线程安全版本的Map类型数据结构:ConcurrentMap。本篇文章主要关注ConcurrentMap接口以及它的Hash版本的实现ConcurrentHashMap。

    ConcurrentMap是Map接口的子接口

    public interface ConcurrentMap<K, V> extends Map<K, V>

    与Map接口相比,ConcurrentMap多了4个方法:

    1)putIfAbsent方法:如果key不存在,添加key-value。方法会返回与key关联的value。

    V putIfAbsent(K key, V value);

    2)remove方法

    boolean remove(Object key, Object value);

    Map接口中也有一个remove方法:

    V remove(Object key);

    ConcurrentMap中的remove方法需要比较原有的value和参数中的value是否一致,只有一致才会删除。

    3)Replace方法:有2个重载

    boolean replace(K key, V oldValue, V newValue);
    V replace(K key, V value);

    两个重载的区别和2)中的两个remove方法的区别很类似,多了一个检查value一致。

    通过ConcurrentMap多出来的方法可以看到多线程中一个很重要的概念:compare。compare的作用就是为了保证value的一致性。

    重头戏来了:ConcurrentHashMap。

    public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
            implements ConcurrentMap<K, V>, Serializable

    ConcurrentHashMap和HashMap类似,这里重点关注的是如何实现线程安全,也就是如何加锁。

    对于HashMap来说,有一个Entry数组,根据Key的hash值对数组长度取模得到数组下标,找到Entry,遍历整个Entry链表,用equals比较来确定key所在的Entry。

    ConcurrentHashMap的基本思想是采取分块的方式加锁,分块数由参数“concurrencyLevel”来决定(和HashMap中的“initialCapacity”类似,实际块数是第一个大于concurrencyLevel的2的n次方)。每个分块被称为Segment,Segment的索引方式和HashMap中的Entry索引方式一致(hash值对数组长度取模)。

    对Segment加锁的方式很简单,直接把Segment定义为ReentrantLock的子类。同时Segment又是一个特定实现的hash table。

    static final class Segment<K,V> extends ReentrantLock implements Serializable

    下面分析ConcurrentHashMap读写时如何加锁。

    首先是读操作类的方法,来看get方法:

    public V get(Object key) {
            Segment<K,V> s; // manually integrate access methods to reduce overhead
            HashEntry<K,V>[] tab;
            int h = hash(key);
            long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
            if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
                (tab = s.table) != null) {
                for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                         (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                     e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                        return e.value;
                }
            }
            return null;
        }

    可以看到,读取的时候没有调用的Segment的获取锁的方法,而是通过hash值定位到Entry,然后遍历Entry的链表。为什么这里不用加锁呢?看看HashEntry的代码就会明白了。

        static final class HashEntry<K,V> {
            final int hash;
            final K key;
            volatile V value;
            volatile HashEntry<K,V> next;

    value和next属性是带有volatile修饰符的,可以大胆放心的遍历和比较。

    接着是写操作,写操作是肯定要加锁的。因为Segment可以看成是一个hash table,因此ConcurrentHashMap直接调用Segment的对应的写入方法如put,replace等。

    比如put方法

        public V put(K key, V value) {
            Segment<K,V> s;
            if (value == null)
                throw new NullPointerException();
            int hash = hash(key);
            int j = (hash >>> segmentShift) & segmentMask;
            if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
                 (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
                s = ensureSegment(j);
            return s.put(key, hash, value, false);
        }

    因此这里直接关注Segment的对应写操作方法即可。在每个写操作的方法开头都这样的类似代码:

            final V remove(Object key, int hash, Object value) {
                if (!tryLock())
                    scanAndLock(key, hash);
                HashEntry<K,V> node = tryLock() ? null :
                    scanAndLockForPut(key, hash, value);

    也就是,首先尝试获取锁,如果成功则会带锁继续操作,失败则要通过scanAndLockscanAndLockForPut获取锁,因此这里关注的重点也就转移到这两个方法了。

    按照多线程环境的规则,如果尝试获取锁失败的话就会进入阻塞等待状态,那么这两个方法的作用应该是类似的。

            private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
                HashEntry<K,V> first = entryForHash(this, hash);
                HashEntry<K,V> e = first;
                HashEntry<K,V> node = null;
                int retries = -1; // negative while locating node
                while (!tryLock()) {
                    HashEntry<K,V> f; // to recheck first below
                    if (retries < 0) {
                        if (e == null) {
                            if (node == null) // speculatively create node
                                node = new HashEntry<K,V>(hash, key, value, null);
                            retries = 0;
                        }
                        else if (key.equals(e.key))
                            retries = 0;
                        else
                            e = e.next;
                    }
                    else if (++retries > MAX_SCAN_RETRIES) {
                        lock();
                        break;
                    }
                    else if ((retries & 1) == 0 &&
                             (f = entryForHash(this, hash)) != first) {
                        e = first = f; // re-traverse if entry changed
                        retries = -1;
                    }
                }
                return node;
            }

    这两个方法的逻辑:在等待的时候闲着没事儿干把该做好的准备做好,查找一下目标entry,如果是新建entry就把entry创建好,然后如果一切没问题就用lock()方法把自己给阻塞了,也就是做好准备然后去等着了。

    因为Segment本身就可以看成一个hash table,因此必然涉及rehash的问题,因为和HashMap中的rehash类似,在这里就省略了。

  • 相关阅读:
    Directx11学习笔记【九】 【转】 3D渲染管线
    排序算法总结
    [LeetCode92]Reverse Linked List II
    c++析构函数为什么要为虚函数
    简析iOS动画原理及实现——Core Animation
    Xcode 插件集:xTextHandler
    [iOS] 在 ios10 中使用 imessage
    UITableView-FDTemplateLayoutCell 学习笔记
    Xcode 7.3 cannot create __weak reference in file using manual reference counting
    iOS10个实用小技巧(总有你不知道的和你会用到的)
  • 原文地址:https://www.cnblogs.com/wanly3643/p/3898846.html
Copyright © 2011-2022 走看看