zoukankan      html  css  js  c++  java
  • ConcurrentHashMap原理分析

    org.jboss.netty.util.internal.ConcurrentHashMap

    通过分析Hashtable就知道,synchronized是针对整张Hash表的,即每次锁住整张表让线程独占,ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不同部分进行的修改。ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的hash table,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。这里“按顺序”是很重要的,否则极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,并且其成员变量实际上也是final的,但是,仅仅是将数组声明为final的并不保证数组成员也是final的,这需要实现上的保证。这可以确保不会出现死锁,因为获得锁的顺序是固定的。

    主要实体类

    ConcurrentHashMap中主要实体类就是三个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点),对应上面的图可以看出之间的关系。ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。如果使用传统的技术,如HashMap中的实现,如果允许可以在hash链的中间添加或删除元素,读操作不加锁将得到不一致的数据。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry代表每个hash链中的一个节点

    /** 
    * The segments, each of which is a specialized hash table 
    */  
    final Segment<K,V>[] segments; 
    
    static final class HashEntry<K,V> {  
         final K key;  
         final int hash;  
         volatile V value;  
         final HashEntry<K,V> next;  
    }  

    可以看到除了value不是final的,其它值都是final的,这意味着不能从hash链的中间或尾部添加或删除节点,因为这需要修改next 引用值,所有的节点的修改只能从头部开始对于put操作,可以一律添加到Hash链的头部。但是对于remove操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。为了确保读操作能够看到最新的值,将value设置成volatile,这避免了加锁。
    其它为了加快定位段以及段中hash槽的速度,每个段hash槽的的个数都是2^n,这使得通过位运算就可以定位段和段中hash槽的位置。当并发级别为默认值16时,也就是段的个数,hash值的高4位决定分配在哪个段中。

    final Segment<K,V> segmentFor(int hash) {  
         return segments[(hash >>> segmentShift) & segmentMask];  
    } 

    ConcurrentHashMap的数据成员

    public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>  
            implements ConcurrentMap<K, V>, Serializable {  
        final int segmentMask;  
        final int segmentShift;   
        final Segment<K,V>[] segments; 

    所有的成员都是final的,其中segmentMask和segmentShift主要是为了定位段,参见上面的segmentFor方法。每个Segment相当于一个子Hash表

     static final class Segment<K,V> extends ReentrantLock implements Serializable {  
        transient volatile int count;  
        transient int modCount;  
        transient int threshold;  
        transient volatile HashEntry<K,V>[] table;  
        final float loadFactor;  
     } 

    count用来统计该段数据的个数,它是volatile,它用来协调修改和读取操作,以保证读取操作能够读取到几乎最新的修改。协调方式是这样的,每次修改操作做了结构上的改变,如增加/删除节点(修改节点的值不算结构上的改变),都要写count值,每次读取操作开始都要读取count的值。modCount统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变。threashold用来表示需要进行rehash的界限值table数组存储段中节点,每个数组元素是个hash链,用HashEntry表示。table也是volatile,这使得能够读取到最新的 table值而不需要同步。loadFactor表示负载因子

    删除操作

    public V remove(Object key) {  
     hash = hash(key.hashCode());  
        return segmentFor(hash).remove(key, hash, null);  
    }  
    //操作是先定位到段,然后委托给段的remove操作。当多个删除操作并发进行时,只要它们所在的段不相同,它们就可以同时进行。下面是Segment的remove方法实现:
    V remove(Object key, int hash, Object value) {  
        lock();  
        try {  
            int c = count - 1;  
            HashEntry<K,V>[] tab = table;  
            int index = hash & (tab.length - 1);  
            HashEntry<K,V> first = tab[index];  
            HashEntry<K,V> e = first;  
            while (e != null && (e.hash != hash || !key.equals(e.key)))  
                 e = e.next;  
       
             V oldValue = null;  
             if (e != null) {  
                 V v = e.value;  
                 if (value == null || value.equals(v)) {  
                     oldValue = v;  
                     ++modCount;  
                     HashEntry<K,V> newFirst = e.next;  
                     *for (HashEntry<K,V> p = first; p != e; p = p.next)  
                         *newFirst = new HashEntry<K,V>(p.key, p.hash,  
                                                       newFirst, p.value);  
                     tab[index] = newFirst;  
                     count = c; // write-volatile  
                 }  
             }  
             return oldValue;  
         } finally {  
             unlock();  
         }  
    } 

    整个操作是在持有段锁的情况下执行的,空白行之前的行主要是定位到要删除的节点e。接下来,如果不存在这个节点就直接返回null,否则就要将e前面的结点复制一遍,尾结点指向e的下一个结点。e后面的结点不需要复制,它们可以重用。中间那个for循环是做什么用的呢?(*号标记)从代码来看,就是将定位之后的所有entry克隆并拼回前面去,但有必要吗?每次删除一个元素就要将那之前的元素克隆一遍?这点其实是由entry的不变性来决定的,仔细观察entry定义,发现除了value,其他所有属性都是用final来修饰的,这意味着在第一次设置了next域之后便不能再改变它,取而代之的是将它之前的节点全都克隆一次。至于entry为什么要设置为不变性,这跟不变性的访问不需要同步从而节省时间有关。

    整个remove实现并不复杂,但是需要注意如下几点。第一,当要删除的结点存在时,删除的最后一步操作要将count的值减一。这必须是最后一步操作,否则读取操作可能看不到之前对段所做的结构性修改。第二,remove执行的开始就将table赋给一个局部变量tab,这是因为table是 volatile变量,读写volatile变量的开销很大。编译器也不能对volatile变量的读写做任何优化,直接多次访问非volatile实例变量没有多大影响,编译器会做相应优化。

    插入操作

    V put(K key, int hash, V value, boolean onlyIfAbsent) {  
        lock();  
        try {  
            int c = count;  
            if (c++ > threshold) // ensure capacity  
                rehash();  
            HashEntry<K,V>[] tab = table;  
            int index = hash & (tab.length - 1);  
            HashEntry<K,V> first = tab[index];  
             HashEntry<K,V> e = first;  
             while (e != null && (e.hash != hash || !key.equals(e.key)))  
                 e = e.next;  
       
             V oldValue;  
             if (e != null) {  
                 oldValue = e.value;  
                 if (!onlyIfAbsent)  
                     e.value = value;  
             }  
             else {  
                 oldValue = null;  
                 ++modCount;  
                 tab[index] = new HashEntry<K,V>(key, hash, first, value);  
                 count = c; // write-volatile  
             }  
             return oldValue;  
         } finally {  
             unlock();  
         }  
    } 

    该方法也是在持有段锁(锁定整个segment)的情况下执行的,这当然是为了并发的安全,修改数据是不能并发进行的,必须得有个判断是否超限的语句以确保容量不足时能够rehash。接着是找是否存在同样一个key的结点,如果存在就直接替换这个结点的值。否则创建一个新的结点并添加到hash链的头部,这时一定要修改modCount和count的值,同样修改count的值一定要放在最后一步。put方法调用了rehash方法,reash方法实现得也很精巧,主要利用了table的大小为2^n。而比较难懂的是这句int index = hash & (tab.length - 1),原来segment里面才是真正的hashtable,即每个segment是一个传统意义上的hashtable,这里就是找出需要的entry在table的哪一个位置,之后得到的entry就是这个链的第一个节点,如果e!=null,说明找到了,这是就要替换节点的值(onlyIfAbsent == false),否则,我们需要new一个entry,它的后继是first,而让tab[index]指向它,什么意思呢?实际上就是将这个新entry插入到链头。

    获取操作

    V get(Object key, int hash) {  
        if (count != 0) { // read-volatile 当前桶的数据个数是否为0 
            HashEntry<K,V> e = getFirst(hash);  得到头节点
            while (e != null) {  
                if (e.hash == hash && key.equals(e.key)) {  
                    V v = e.value;  
                    if (v != null)  
                        return v;  
                    return readValueUnderLock(e); // recheck  
                 }  
                 e = e.next;  
             }  
         }  
         return null;  
    } 

    V readValueUnderLock(HashEntry<K,V> e) {
      lock();
      try {
        return e.value;
      } finally {
        unlock();
      }
    }

     

    get操作不需要锁。第一步是访问count变量,这是一个volatile变量,由于所有的修改操作在进行结构修改时都会在最后一步写count 变量,通过这种机制保证get操作能够得到几乎最新的结构更新。对于非结构更新,也就是结点值的改变,由于HashEntry的value变量是 volatile的,也能保证读取到最新的值。接下来就是根据hash和key对hash链进行遍历找到要获取的结点,如果没有找到,直接访回null。

    注意点:

    对hash链进行遍历不需要加锁的原因在于链指针next是final的。但是头指针却不是final的,这是通过getFirst(hash)方法返回,也就是存在 table数组中的值。这使得getFirst(hash)可能返回过时的头结点,例如,当执行get方法时,刚执行完getFirst(hash)之后,另一个线程执行了删除操作并更新头结点,这就导致get方法中返回的头结点不是最新的。这是可以允许,通过对count变量的协调机制,get能读取到几乎最新的数据,虽然可能不是最新的。要得到最新的数据,只有采用完全的同步。

    如果找到了所求的结点,判断它的值如果非空就直接返回,否则在有锁的状态下再读一次。这似乎有些费解,理论上结点的值不可能为空,这是因为 put的时候就进行了判断,如果为空就要抛NullPointerException。空值的唯一源头就是HashEntry中的默认值,因为 HashEntry中的value不是final的,非同步读取有可能读取到空值。仔细看下put操作的语句:tab[index] = new HashEntry<K,V>(key, hash, first, value),在这条语句中,HashEntry构造函数中对value的赋值以及对tab[index]的赋值可能被重新排序,这就可能导致结点的值为空(在还没执行到this.value=value时,tab[index]已经被赋值并通过e.value使用了),这里当v为空时,可能是一个线程正在改变节点,而之前的get操作都未进行锁定,所以这里要对这个e重新上锁再读一遍,以保证得到的是正确值。

    HashEntry(K key, int hash, ConcurrentHashMap.HashEntry<K, V> next, V value) {
        this.hash = hash;
        this.next = next;
        this.key = key;
        this.value = value;
    }

    java.util.concurrent.ConcurrentHashMap

    我们首先来看一下ConcurrentHashMap类的声明:

        public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
            implements ConcurrentMap<K, V>, Serializable


    其中,这个类继承了java.util.AbstractMap中已有的实现,这个在前面整理HashMap的时候已经提过了,重点看后面实现的接口ConcurrentMap和Serializable。Serializable是做序列化处理的,而ConcurrentMap的定义又如下:
    public interface ConcurrentMap<K, V> extends Map<K, V> {
    
        V putIfAbsent(K key, V value);
    
        boolean remove(Object key, Object value);
    
        boolean replace(K key, V oldValue, V newValue);
    
        V replace(K key, V value);
    }

    其中规定了4个方法 

    V putIfAbsent(K key, V value); 如果没有这个key,则放入这个key-value,返回null,否则返回key对应的value。

    boolean remove(Object key, Object value); 移除key和对应的value,如果key对应的不是value,移除失败

    boolean replace(K key, V oldValue, V newValue); 替代key对应的值,仅当当前值为旧值

    V replace(K key, V value); 替代key对应的值,只要当前有值

    这些方法都在ConcurrentHashMap实现了,后文会部分提到这些。

    1. 构造方法和ConcurrentHashMap的Segment实现 
    先看下构造方法:

     public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel)

    有几个重载的方法,说这个参数最全的,有3个参数,除了HashMap中涉及到的loadFactor和initialCapacity外,还有一个concurrencyLevel,翻译过来就是并发级别或者并发度。

    与此对应,ConcurrentHashMap中有一个segments数组对象,元素类型是ConcurrentHashMap的内部类Segment,而concurrencyLevel就是这个segments数组的大小。

    我们来看下这个Segment类:

    static final class Segment<K,V> extends ReentrantLock implements Serializable

    Segment扩展了ReentrantLock并实现了Serializable接口。除此之外,我们还发现这个类里实现的东西和java.util.HashMap非常相似。

    实际上,这个类正是整个ConcurrentHashMap实现的关键。我想,作为这篇文章读者的您,应该会用到过各式各样的数据库,就拿masql的innoDB引擎来看,它除了支持表级锁意外,还支持行级锁,意义就在于这减小了锁粒度,当只对某行数据进行操作的时候,很可能没有必要限制同一个表中其它行的数据。在这个类中,这个Segment也是起到了同样的作用。每个Segment本身就是一个ReentrantLock,只有要修改的数据存在在同一个Segment,才有可能会需要锁定,这样就提高了多线程情况下效率,没必要所有线程全部等待锁。

    2. get()方法源码分析 
    我们先看下get()方法的实现。

    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

    实际上,就是通过key计算得到的hash值,确定对应的Segment对象,并用原子操作获取到对应的table和table中hash值对应的对象。

    我们可以看到,在这个过程中,是没有显式用到锁的,仅仅是通过Unsafe类和原子操作,避免了阻塞,提高了性能。

    3. put()和putIfAbsent()方法分析 
    先看下put()方法的源码:

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

    我们看到其中最后是使用Segment的put()方法的调用,而putIfAbsent()的方法的调用,仅仅是最后一个参数不同。

    我们进一步看下Segment的put()方法的调用:

    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;
            int index = (tab.length - 1) & hash;
            HashEntry<K,V> first = entryAt(tab, index);
            for (HashEntry<K,V> e = first;;) {
                if (e != null) {
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                }
                else {
                    if (node != null)
                        node.setNext(first);
                    else
                        node = new HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);
                    else
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }

    和java.util.HashMap比起来,基本类似,有两个地方不同。一个是对onlyIfAbsent参数的判断处理。另一个则是对整个操作过程加锁,并在加锁的地方做了稍微巧妙的处理。就是在在等锁的过程中,不断的寻找和构建node节点对象,不管是否这个方法最终是否创建到了node节点,当这个方法返回时,一定是已经获得了这个Segment对应的锁。

    而这个寻找和创建节点所在的循环,一方面是做节点的遍历查询,另一方面也是起到了自旋锁的作用,避免直接调用lock()而等锁阻塞,因为这样会在系统实现层面阻塞和唤醒线程,是有一定的切换成本的,对提高效率不利。

    4. 综述 
    其实,纵观整个类的实现,都肯定会涉及到Segment的处理和其中方法的调用。对这些方法的调用,分为读操作和写处理,通常读操作没用用锁,而在修改操作,如remove() replace()等方法都和put()类似,在整个操作过程中对Segment进行了尝试加锁和自旋等锁的前提操作,并最后释放锁。这些写操作的等锁基本上和put()中的scanAndLockForPut()方法等同,除了需要创建节点。

    从整体上看,由于Segment降低了并发中的锁粒度,并在写操作使用了锁。保证了整个ConcurrentHashMap的线程安全,也保证了并发运行时的效率。

    5. 其它 
    在java.util.conccurent包中并没有TreeMap,但对于有序要求的容器,有基于skip list数据结构的Map实现ConcurrentSkipListMap,而且也有skip list的ConcurrentSkipListSet,和java.util包中的Set一样,是基于Map的。

  • 相关阅读:
    直播源列表
    MySQL为什么"错误"选择代价更大的索引
    C#中ConfigureAwait的理解(作者Stephen)
    理解C#中的 async await
    C#中Task.Delay() 和 Thread.Sleep() 区别
    扁平结构数据变成嵌套结构数据(树状结构)
    判断两个数组相同 两个对象相同 js
    嵌套结构数据(树状结构)变成扁平结构不带子元素(children)
    嵌套结构数据(树状结构)变成扁平结构带有子元素(children)
    2022.1.11学习日志
  • 原文地址:https://www.cnblogs.com/wade-luffy/p/5765138.html
Copyright © 2011-2022 走看看