zoukankan      html  css  js  c++  java
  • ConcurrentHashMap实现解析

    ConcurrentHashMap是线程安全的HashMap的实现,具有更加高效的并发性。与HashTable不同,ConcurrentHashMap运用锁分离技术,尽量减小写操作时加锁的粒度,即在写操作时,不用对整个ConcurrentHashMap加锁。为了实现,ConcurrentHashMap采用了Segment结构,每个Segment中维护了一个链表数组,在存取操作过程中实现两次哈希。在写数据的过程中,对每个Segment加锁,这样如果操作的数据位于两个不同的Segment中,便可并发进行,大大提高了并发的效率。

    HashTable和ConcurrentHashMap在内部结构上的区别:

    HashTable:                                                                                                  ConcurrentHashMap:

     

    左边便是Hashtable的实现方式---整个Hash表加锁;而右边则是ConcurrentHashMap的实现方式---分段。ConcurrentHashMap默认将hash表分为16个段,诸如get,put,remove等常用操作只锁当前需要用到的段。这样,原来只能一个线程进入,现在却能同时16个写线程进入(写线程才需要锁定,而读线程几乎不受限制),并发性的提升是显而易见的。以下代码是基于jdk1.5,在jdk1.7中,put操作用了自旋锁的机制,理解起来费劲。

    1.segment的数据结构:

    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        //Segment中元素的数量
        transient volatile int count;
        //对table的大小造成影响的操作的次数
        transient int modCount;
        //阈值,Segment里面元素的数量超过这      个值依旧就会对Segment进行扩容
        transient int threshold;
        //链表数组,每个segment维持一个数组
        transient volatile HashEntry<K,V>[] table;
        //负载因子
        final float loadFactor;
    }

    2.每个Entry(HashEntry)的结构:

    static final class HashEntry<K,V> {
        //key-value对的key值
        final K key;
        final int hash;
        //key-value对的value值
        volatile V value;
        //链表指向下一个Entry的引用
        final HashEntry<K,V> next;
    }

    3.ConcurrentHashMap的初始化

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
     
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
     
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        segmentShift = 32 - sshift;
        segmentMask = ssize - 1;
        this.segments = Segment.newArray(ssize);
     
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = 1;
        while (cap < c)
            cap <<= 1;
     
        for (int i = 0; i < this.segments.length; ++i)
            this.segments[i] = new Segment<K,V>(cap, loadFactor);
    }

    有三个参数:
    initialCapacity:表示初始的容量;
    loadFactor:表示负载因子参数;
    concurrentLevel:表示ConcurrentHashMap内部的Segment的数量;
    ConcurrentLevel一经指定,不可改变,后续如果ConcurrentHashMap的元素数量增加导致ConrruentHashMap需要扩容,ConcurrentHashMap不会增加Segment的数量,而只会增加Segment中链表数组的容量大小,这样做扩容过程不需要对整个ConcurrentHashMap做rehash,而只需要对Segment里面的元素做一次rehash。Segment的数量是不大于concurrentLevel的最大的2的指数,就是说Segment的数量永远是2的指数个,这样的好处是方便采用移位操作来进行hash(通过按位与的哈希算法来定位segments数组的索引),加快hash的过程;根据intialCapacity确定Segment的容量的大小,每一个Segment的容量大小也是2的指数(同理),同样使为了加快hash的过程。segmentShift和segmentMask,这两个变量在定位segment时的哈希算法里需要使用,很重要。

    4.segment内部的put操作:

    //返回的是原来已有的和key相同的HashEntry的value值
    V put(K key, int hash, V value, boolean onlyIfAbsent){
    	//加锁
    	lock();
    	try {
    			 //当前segment中HashEntry的数量
    	                int c = count;
    			 //需要进行扩容,rehash
    	                if (c++ > threshold) // ensure capacity
    	                    rehash();
    	                HashEntry<K,V>[] tab = table;
    	                int index = hash & (tab.length - 1);//定位HashEntry,即在HashEntry Table中的下标
    	                HashEntry<K,V> first = tab[index];
    	                HashEntry<K,V> e = first;
    			 //找到所在链表中key值和要加入的key值相同的HashEntry
    	                while (e != null && (e.hash != hash || !key.equals(e.key)))
    	                    e = e.next;
    					
    	                V oldValue;
    	                if (e != null) {//找到更新value值即可
    	                    oldValue = e.value;
    	                    if (!onlyIfAbsent)
    	                        e.value = value;
    	                }
    	                else {//未找到,e为null,则新生成一个HashEntry,并将原来的链作为自己的next
    	                    oldValue = null;
    	                    ++modCount;
    	                    tab[index] = new HashEntry<K,V>(key, hash, first, value);
    	                    count = c; // write-volatile
    	                }
    	                return oldValue;
    	            } finally {
    	                unlock();//释放锁
    	            }
    	        }

    4.segment内部的put操作,如上述,不用加锁:

            V get(Object key, int hash) {
                if (count != 0) { // read-volatile
                    HashEntry<K,V> e = getFirst(hash);
                    while (e != null) {
                        if (e.hash == hash && key.equals(e.key)) {
                            V v = e.value;
                            if (v != null)
                                return v;
                            return readValueUnderLock(e); // recheck
                        }
                        e = e.next;
                    }
                }
                return null;
            }
            V readValueUnderLock(HashEntry<K,V> e) {
                lock();
                try {
                    return e.value;
                } finally {
                    unlock();
                }
            }
    注释://recheck,可能有点费解,v怎么可能会是null呢?在put操作时(不是segment内部的操作,而是整个Hash表的put操作中会判断如果value值为null会抛出异常),空值的唯一源头就是HashEntry中的默认值,因为HashEntry中的value不是final的,非同步读取有可能读取到空值。看下put操作的语句:tab[index] = new HashEntry<K,V>(key, hash, first, value),在这条语句中,HashEntry构造函数中对value的赋值以及对tab[index]的赋值可能被重新排序,这就可能导致结点的值为空。这种情况应当很罕见,一旦发生这种情况,ConcurrentHashMap采取的方式是在持有锁的情况下再读一遍,这能够保证读到最新的值,并且一定不会为空值。(引)

    5.remove操作:

    删除操作是加锁的,有多个删除操作同时进行,只要删除的对象不在同一段内,则可以并发执行,大大提高了并发的效率。整个ConcurrentHashMap操作也是借助于在segment上的操作,先将待删除的HashEntry定位到相应的segment,在segment上做删除操作。

            V remove(Object key, int hash, Object value) {
                lock();
                try {
                    int c = count - 1;
                    HashEntry<K,V>[] tab = table;
                    int index = hash & (tab.length - 1);
                    HashEntry<K,V> first = tab[index];
                    HashEntry<K,V> e = first;
                    while (e != null && (e.hash != hash || !key.equals(e.key)))
                        e = e.next;
    <span style="white-space:pre">		</span>V oldValue = null;
                    if (e != null) {
                        V v = e.value;
                        if (value == null || value.equals(v)) {
                            oldValue = v;
                            // All entries following removed node can stay
                            // in list, but all preceding ones need to be
                            // cloned.
                            ++modCount;
                            HashEntry<K,V> newFirst = e.next;
                            for (HashEntry<K,V> p = first; p != e; p = p.next)
                                newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                              newFirst, p.value);
                            tab[index] = newFirst;
                            count = c; // write-volatile
                        }
                    }
                    return oldValue;
                } finally {
                    unlock();
                }
            }
    首先找到待删除的节点,如果不存在这个节点就直接返回null,否则就要将待删除节点(节点e)前面的结点复制一遍,尾结点指向e的下一个结点。将e后面的结点复制,可以重复使用。当要删除的结点存在时,删除的最后一步操作要将count的值减一。这必须是最后一步操作,否则读取操作可能看不到之前对段所做的结构性修改。删除之后,e前面的元素的顺序会发生改变:

    6.size()操作:

    用于统计ConcurrentHashMap中元素的个数,是跨段操作的。首先在没有加锁的情况下,遍历所有的segment,看得到的所有段的count和和modCount和相同与否,重复计算比较RETRIES_BEFORE_LOCK次,如果相同则代表在统计过程中没有发生remove或put操作,直接返回。如果不相同,则把这个过程再重复做一次。若还不相同,则就需要将所有的Segment都加锁,然后遍历。

        public int size() {
            final Segment<K,V>[] segments = this.segments;
            long sum = 0;
            long check = 0;
            int[] mc = new int[segments.length];
            // Try a few times to get accurate count. On failure due to
            // continuous async changes in table, resort to locking.
            for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
                check = 0;
                sum = 0;
                int mcsum = 0;
                for (int i = 0; i < segments.length; ++i) {
                    sum += segments[i].count;
                    mcsum += mc[i] = segments[i].modCount;
                }
                if (mcsum != 0) {
                    for (int i = 0; i < segments.length; ++i) {
                        check += segments[i].count;
                        if (mc[i] != segments[i].modCount) {
                            check = -1; // force retry
                            break;
                        }
                    }
                }
                if (check == sum)
                    break;
            }
            if (check != sum) { // Resort to locking all segments
                sum = 0;
                for (int i = 0; i < segments.length; ++i)
                    segments[i].lock();
                for (int i = 0; i < segments.length; ++i)
                    sum += segments[i].count;
                for (int i = 0; i < segments.length; ++i)
                    segments[i].unlock();
            }
            if (sum > Integer.MAX_VALUE)
                return Integer.MAX_VALUE;
            else
                return (int)sum;
        }
    总结:

    ConcurrentHashMap利用了锁分离技术实现了更高性能的并发,实现方式很精妙。关于ConcurrentHashMap的更多内容还要继续学习。










  • 相关阅读:
    Python装饰器
    Python导模块问题
    selenium定位元素提示‘元素不可见’问题解决方法
    Python导入模块Import和from+Import区别
    关于iframe切换的问题
    Python+selenium 模拟wap端页面操作
    使用Pytesseract+TesseractOCR识别图片的简单步骤
    通过cookie绕过验证码登录
    oo第三次作业——项目的问题与反思
    Java_第二次作业:项目构思与实现
  • 原文地址:https://www.cnblogs.com/sunp823/p/5601413.html
Copyright © 2011-2022 走看看