zoukankan      html  css  js  c++  java
  • ConcurrentHashMap源码分析

    作者: dreamcatcher-cx

    出处: http://www.cnblogs.com/chengxiao/

    1. 引言

    ConcurrentHashMap 是 java 并发包中提供的一个线程安全且高效的 HashMap 实现,ConcurrentHashMap 在并发编程的场景中使用频率非常之高,本文就来分析下 ConcurrentHashMap 的实现原理,并对其实现原理进行分析(JDK1.7)。

    2. 实现原理

    众所周知,哈希表是非常高效,复杂度为 O(1) 的数据结构,在 Java 开发中,我们最常见到最频繁使用的就是 HashMap 和 HashTable,但是在线程竞争激烈的并发场景中使用都不够合理。

    HashMap :先说 HashMap,HashMap 是 线程不安全,在并发环境下,可能会形成 环形链表 (扩容时可能造成),导致 get 操作时,cpu 空转,所以,在并发环境中使用 HashMap 是非常危险的。

    HashTable:HashTable 和 HashMap 的实现原理几乎是一样,差别无非就是 1. HashTable 不允许 key 和 value 为 null;2. HashTable 是线程安全的。 但是 HashTable 线程安全的策略实现代价太大了,简单粗暴,get / put 所有相关操作都是 synchronized 的,这相当于给整个哈希表加了一把大锁,多线程访问的时候,只要有一个线程访问或操作该对象,那其他线程只能阻塞,相当于将所有的操作 串行化,在竞争激烈的并发场景中性能就会非常差。

    HashTable 全标锁

    HashTable 性能差主要是由于所有操作需要竞争同一把锁,而如果容器中有多把锁,每一把锁锁一段数据,这样在多线程访问时不同段的数据时,就不会存在锁竞争了,这样便可以有效地提高并发效率。这就是 ConcurrentHashMap 所采用的 分段锁 思想。

    ConcurrentHashMap 分段锁

    3. 源码分析

    ConcurrentHashMap 采用了非常精妙的「分段锁」策略,ConcurrentHashMap 的主干是个 Segment 数组。

    final Segment<K,V>[] segments;
    

    Segment 继承了 ReentrantLock,所以它就是一种可重入锁(ReentrantLock)。在 ConcurrentHashMap,一个 Segment 就是一个子哈希表,Segment 里维护了一个 HashEntry 数组,并发环境下,对于不同 Segment 的数据进行操作是不用考虑锁竞争的。(就按默认的 ConcurrentLeve 为 16 来讲,理论上就允许 16 个线程并发执行,有木有很酷)

    所以,对于同一个 Segment 的操作才需要考虑线程同步,不同的 Segment 则无需考虑。

    Segment 类似于 HashMap,一个 Segment 维护着一个 HashEntry 数组。

    transient volatile HashEntry<k,v>[] table;
    

    HashEntry 是目前我们提到的最小的逻辑处理单元了。一个 ConcurrentHashMap 维护一个 Segment 数组,一个 Segment 维护一个 HashEntry 数组。

    static final class HashEntry<K,V>{
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
        // 。。。
    }
    

    我们说 Segment 类似哈希表,那么一些属性就跟我们之前提到的 HashMap 差不了多少,比如负载因子 loadFactor,比如阈值 threshold 等等,看下 Segment 的构造方法:

    Segment(float lf,int threshold,HashEntry<K,V> tab){
        this.loadFactor = lf;         // 负载因子
        this.threshold = threshold;   // 阈值
        this.table = tab;             // 主干数组即 HashEntry 数组 
    }
    

    我们来看下 ConcurrentHashMap 的构造方法

    public ConcurrentHashMap(int initialCapacity,float loadFactor,int concurrencyLevel){
        if(!(loadFactor)>0 || initialCapacity<0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        // MAX_SEGEMENTS 为 1<<16=65536 也就是最大并发数位 65536
        if(concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // 2 的 sshif 次方等于 ssize,例:ssize=16,sshift=4;ssize=32,sshif=5
        int sshift = 0;
        // ssize 为 segments 数组的长度,根据 concurrentLevel 计算得出
        while(ssize<concurrencyLevel){
            ++sshift;
            ssize <<= 1;
        }
        // segmentShift 和 segmentMask 这两个变量在定位 segment 时会用到,后面会详细讲
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize -1;
        if(initialCapacity>MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        // 计算 cap 的大小,即 Segment 中 hashEntry 的数组长度,cap 也一定为 2 的 n 次方
        int c = initialCapacity / ssize;
        if(c*ssize<initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while(cap<c)
            cap<<1;
        // 创建 segments 数组并初始化第一个 Segment,其余的 Segment 延迟初始化
        Segment<K,V> s0 = new Segment<K,V>(loadFactor,(int)(cap*loadFactor),(HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[]) new Segment[ssize];
        UNSAFE.putOrderedObject(ss,SBASE,s0);
        this.segments = ss;
    }
    

    初始化方法有三个参数,如果用户不指定则会使用默认值,initialCapacity 为 16,loadFactor 为 0.75(负载因子,扩容时需要参考),concurrentLevel 为 16。

    从上面的代码可以看出来,Segment 数组的大小 ssize 是由 concurrentLevel 来决定的,但是却不一定是 concurrentLevel,ssize 一定是大于或等于 concurrentLevel 的最小 2 的次幂。比如:默认情况下 concurrentLevel 是 16,则 ssize 为 16;弱 concurrentLevel 为 14,ssize 为 16;若 concurrentLevel 为 17,则 ssize 为 32。为什么 Segment 的数组大小一定是 2 的次幂?其实主要是便于通过按位与的散列算法来定位 Segment 的 index。至于更详细的原因,有兴趣的话可以参考另一篇 文章 ,其中对于数组长度为什么一定要是 2 的次幂有较为详细的分析。

    接下来,我们看一下 put 方法

    public V put(K key,V value){
        Segment<K,V> s;
        // concurrentHashMap 不允许 key/value 为空
        if(value == null)
            throw new NullPointerException();
        // hash 函数对 key 的 hashCode 重新散列,避免差劲的不合理的 hashcode,保证三类均匀。
        int hash = hash(key);
        // 返回的 hash 值无符号右移 segementShift 位与端掩码进行位运算,定位 segment
        int j = (hash >>> segmentShift) & segmentMashk;
        if((s=(Segment<K,V>)UNSAFE.getObject(segments,(j<<SSHIFT)+SBASE))==null)
            s = ensureSegment(j);
        return s.put(key,hash,value,false);     
    }
    

    从源码看出,put 的方法主要逻辑也就两步:1. 定位 segment 并确保定位 Segment 已经初始化;2.调用 Segment 的 put 方法

    4. segmentShift 和 segmentMask

    segmentShift 和 segmentMask 这两个全局变量的主要作用是用来定位 Segment,int j = (hash >>> segmentShift) & segmentMask

    segmentMask:段掩码,假如 segments 数组长度为 16,则段掩码为 16-1=15;segments 长度为 32,段掩码为 32-1=31。这样等到的所有 bit 为都为 1,可以更好地保证散列的均匀性

    segmentShift:2 的 sshift 次方等于 ssize,segmentShift = 32-sshift。若 segments 长度为 16,segmentShift = 32-4 = 28;若 segments 长度为 32,segmentShift = 32-5 = 27.而计算得出的 hash 值最大为 32 位,无符号右移 segmentShift,则意味着只保留高几位(其余位是没用的),然后与段掩码 segmentMask 为运算来定位 Segment。

    get 方法

    public V get(Object key){
        Segment<K,V> s;
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h>>>segmentShift) & segmentMask)<<sshift) + SBASE;
        // 先定位到 Segment,在定位到 HashEntry
        if((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab=s.table)!=null){
            for(HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);e!=null;e=e.next){
                K k;
                if((k=e.key)==key || (e.hash==h && key.equals(k))){
                    return e.value;
                }
            }
        }
        return null;
    }
    

    get 方法无需加锁,由于其中涉及到的共享变量都是用 volatile 修饰,volatile 可以保证内存可见性。只不过是锁粒度细了而已

    put 方法

    ConcurrentHashMap put 方法是调用了 Segment 的 put 方法,Segment 的 put 方法是要加锁的,只不过是锁粒度细了而已。

    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
                HashEntry<K,V> node = tryLock() ? null :
                    scanAndLockForPut(key, hash, value);//tryLock不成功时会遍历定位到的HashEnry位置的链表(遍历主要是为了使CPU缓存链表),若找不到,则创建HashEntry。tryLock一定次数后(MAX_SCAN_RETRIES变量决定),则lock。若遍历过程中,由于其他线程的操作导致链表头结点变化,则需要重新遍历。
                V oldValue;
                try {
                    HashEntry<K,V>[] tab = table;
                    int index = (tab.length - 1) & hash;//定位HashEntry,可以看到,这个hash值在定位Segment时和在Segment中定位HashEntry都会用到,只不过定位Segment时只用到高几位。
                    HashEntry<K,V> first = entryAt(tab, index);
                    for (HashEntry<K,V> e = first;;) {
                        if (e != null) {
                            K k;
                            if ((k = e.key) == key ||
                                (e.hash == hash && key.equals(k))) {
                                oldValue = e.value;
                                if (!onlyIfAbsent) {
                                    e.value = value;
                                    ++modCount;
                                }
                                break;
                            }
                            e = e.next;
                        }
                        else {
                            if (node != null)
                                node.setNext(first);
                            else
                                node = new HashEntry<K,V>(hash, key, value, first);
                            int c = count + 1;
                  //若c超出阈值threshold,需要扩容并rehash。扩容后的容量是当前容量的2倍。这样可以最大程度避免之前散列好的entry重新散列,具体在另一篇文章中有详细分析,不赘述。扩容并rehash的这个过程是比较消耗资源的。
                            if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                                rehash(node);
                            else
                                setEntryAt(tab, index, node);
                            ++modCount;
                            count = c;
                            oldValue = null;
                            break;
                        }
                    }
                } finally {
                    unlock();
                }
                return oldValue;
            }
    

    5. 总结

    ConcurrentHashMap 作为一种线程安全且高效的哈希表解决方案,尤其其中的 "分段锁" 的方案,相比 HashTable 的全表锁在性能上的提升非常之大。

  • 相关阅读:
    Js通用验证
    C#实现马尔科夫模型例子
    C# 生成pdf文件客户端下载
    Js跨一级域名同步cookie
    C#数据库连接池 MySql SqlServer
    关于Oracle row_number() over()的简单使用
    开发中mybatis的一些常见问题记录
    Java通过图片url地址获取图片base64位字符串的两种方式
    基于apache httpclient的常用接口调用方法
    通过jcrop和canvas的画布功能完成对图片的截图功能与视频的截图功能实现
  • 原文地址:https://www.cnblogs.com/firepation/p/9456044.html
Copyright © 2011-2022 走看看