zoukankan      html  css  js  c++  java
  • 自学ConcuurentHashMap源码

    自学ConcuurentHashMap源码
    #

    参考:https://my.oschina.net/hosee/blog/675884
    http://www.cnblogs.com/ITtangtang/p/3948786.html
    本文需要关注的地方

    1. 利用分段锁实现多个线程并发写入、删除或者修改(默认16);
    2. 利用HashEntry的不变性和 volatile 变量的可见性来保证get读几乎不需要加锁(判断获取的entry的value是否为null,为null时才使用加锁的方式再次去获取,原因是put的时候的指令重排序)
    3. get的三种情况分析
    4. 每个版本的ConcurrentHashMap几乎都有改动,本文说明的JDK6的源码。JDK8相比前面的版本改动最大,最后有简单的说明

    一、基础知识

      并发编程实践中,ConcurrentHashMap是一个经常被使用的数据结构,相比于Hashtable以及Collections.synchronizedMap(),ConcurrentHashMap能够提供更高的并发度。同步容器将所有对容器状态的访问都串行化,以实现它们的线程安全性。这种方法的代价是严重降低并发性,当多个线程竞争容器的锁时,吞吐量将严重降低。ConcurrentHashMap关键在于使用了分段锁技术。
      在ConcurrentHashMap的实现中使用了一个包含16个锁的数组,每个锁保护所有散列桶的1/16,其中第N个散列桶由第(N mod 16)个锁来保护。每一个锁保护的区域称为段(Segment),每个段其实就是一个小的Hashtable,它们有自己的锁。假设散列函数具有合理的分布性,并且关键字能够均匀分布,那么这大约能把对于锁的请求减少到原来的1/16,正是这项技术使得ConcurrentHashMap能够支持多达16个并发的写入器。
      ConcurrentHashMap与其他并发容器一起增强了同步容器类:它们提供的迭代器不会抛出 ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。ConcurrentHashMap 返回的迭代器具有弱一致性(Weakly Consistent),而并非“及时失败”。弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有的元素,并可以(但是不保证)在迭代器被构造后将修改操作反映给容器。
      尽管有这些改进,但仍然有一些需要权衡的因素。对于一些需要在整个Map上进行计算的方法,例如size和isEmpty,这些方法的语义被略微减弱了以反映容器的并发特性。由于size 返回的结果在计算时可能已经过期了,它实际上只是一个估计值,因此允许size返回一个近似值而不是一个精确值。因为:事实上size和isEmpty这样的方法在并发环境下的用处很小,因为它们的返回值总在不断变化。因此,这些操作的需求被弱化了,以换取对其他更重要操作的性能优化,包括get、put、containsKey和remove等。
      锁分段的一个劣势在于:与采用单个锁来实现独占访问相比,要获取多个锁来实现独占访问将更加困难并且开销更高。通常,在执行一个操作时最多只需获取一个锁,但在某些情况下需要加锁整个容器,例如当ConcurrentHashMap需要扩展映射范围,以及重新计算键值的散列值要分布到更大的桶集合中时,就需要获取分段锁集合中的所有锁。

    1. 实现原理

      ConcurrentHashMap使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。如下图是ConcurrentHashMap的内部结构图:

    它把区间按照并发级别(concurrentLevel),分成了若干个segment。默认情况下内部按并发级别为16来创建。对于每个segment的容量,默认情况也是16。当然并发级别(concurrentLevel)和每个段(segment)的初始容量都是可以通过构造函数设定的。

    2. 源码解读

      ConcurrentHashMap中主要实体类就是三个:ConcurrentHashMap(整个Hash表),Segment(充当锁的角色,每个 Segment 对象守护整个散列表的某一个段的若干个桶),HashEntry(键值节点),对应上面的图可以看出之间的关系。

    HashEntry 类

      在 HashEntry 类中,key,hash 和 next 域都被声明为 final 型,value 域被声明为 volatile 型。

     static final class HashEntry<K,V> {  
         final K key;  
         final int hash;  
         volatile V value;  
         final HashEntry<K,V> next;  
     } 
    

      在 ConcurrentHashMap 中,在散列时如果产生“冲突”,将采用“链地址法”来处理“冲突”:把“冲突”的 HashEntry 对象链接成一个单向链表。由于 HashEntry 的 next 域为 final 型,所以新节点只能在链表的表头处插入。 下图是在一个空桶中依次插入 A,B,C 三个 HashEntry 对象后的结构图:

    注意:由于只能在表头插入,所以链表中节点的顺序和插入的顺序相反。其实,哪怕next不是final的,也应该从表头插入,因为从表尾插入的话,首先需要遍历到表尾,然后才能插入节点,复杂度O(n)。

    Segment 类

      Segment 类继承于 ReentrantLock 类,从而每个segment都可以当做一个锁。每个 Segment 对象用来守护其(成员对象 table 中)包含的若干个桶。
      table 是一个由 HashEntry 对象组成的数组。table 数组的每一个数组成员就是散列表的一个桶。

    static final class Segment<K,V> extends ReentrantLock implements Serializable { 
           /** 
            * 在本 segment 范围内,包含的 HashEntry 元素的个数
            * 该变量被声明为 volatile 型
            */ 
           transient volatile int count; 
     
           /** 
            * table 被更新的次数
            */ 
           transient int modCount; 
     
           /** 
            * 当 table 中包含的 HashEntry 元素的个数超过本变量值时,触发 table 的再散列
            */ 
           transient int threshold; 
     
           /** 
            * table 是由 HashEntry 对象组成的数组
            */ 
           transient volatile HashEntry<K,V>[] table; 
     
           final float loadFactor; 
     
           Segment(int initialCapacity, float lf) { 
               loadFactor = lf; 
               setTable(HashEntry.<K,V>newArray(initialCapacity)); 
           } 
     
           void setTable(HashEntry<K,V>[] newTable) { 
               threshold = (int)(newTable.length * loadFactor); 
               table = newTable; 
           } 
     
           /** 
            * 根据 key 的散列值,找到 table 中对应的那个桶(table 数组的某个数组成员)
            */ 
           HashEntry<K,V> getFirst(int hash) { 
               HashEntry<K,V>[] tab = table; 
               return tab[hash & (tab.length - 1)]; 
           } 
    }
    

    ConcurrentHashMap 类

    public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { 
     
       /** 
        * 散列表的默认初始容量为 16,即初始默认为 16 段
        */ 
       static final int DEFAULT_INITIAL_CAPACITY= 16; 
       static final float DEFAULT_LOAD_FACTOR= 0.75f; 
     
       /** 
        * 散列表的默认并发级别为 16。该值表示当前更新线程的估计数,在构造函数中没有指定这个参数时,使用本参数
        */ 
       static final int DEFAULT_CONCURRENCY_LEVEL= 16; 
     
       /** 
        * segments 的掩码值
        * key 的散列码的高位用来选择具体的 segment 
        */ 
       final int segmentMask; 
     
       /** 
        * 偏移量
        */ 
       final int segmentShift; 
     
       /** 
        * 由 Segment 对象组成的数组
        */ 
       final Segment<K,V>[] segments; 
     
       /** 
        * 创建一个带有指定初始容量、负载因子和并发级别的新的空散列表
        */ 
       public ConcurrentHashMap(int initialCapacity,  float loadFactor, int concurrencyLevel) { 
           if(!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) 
               throw new IllegalArgumentException(); 
     
           if(concurrencyLevel > MAX_SEGMENTS) 
               concurrencyLevel = MAX_SEGMENTS; 
     
           // 寻找最佳匹配参数(不小于给定参数的最接近的 2 次幂) 
           int sshift = 0; 
           int ssize = 1; 
           while(ssize < concurrencyLevel) { 
               ++sshift; 
               ssize <<= 1; 
           } 
           segmentShift = 32 - sshift;       // 偏移量值
           segmentMask = ssize - 1;           // 掩码值 
           this.segments = Segment.newArray(ssize);   // 创建数组
     
           if (initialCapacity > MAXIMUM_CAPACITY) 
               initialCapacity = MAXIMUM_CAPACITY; 
           int c = initialCapacity / ssize; 
           if(c * ssize < initialCapacity) 
               ++c; 
           int cap = 1; 
           while(cap < c) 
               cap <<= 1; 
     
           // 依次遍历每个数组元素
           for(int i = 0; i < this.segments.length; ++i) 
               // 初始化每个数组元素引用的 Segment 对象
              this.segments[i] = new Segment<K,V>(cap, loadFactor); 
       } 
     
       /** 
        * 创建一个带有默认初始容量 (16)、默认加载因子 (0.75) 和 默认并发级别 (16) 的空散列表。
        */ 
       public ConcurrentHashMap() { 
           this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); 
    }
    

    put方法

      首先,根据 key 计算出对应的 hash 值:

    public V put(K key, V value) { 
           if (value == null)          //ConcurrentHashMap 中不允许用 null 作为映射值
               throw new NullPointerException(); 
           int hash = hash(key.hashCode());        // 计算键对应的散列码
           // 根据散列码找到对应的 Segment 
           return segmentFor(hash).put(key, hash, value, false); 
    }
    

      然后,根据 hash 值找到对应的Segment 对象:

    /** 
        * 使用 key 的散列码来得到 segments 数组中对应的 Segment 
        */ 
    final Segment<K,V> segmentFor(int hash) { 
       // 将散列值右移 segmentShift 个位,并在高位填充 0 ,然后把得到的值与 segmentMask 相“与”,从而得到 hash 值对应的 segments 数组的下标值,最后根据下标值返回散列码对应的 Segment 对象
           return segments[(hash >>> segmentShift) & segmentMask]; 
    }
    

      最后,在这个 Segment 中执行具体的 put 操作:

        V put(K key, int hash, V value, boolean onlyIfAbsent) { 
               lock();  // 加锁,这里是锁定某个 Segment 对象而非整个 ConcurrentHashMap 
               try { 
                   int c = count; 
     
                   if (c++ > threshold)     // 如果超过再散列的阈值
                       rehash();              // 执行再散列,table 数组的长度将扩充一倍
     
                   HashEntry<K,V>[] tab = table; 
                   int index = hash & (tab.length - 1); 
                   // 找到散列值对应的具体的那个桶
                   HashEntry<K,V> first = tab[index]; 
     
                   HashEntry<K,V> e = first; 
                   while (e != null && (e.hash != hash || !key.equals(e.key))) 
                       e = e.next; 
     
                   V oldValue; 
                   if (e != null) {            // 如果键值对已经存在
                       oldValue = e.value; 
                       if (!onlyIfAbsent) 
                           e.value = value;    // 替换 value 值
                   } 
                   else {                        // 键值对不存在 
                       oldValue = null; 
                       ++modCount;         // 要添加新节点到链表中,所以 modCont 要加 1  
                       // 创建新节点,并添加到链表的头部 
                       tab[index] = new HashEntry<K,V>(key, hash, first, value); 
                       count = c;               // 写 count 变量
                   } 
                   return oldValue; 
               } finally { 
                   unlock();                     // 解锁
               } 
           }
    

      注意:这里的加锁操作是针对某个具体的 Segment,锁定的是该 Segment 而不是整个 ConcurrentHashMap。因为插入键 / 值对操作只是在这个 Segment 包含的某个桶中完成,不需要锁定整个ConcurrentHashMap。此时,其他写线程对另外 15 个Segment 的加锁并不会因为当前线程对这个 Segment 的加锁而阻塞。同时,所有读线程几乎不会因本线程的加锁而阻塞(除非读线程刚好读到这个 Segment 中某个 HashEntry 的 value 域的值为 null,此时需要加锁后重新读取该值)。
      相比较于 HashTable 和由同步包装器包装的 HashMap每次只能有一个线程执行读或写操作,ConcurrentHashMap 在并发访问性能上有了质的提高。在理想状态下,ConcurrentHashMap 可以支持 16 个线程执行并发写操作(如果并发级别设置为 16),及任意数量线程的读操作。

    get方法

    public V get(Object key) {
        int hash = hash(key); // throws NullPointerException if key null
        return segmentFor(hash).get(key, hash);
    }
    
    V get(Object key, int hash) { 
               if(count != 0) {       // 首先读 count 变量
                   HashEntry<K,V> e = getFirst(hash); 
                   while(e != null) { 
                       if(e.hash == hash && key.equals(e.key)) { 
                           V v = e.value; 
                           if(v != null)            
                               return v; 
                           // 如果读到 value 域为 null,说明发生了重排序,加锁后重新读取
                           return readValueUnderLock(e); 
                       } 
                       e = e.next; 
                   } 
               } 
               return null; 
           }
    

      ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。关键是用 HashEntry 对象的不变性来降低读操作对加锁的需求。只是判断获取的entry的value是否为null,为null时才使用加锁的方式再次去获取
      在代码清单“HashEntry 类的定义”中我们可以看到,HashEntry 中的 key,hash,next 都声明为 final 型。这意味着,不能把节点添加到链接的中间和尾部,也不能在链接的中间和尾部删除节点。这个特性可以保证:在访问某个节点时,这个节点之后的链接不会被改变。这个特性可以大大降低处理链表时的复杂性。
    下面分析在get的时候的线程安全性

    1、如果get的过程中另一个线程恰好新增entry


      HashEntry 类的 value 域被声明为 volatile 型,Java 的内存模型可以保证:某个写线程对 value 域的写入马上可以被后续的某个读线程“看”到。在 ConcurrentHashMap 中,不允许用 null 作为键和值,当读线程读到某个 HashEntry 的 value 域的值为 null 时,便知道发生了指令重排序现象(注意:volatile变量重排序规则,同时也是先行发生原则的一部分:对一个volatile变量的写操作先行发生于后面对这个变量的读操作,这里的“后面”同样是指时间上的先后顺序。所以,在tab[index] = new HashEntry<K,V>(key, hash, first, value);中,可能会出现当前线程得到的newEntry对象是一个没有完全构造好的对象引用。),需要加锁后重新读入这个 value 值。

    2、如果get的过程中另一个线程修改了一个entry的value

      由于对 volatile 变量的可见性,写线程对链表的非结构性修改能够被后续不加锁的读线程“看到”。

    3、如果get的过程中另一个线程删除了一个entry

      假设我们的链表元素是:e1-> e2 -> e3 -> e4 我们要删除 e3这个entry
      因为HashEntry中next的不可变,所以我们无法直接把e2的next指向e4,而是将要删除的节点之前的节点复制一份,形成新的链表。它的实现大致如下图所示:

      注意:最后才将数组中对应桶位置的链表替换为新链表(也就是在最后一步替换之前,tab[i]指向的始终是删除之前的链表,详细看下面的remove方法)。
      如果我们get的也恰巧是e3,可能我们顺着链表刚找到e1,这时另一个线程就执行了删除e3的操作,而我们线程还会继续沿着旧的链表找到e3返回,这时候可能看到被删除的数据,但是在高并发环境下,这种影响是很小的。

    remove方法

        V remove(Object key, int hash, Object value) { 
               lock();         // 加锁
               try{ 
                   int c = count - 1; 
                   HashEntry<K,V>[] tab = table; 
                   int index = hash & (tab.length - 1); 
                   HashEntry<K,V> first = tab[index]; 
                   HashEntry<K,V> e = first; 
                   while(e != null&& (e.hash != hash || !key.equals(e.key))) 
                       e = e.next; 
     
                   V oldValue = null; 
                   if(e != null) { 
                       V v = e.value; 
                       if(value == null|| value.equals(v)) { // 找到要删除的节点
                           oldValue = v; 
                           ++modCount; 
                           // 所有处于待删除节点之后的节点原样保留在链表中
                           // 所有处于待删除节点之前的节点被克隆(其实是把所有值取出来放到一个新的HashEntry对象中)到新链表中
                           HashEntry<K,V> newFirst = e.next;// 待删节点的后继结点
                           for(HashEntry<K,V> p = first; p != e; p = p.next) 
                               newFirst = new HashEntry<K,V>(p.key, p.hash,  newFirst, p.value); 
    
                           // 新的头结点是原链表中,删除节点之前的那个节点
                           tab[index] = newFirst; 
                           count = c;      // 写 count 变量
                       } 
                   } 
                   return oldValue; 
               } finally{ 
                   unlock();               // 解锁
               } 
           }
    

      和 get 操作一样,首先根据散列码找到具体的链表;然后遍历这个链表找到要删除的节点;最后把待删除节点之后的所有节点原样保留在新链表中,把待删除节点之前的每个节点克隆(其实是把所有值取出来放到一个新的HashEntry对象中)到新链表中;最后才将数组中对应桶位置的链表替换为新链表(也就是在替换之前,get的始终是删除之前的链表)。
      下面通过图例来说明 remove 操作。假设写线程执行 remove 操作,要删除链表的 C 节点,另一个读线程同时正在遍历这个链表。

    执行删除之后的新链表

    B中的next定义为final,无法修改将它指向D,因此C之前的所有节点都要重新建立。并且它们在新链表中的链接顺序被反转了。

    size方法

      在 ConcurrentHashMap中,每一个 Segment 对象都有一个 count 对象来表示本 Segment 中包含的 HashEntry 对象的个数。如果我们要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和。Segment里的全局变量count是一个volatile变量,那么在多线程场景下,我们是不是直接把所有Segment的count相加就可以得到整个ConcurrentHashMap大小了呢?不是的,虽然相加时可以获取每个Segment的count的最新值,但是拿到之后可能累加的过程中count发生了变化,那么统计结果就不准了。所以最安全的做法,是在统计size的时候把所有Segment锁定。
      因为在累加count操作过程中,之前累加过的count发生变化的几率非常小,所以ConcurrentHashMap的做法是先尝试2次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。
      那么ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?使用modCount变量,在put , remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。

    二、比较

      每个版本的ConcurrentHashMap几乎都有改动,本文说明的JDK6的源码。JDK8相比前面的版本改动最大,下面简要说一下:

    1. 它摒弃了Segment(段锁)的概念,而是启用了一种全新的方式实现,利用CAS算法。
    2. 它沿用了与它同时期的HashMap版本的思想,底层依然由“数组”+链表+红黑树的方式思想,但是为了做到并发,又增加了很多辅助的类,例如TreeBin,Traverser等对象内部类。

    详细看: http://blog.csdn.net/u010723709/article/details/48007881

    与Hashtable比较

      由于Hashtable无论是读还是写还是遍历,都需要获得对象锁,串行操作,因此在多线程环境下性能比较差。
      但是ConcurrentHashMap不能完全取代Hashtable:HashTable的迭代器是强一致性的,而ConcurrentHashMap是弱一致的。其实 ConcurrentHashMap的get,clear,iterator 都是弱一致性的。 Doug Lea 也将这个判断留给用户自己决定是否使用ConcurrentHashMap。
      弱一致性:不保证数据完全处于一致性状态。比如:

    get方法:

    可能在get的时候获得一个还没完全构造好的HashEntry对象,导致获得的entry的value为null,此时需要加锁重新读取。

    clear方法

    public void clear() {
    	for (int i = 0; i < segments.length; ++i)
    		segments[i].clear();
    }
    

      因为没有全局的锁,在清除完一个segments之后,正在清理下一个segments的时候,已经清理segments可能又被加入了数据,因此clear返回的时候,ConcurrentHashMap中是可能存在数据的。因此,clear方法是弱一致的。

    迭代器

       java.util 包中的集合类都返回 fail-fast 迭代器,这意味着它们假设线程在集合内容中进行迭代时,集合不会更改它的内容。如果 fail-fast 迭代器检测到在迭代过程中进行了更改操作,那么它会抛出 ConcurrentModificationException。
       ConcurrentHashMap中的迭代器主要包括entrySet、keySet、values方法。它们大同小异,这里选择entrySet解释。当我们调用entrySet返回值的iterator方法时,返回的是EntryIterator,在EntryIterator上调用next方法时,最终实际调用到了HashIterator.advance()方法,看下这个方法:

    final void advance() {
        if (nextEntry != null && (nextEntry = nextEntry.next) != null)
            return;
     
        while (nextTableIndex >= 0) {
            if ( (nextEntry = currentTable[nextTableIndex--]) != null)
                return;
        }
     
        while (nextSegmentIndex >= 0) {
            Segment<K,V> seg = segments[nextSegmentIndex--];
            if (seg.count != 0) {
                currentTable = seg.table;
                for (int j = currentTable.length - 1; j >= 0; --j) {
                    if ( (nextEntry = currentTable[j]) != null) {
                        nextTableIndex = j - 1;
                        return;
                    }
                }
            }
        }
    }
    

      在这种迭代方式中,比如我们删除了链表的某个entry,但是在完成之前,迭代器获得了旧的链表指针,那么就会遍历旧的链表,并且不会报异常。

  • 相关阅读:
    MySQL锁机制和主从复制
    MySQL索引优化
    MySQL的Explain
    事务并发问题和隔离级别
    解决angular单页面页面底部跳转到新页面滚动条不在顶部的问题
    关于字符串在ie浏览器拼接问题
    angular 新建命令
    angular 多路由模块新建组件的方法
    Angular 4.x NgClass ngStyle 指令用法
    slideDown()、slideUp()反复执行的问题
  • 原文地址:https://www.cnblogs.com/xzwblog/p/7411772.html
Copyright © 2011-2022 走看看