zoukankan      html  css  js  c++  java
  • 《java.util.concurrent 包源码阅读》04 ConcurrentMap

    Java集合框架中的Map类型的数据结构是非线程安全,在多线程环境中使用时需要手动进行线程同步。因此在java.util.concurrent包中提供了一个线程安全版本的Map类型数据结构:ConcurrentMap。本篇文章主要关注ConcurrentMap接口以及它的Hash版本的实现ConcurrentHashMap。

    ConcurrentMap是Map接口的子接口

    public interface ConcurrentMap<K, V> extends Map<K, V>

    与Map接口相比,ConcurrentMap多了4个方法:

    1)putIfAbsent方法:如果key不存在,添加key-value。方法会返回与key关联的value。

    V putIfAbsent(K key, V value);

    2)remove方法

    boolean remove(Object key, Object value);

    Map接口中也有一个remove方法:

    V remove(Object key);

    ConcurrentMap中的remove方法需要比较原有的value和参数中的value是否一致,只有一致才会删除。

    3)Replace方法:有2个重载

    boolean replace(K key, V oldValue, V newValue);
    V replace(K key, V value);

    两个重载的区别和2)中的两个remove方法的区别很类似,多了一个检查value一致。

    通过ConcurrentMap多出来的方法可以看到多线程中一个很重要的概念:compare。compare的作用就是为了保证value的一致性。

    重头戏来了:ConcurrentHashMap。

    public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
            implements ConcurrentMap<K, V>, Serializable

    ConcurrentHashMap和HashMap类似,这里重点关注的是如何实现线程安全,也就是如何加锁。

    对于HashMap来说,有一个Entry数组,根据Key的hash值对数组长度取模得到数组下标,找到Entry,遍历整个Entry链表,用equals比较来确定key所在的Entry。

    ConcurrentHashMap的基本思想是采取分块的方式加锁,分块数由参数“concurrencyLevel”来决定(和HashMap中的“initialCapacity”类似,实际块数是第一个大于concurrencyLevel的2的n次方)。每个分块被称为Segment,Segment的索引方式和HashMap中的Entry索引方式一致(hash值对数组长度取模)。

    对Segment加锁的方式很简单,直接把Segment定义为ReentrantLock的子类。同时Segment又是一个特定实现的hash table。

    static final class Segment<K,V> extends ReentrantLock implements Serializable

    下面分析ConcurrentHashMap读写时如何加锁。

    首先是读操作类的方法,来看get方法:

    public V get(Object key) {
            Segment<K,V> s; // manually integrate access methods to reduce overhead
            HashEntry<K,V>[] tab;
            int h = hash(key);
            long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
            if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
                (tab = s.table) != null) {
                for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                         (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                     e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                        return e.value;
                }
            }
            return null;
        }

    可以看到,读取的时候没有调用的Segment的获取锁的方法,而是通过hash值定位到Entry,然后遍历Entry的链表。为什么这里不用加锁呢?看看HashEntry的代码就会明白了。

        static final class HashEntry<K,V> {
            final int hash;
            final K key;
            volatile V value;
            volatile HashEntry<K,V> next;

    value和next属性是带有volatile修饰符的,可以大胆放心的遍历和比较。

    接着是写操作,写操作是肯定要加锁的。因为Segment可以看成是一个hash table,因此ConcurrentHashMap直接调用Segment的对应的写入方法如put,replace等。

    比如put方法

        public V put(K key, V value) {
            Segment<K,V> s;
            if (value == null)
                throw new NullPointerException();
            int hash = hash(key);
            int j = (hash >>> segmentShift) & segmentMask;
            if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
                 (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
                s = ensureSegment(j);
            return s.put(key, hash, value, false);
        }

    因此这里直接关注Segment的对应写操作方法即可。在每个写操作的方法开头都这样的类似代码:

            final V remove(Object key, int hash, Object value) {
                if (!tryLock())
                    scanAndLock(key, hash);
                HashEntry<K,V> node = tryLock() ? null :
                    scanAndLockForPut(key, hash, value);

    也就是,首先尝试获取锁,如果成功则会带锁继续操作,失败则要通过scanAndLockscanAndLockForPut获取锁,因此这里关注的重点也就转移到这两个方法了。

    按照多线程环境的规则,如果尝试获取锁失败的话就会进入阻塞等待状态,那么这两个方法的作用应该是类似的。

            private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
                HashEntry<K,V> first = entryForHash(this, hash);
                HashEntry<K,V> e = first;
                HashEntry<K,V> node = null;
                int retries = -1; // negative while locating node
                while (!tryLock()) {
                    HashEntry<K,V> f; // to recheck first below
                    if (retries < 0) {
                        if (e == null) {
                            if (node == null) // speculatively create node
                                node = new HashEntry<K,V>(hash, key, value, null);
                            retries = 0;
                        }
                        else if (key.equals(e.key))
                            retries = 0;
                        else
                            e = e.next;
                    }
                    else if (++retries > MAX_SCAN_RETRIES) {
                        lock();
                        break;
                    }
                    else if ((retries & 1) == 0 &&
                             (f = entryForHash(this, hash)) != first) {
                        e = first = f; // re-traverse if entry changed
                        retries = -1;
                    }
                }
                return node;
            }

    这两个方法的逻辑:在等待的时候闲着没事儿干把该做好的准备做好,查找一下目标entry,如果是新建entry就把entry创建好,然后如果一切没问题就用lock()方法把自己给阻塞了,也就是做好准备然后去等着了。

    因为Segment本身就可以看成一个hash table,因此必然涉及rehash的问题,因为和HashMap中的rehash类似,在这里就省略了。

  • 相关阅读:
    2020年. NET Core面试题
    java Context namespace element 'component-scan' and its parser class ComponentScanBeanDefinitionParser are only available on JDK 1.5 and higher 解决方法
    vue 淡入淡出组件
    java http的get、post、post json参数的方法
    vue 父子组件通讯案例
    Vue 生产环境解决跨域问题
    npm run ERR! code ELIFECYCLE
    Android Studio 生成apk 出现 :error_prone_annotations.jar (com.google.errorprone:error) 错误
    记忆解析者芜青【总集】
    LwIP应用开发笔记之十:LwIP带操作系统基本移植
  • 原文地址:https://www.cnblogs.com/wanly3643/p/3898846.html
Copyright © 2011-2022 走看看