zoukankan      html  css  js  c++  java
  • 并发容器Map之一:ConcurrentHashMap原理(jdk1.7)

    一、ConcurrentHashMap简介

      HashTable容器在竞争激烈的并发环境下表现出效率低下的原因,是因为所有访问HashTable的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

    二、ConcurrentHashMap源码分析

    2.1、类图结构

    2.2、数据结构

    jdk1.7中采用Segment + HashEntry的方式进行实现,结构如下:

    ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。
    HashEntry类似HashMap,Segment中内部数组的每一项都是一个单项链节点,它包含了key、hash、value等信息:
    HashEntry.java

        static final class HashEntry<K,V> {
            final int hash;
            final K key;
            volatile V value;
            volatile HashEntry<K,V> next;
    
            HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
                this.hash = hash;
                this.key = key;
                this.value = value;
                this.next = next;
            }
    
            final void setNext(HashEntry<K,V> n) {
                UNSAFE.putOrderedObject(this, nextOffset, n);
            }
    
            // Unsafe mechanics
            static final sun.misc.Unsafe UNSAFE;
            static final long nextOffset;
            static {
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();
                    Class k = HashEntry.class;
                    nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
        }

    这意味着不能从hash链的中间或尾部添加或删除节点,因为构造函数只有一个next参数。所有的节点的修改只能从头部开始。对于put操作,可以一律添加到Hash链的头部。但是对于remove操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。
    在JDK 1.6中,HashEntry中的next指针也定义为final,并且每次插入将新添加节点作为链的头节点(同HashMap实现),而且每次删除一个节点时,会将删除节点之前的所有节点拷贝一份组成一个新的链,而将当前节点的上一个节点的next指向当前节点的下一个节点,从而在删除以后有两条链存在,因而可以保证即使在同一条链中,有一个线程在删除,而另一个线程在遍历,它们都能工作良好,因为遍历的线程能继续使用原有的链。因而这种实现是一种更加细粒度的happens-before关系,即如果遍历线程在删除线程结束后开始,则它能看到删除后的变化,如果它发生在删除线程正在执行中间,则它会使用原有的链,而不会等到删除线程结束后再执行,即看不到删除线程的影响。如果这不符合你的需求,还是乖乖的用Hashtable或HashMap的synchronized版本,Collections.synchronizedMap()做的包装。
    另一个不同于1.6版本中的实现是它提供setNext()方法,而且这个方法调用了Unsafe类中的putOrderedObject()方法,该方法只对volatile字段有用,关于这个方法的解释如下:
    Sets the value of the object field at the specified offset in the supplied object to the given value. This is an ordered or lazy version of putObjectVolatile(Object,long,Object), which doesn't guarantee the immediate visibility of the change to other threads. It is only really useful where the object field is volatile, and is thus expected to change unexpectedly.
    我对这个函数的理解:对volatile字段,按规范,在每次向它写入值后,它更新后的值立即对其他线程可见(可以简单的认为对volatile字段,每次读取它的值时都直接从内存中读取,而不会读缓存中的数据,如CPU的缓存;对写入操作也是直接写入内存),而这个函数可以提供一种选择,即使对volatile字段的写操作,我们也可以使用该方法将它作为一种普通字段来对待。这里setNext()方法的存在是为了在remove时不需要做拷贝额外链进行的优化,具体可以参看remove操作。

    Segment类

    Segment类的实现是ConcurrentHashMap实现的核心。

    static final class Segment<K,V> extends ReentrantLock implements Serializable {
            static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
    
            transient volatile HashEntry<K,V>[] table;
    
            transient int count;
    
            transient int modCount;
    
            transient int threshold;
    
            final float loadFactor;
            Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
                this.loadFactor = lf;
                this.threshold = threshold;
                this.table = tab;
            }
    
            final V put(K key, int hash, V value, boolean onlyIfAbsent) {
                HashEntry<K,V> node = tryLock() ? null :
                    scanAndLockForPut(key, hash, value);
                V oldValue;
                try {
                    HashEntry<K,V>[] tab = table;
                    int index = (tab.length - 1) & hash;
                    HashEntry<K,V> first = entryAt(tab, index);
                    for (HashEntry<K,V> e = first;;) {
                        if (e != null) {
                            K k;
                            if ((k = e.key) == key ||
                                (e.hash == hash && key.equals(k))) {
                                oldValue = e.value;
                                if (!onlyIfAbsent) {
                                    e.value = value;
                                    ++modCount;
                                }
                                break;
                            }
                            e = e.next;
                        }
                        else {
                            if (node != null)
                                node.setNext(first);
                            else
                                node = new HashEntry<K,V>(hash, key, value, first);
                            int c = count + 1;
                            if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                                rehash(node);
                            else
                                setEntryAt(tab, index, node);
                            ++modCount;
                            count = c;
                            oldValue = null;
                            break;
                        }
                    }
                } finally {
                    unlock();
                }
                return oldValue;
            }

    在之前的JDK版本中,Segment的put操作开始时就会先加锁,直到put完成才解锁。在JDK 1.7中采用了自旋的机制,进一步减少了加锁的可能性。
    先不考虑自旋等待的问题,假如put一开始就拿到锁,那么它会执行以下逻辑:
    根据之前计算出来的hash值找到数组相应bucket中的第一个链节点。这里需要注意的是:
    a. 因为ConcurrentHashMap在计算Segment中数组长度时会保证该值是2的倍数,而且Segment在做rehash时也是每次增长一倍,因而数组索引只做"(tab.length - 1) & hash"计算即可。
    b. 因为table字段时一个volatile变量,因而在开始时将该引用赋值给tab变量,可以减少在直接引用table字段时,因为该字段是volatile而不能做优化带来的损失,因为将table引用赋值给局部变量后就可以把它左右普通变量以实现编译、运行时的优化。
    c. 因为之前已经将volatile的table字段引用赋值给tab局部变量了,为了保证每次读取的table中的数组项都是最新的值,因而调用entryAt()方法获取数组项的值而不是通过tab[index]方式直接获取(在put操作更新节点链时,它采用Unsafe.putOrderedObject()操作,此时它对链头的更新只局限于当前线程,为了保证接下来的put操作能够读取到上一次的更新结果,需要使用volatile的语法去读取节点链的链头)。
    遍历数组项中的节点链,如果在节点中能找到key相等的节点,并且当前是put()操作而不是putIfAbsent()操作,纪录原来的值,更新该节点的值,并退出循环,put()操作完成。
    如果在节点链中没有找到key相等的节点,创建一个新的节点,并将该节点作为当前链头插入当前链,并将count加1。和读取节点链连头想法,这里使用setEntryAt()操作以实现对链头的延时写,以提升性能,因为此时并不需要将该更新写入到内存,而在锁退出后该更新自然会写入内存[参考Java的内存模型,注1]。然后当节点数操作阀值(capacity*loadFactor),而数组长度没有达到最大数组长度,会做rehash。另外,如果scanAndLockForPut()操作返回了一个非空HashEntry,则表示在scanAndLockForPut()遍历key对应节点链时没有找到相应的节点,此时很多时候需要创建新的节点,因而它预创建HashEntry节点(预创建时因为有些时候它确实不需要再创建),所以不需要再创建,只需要更新它的next指针即可,这里使用setNext()实现延时写也时为了提升性能,因为当前修改并不需要让其他线程知道,在锁退出时修改自然会更新到内存中,如果采用直接赋值给next字段,由于next时volatile字段,会引起更新直接写入内存而增加开销。

    Segment中的scanAndLockForPut操作

            private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
                HashEntry<K,V> first = entryForHash(this, hash);  //节点链的头
                HashEntry<K,V> e = first;
                HashEntry<K,V> node = null;
                int retries = -1; // negative while locating node
                while (!tryLock()) {
                    HashEntry<K,V> f; // to recheck first below
                    if (retries < 0) {
                        if (e == null) {
                            if (node == null) // 查找key对应的节点链中是已存在该节点,如果没有找到已存在的节点,则预创建一个新节点
                                node = new HashEntry<K,V>(hash, key, value, null);
                            retries = 0;
                        }
                        //如果桶条目不为空,那么遍历桶中条目链表
                        else if (key.equals(e.key))
                            retries = 0;
                        else
                            e = e.next;    //根据节点链头持续遍历该链(如果节点链中不存在要插入的节点,则预创建一个节点,上面的创建node)
                    }
                    else if (++retries > MAX_SCAN_RETRIES) {//并且尝试n次,直到尝试次数操作限制,才真正进入等待状态,即所谓的自旋等待。对最大尝试次数,目前的实现单核次数为1,多核为64;
                        lock();//阻塞
                        break;
                    }
                    else if ((retries & 1) == 0 &&
                             (f = entryForHash(this, hash)) != first) {//当在自旋过程中发现节点链的链头发生了变化,则更新节点链的链头,并重置retries值为-1,重新为尝试获取锁而自旋遍历
                        e = first = f; // re-traverse if entry changed
                        retries = -1;
                    }
                }
                return node;
            }
            
        static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) {
            HashEntry<K,V>[] tab;
            return (seg == null || (tab = seg.table) == null) ? null :
                (HashEntry<K,V>) UNSAFE.getObjectVolatile
                (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
        }

    如put源码所示,当put操作时,如果此时有其他线程也再对这个段进行更新操作,此线程尝试加锁不成功,此时此线程会执行scanAndLockForPut进行重试,它不是直接进入等待状态。在scanAndLockForPut()中,
    1、先根据hash值找到节点链的链头
    2、根据key从对应的节点链中是已存在该节点,查找时是从链头开始的;
    3、如果没有找到已存在的节点,则预创建一个新节点,然后尝试n次,直到尝试次数操作限制,才真正进入等待状态,计所谓的自旋等待。对最大尝试次数,目前的实现单核次数为1,多核为64;
    总的来说,put的同步机制是如果没有其他线程在更新该段,那么直接put。否则轮询请求锁,直至获得锁。

    Segment中的rehash操作
    rehash的逻辑比较简单,它创建一个大原来两倍容量的数组,然后遍历原来数组以及数组项中的每条链,对每个节点重新计算它的数组索引,然后创建一个新的节点插入到新数组中,这里需要重新创建一个新节点而不是修改原有节点的next指针时为了在做rehash时可以保证其他线程的get遍历操作可以正常在原有的链上正常工作,有点copy-on-write思想。然而Doug Lea继续优化了这段逻辑,为了减少重新创建新节点的开销,这里做了两点优化:
    1,对只有一个节点的链,直接将该节点赋值给新数组对应项即可(之所以能这么做是因为Segment中数组的长度也永远是2的倍数,而将数组长度扩大成原来的2倍,那么新节点在新数组中的位置只能是相同的索引号或者原来索引号加原来数组的长度,因而可以保证每条链在rehash是不会相互干扰);
    2,对有多个节点的链,先遍历该链找到第一个后面所有节点的索引值不变的节点p,然后只重新创建节点p以前的节点即可,此时新节点链和旧节点链同时存在,在p节点相遇,这样即使有其他线程在当前链做遍历也能正常工作:

            private void rehash(HashEntry<K,V> node) {
    
                HashEntry<K,V>[] oldTable = table;
                int oldCapacity = oldTable.length;
                int newCapacity = oldCapacity << 1;//新数组长度是原数组的2倍
                threshold = (int)(newCapacity * loadFactor);
                HashEntry<K,V>[] newTable =
                    (HashEntry<K,V>[]) new HashEntry[newCapacity];
                int sizeMask = newCapacity - 1;
                for (int i = 0; i < oldCapacity ; i++) {
                    HashEntry<K,V> e = oldTable[i];
                    if (e != null) {
                        HashEntry<K,V> next = e.next;
                        int idx = e.hash & sizeMask;   //每个节点重新计算它的数组索引,然后创建一个新的节点插入到新数组中
                        if (next == null)   //对只有一个节点的链,直接将该节点赋值给新数组对应项即可
                            newTable[idx] = e;
                        else { // 对有多个节点的链
                            HashEntry<K,V> lastRun = e;
                            int lastIdx = idx;
                            for (HashEntry<K,V> last = next;
                                 last != null;
                                 last = last.next) {
                                int k = last.hash & sizeMask;
                                if (k != lastIdx) {
                                    lastIdx = k;
                                    lastRun = last;
                                }
                            }
                            newTable[lastIdx] = lastRun;
                            // Clone remaining nodes
                            for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                                V v = p.value;
                                int h = p.hash;
                                int k = h & sizeMask;
                                HashEntry<K,V> n = newTable[k];
                                newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                            }
                        }
                    }
                }
                int nodeIndex = node.hash & sizeMask; // add the new node
                node.setNext(newTable[nodeIndex]);
                newTable[nodeIndex] = node;
                table = newTable;
            }
            

    Segment中的remove操作

            final V remove(Object key, int hash, Object value) {
                if (!tryLock())
                    scanAndLock(key, hash);
                V oldValue = null;
                try {
                    HashEntry<K,V>[] tab = table;
                    int index = (tab.length - 1) & hash;
                    HashEntry<K,V> e = entryAt(tab, index);
                    HashEntry<K,V> pred = null;
                    while (e != null) {
                        K k;
                        HashEntry<K,V> next = e.next;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            V v = e.value;
                            if (value == null || value == v || value.equals(v)) {
                                if (pred == null)
                                    setEntryAt(tab, index, next);
                                else
                                    pred.setNext(next);
                                ++modCount;
                                --count;
                                oldValue = v;
                            }
                            break;
                        }
                        pred = e;
                        e = next;
                    }
                } finally {
                    unlock();
                }
                return oldValue;
            }

    在JDK 1.6版本中,remove操作比较直观,它先找到key对应的节点链的链头(数组中的某个项),然后遍历该节点链,如果在节点链中找到key相等的节点,则为该节点之前的所有节点重新创建节点并组成一条新链,将该新链的链尾指向找到节点的下一个节点。这样如前面rehash提到的,同时有两条链存在,即使有另一个线程正在该链上遍历也不会出问题。然而Doug Lea又挖掘到了新的优化点,在1.7中,他不再重新创建一条新的链,而是只在当起缓存中将链中找到的节点移除。当移除的是链头则更新数组项的值,否则更新找到节点的前一个节点的next指针。这也是HashEntry中next指针没有设置成final的原因。当然remove操作如果第一次尝试获得锁失败也会如put操作一样先进入自旋状态,这里的scanAndLock和scanAndLockForPut类似,只是它不做预创建节点的步骤,不再细说。
    Segment中的replace操作
    ConcurrentHashMap添加了replace接口,它和put的区别是put操作如果原Map中不存在key会将传入的键值对添加到Map中,而replace不会这么做,它只是简单的返回false。Segment中的replace操作先加锁或自旋等待,然后遍历相应的节点链,如果找到节点,则替换原有的值,返回true,否则返回false,比较简单,不细究。

            final boolean replace(K key, int hash, V oldValue, V newValue) {
                if (!tryLock())
                    scanAndLock(key, hash);
                boolean replaced = false;
                try {
                    HashEntry<K,V> e;
                    for (e = entryForHash(this, hash); e != null; e = e.next) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            if (oldValue.equals(e.value)) {
                                e.value = newValue;
                                ++modCount;
                                replaced = true;
                            }
                            break;
                        }
                    }
                } finally {
                    unlock();
                }
                return replaced;
            }
    
            final V replace(K key, int hash, V value) {
                if (!tryLock())
                    scanAndLock(key, hash);
                V oldValue = null;
                try {
                    HashEntry<K,V> e;
                    for (e = entryForHash(this, hash); e != null; e = e.next) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            e.value = value;
                            ++modCount;
                            break;
                        }
                    }
                } finally {
                    unlock();
                }
                return oldValue;
            }

    Segment中的clear操作
    Segment中的clear操作不同于其他操作,它直接请求加锁而没有自旋等待的步骤,这可能是因为它需要对整个table做操作,因而需要等到所有在table上的操作的线程退出才能执行,而不象其他操作只是对table中的一条链操作,对一条链操作的线程执行的比较快,因而自旋可以后获得锁的可能性比较大,对table操作的等待相对要比较久,因而自旋等待意义不大。clear操作只是将数组的每个项设置为null,它使用setEntryAt的延迟设置,从而保证其他读线程的正常工作。

            final void clear() {
                lock();
                try {
                    HashEntry<K,V>[] tab = table;
                    for (int i = 0; i < tab.length ; i++)
                        setEntryAt(tab, i, null);
                    ++modCount;
                    count = 0;
                } finally {
                    unlock();
                }
            }
        static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                           HashEntry<K,V> e) {
            UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
        }

    2.2、ConcurrentHashMap中的lock

      ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离(Lock Stripping)技术。它使用了多个锁来控制对hash表的不同部分进行的修改。ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的hash table,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。
    有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。这里“按顺序”是很重要的,否则极有可能出现死锁。
    ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。

    2.3、成员变量

    //map的容量大小
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    
    //因子
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    
    //桶(段)的数量默认值,默认并发级别,就是说你觉得最多有多少线程共同修改这个map,根据这个来确定Segment数组的大小
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    
    //最大容量,1左位移30位,及2的30次方=1073741824
    static final int MAXIMUM_CAPACITY = 1 << 30;
    
    //桶(段)的数量最小值
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
    
    //桶(段)的数量最大值=65536
    static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
    
    //先采用不加锁的方式,连续计算元素的个数,最多计算3次(循环时从-1开始,所以为2时是3次)
    static final int RETRIES_BEFORE_LOCK = 2;
    
    private transient final int hashSeed = randomHashSeed(this);
    //段掩码
    final int segmentMask;
    
    //segmentShift桶(段)偏移量,这segmentMask和segmentShift桶对segment的定位比较相关
    final int segmentShift;
    
    /**
    * The segments, each of which is a specialized hash table.
    */
    final Segment<K,V>[] segments;
    
    transient Set<K> keySet;
    transient Set<Map.Entry<K,V>> entrySet;
    transient Collection<V> values;

    2.3、构造函数

        public ConcurrentHashMap() {
            this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
        }
        
        public ConcurrentHashMap(int initialCapacity) {
            this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
        }
        
        public ConcurrentHashMap(int initialCapacity, float loadFactor) {
            this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
        }
        
        //concurrencyLevel用户估计的并发级别,就是说你觉得最多有多少线程共同修改这个map,根据这个来确定Segment数组的大小
        public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel) {
            if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
                throw new IllegalArgumentException();
            if (concurrencyLevel > MAX_SEGMENTS)//注意: static final int MAX_SEGMENTS = 1 << 16; // slightly conservative   这就是说concurrencyLevel最大值是65535,对应二进制是16位
                concurrencyLevel = MAX_SEGMENTS;
            // Find power-of-two sizes best matching arguments
            int sshift = 0;//表示偏移位数
            int ssize = 1;//表示segments数组的长度
            //从这里可以看出segments数组的长度是由concurrencyLevel决定的,由于要通过按位与的方法来定位segment的位置,
            //那么必须保证segments的大小是2的整数次幂(power-of -two size),concurrencyLevel默认是16,那么sshift默认就是4即左移的次数,ssize为16。默认有16个线程同时修改,最理想情况下每个线程对应一个segment这样就不冲突了。
            //如果指定concurrencyLevel为13,14,ssize都会是16,就是一定要保证大小是不小于concurrencyLevel的最小的2的整数次幂。
            while (ssize < concurrencyLevel) {
                ++sshift;
                ssize <<= 1;//左移4位=16,即sshift=4
            }
            this.segmentShift = 32 - sshift;
            this.segmentMask = ssize - 1;
            if (initialCapacity > MAXIMUM_CAPACITY)
                initialCapacity = MAXIMUM_CAPACITY;
            //initialCapacity是总容量,下面c=总容量/桶(段)数,后面的while循环是让每个桶内的HashEntry的数量cap为2的整数幂,最小值为2。
            int c = initialCapacity / ssize;
            if (c * ssize < initialCapacity)//不满的情况
                ++c;
            int cap = MIN_SEGMENT_TABLE_CAPACITY;
            while (cap < c)
                cap <<= 1;
            // create segments and segments[0]
            Segment<K,V> s0 =
                new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                                 (HashEntry<K,V>[])new HashEntry[cap]);
            Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
            UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
            this.segments = ss;
        }

    hash函数:

        private int hash(Object k) {
            int h = hashSeed;
            if ((0 != h) && (k instanceof String)) {
                return sun.misc.Hashing.stringHash32((String) k);
            }
            h ^= k.hashCode();//首先先得到初始的hash
           //利用h进行再哈希
            // Spread bits to regularize both segment and index locations,
            // using variant of single-word Wang/Jenkins hash.
            h += (h <<  15) ^ 0xffffcd7d;
            h ^= (h >>> 10);
            h += (h <<   3);
            h ^= (h >>>  6);
            h += (h <<   2) + (h << 14);
            return h ^ (h >>> 16);
        }

    那么为什么要这么做呢,就是为了减小冲突,使得元素能够均匀的分布在不同的segment上。

    2.4、增加元素

        public V put(K key, V value) {
            Segment<K,V> s;
            if (value == null)
                throw new NullPointerException();
            int hash = hash(key);
            int j = (hash >>> segmentShift) & segmentMask;//这里就是在找下标
            if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
                 (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
                s = ensureSegment(j);
            return s.put(key, hash, value, false);
        }
        //通过key找到segment在segments中的index,再根据index找到segment
        private Segment<K,V> ensureSegment(int k) {
            final Segment<K,V>[] ss = this.segments;
            long u = (k << SSHIFT) + SBASE; // raw offset
            Segment<K,V> seg;
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
                Segment<K,V> proto = ss[0]; // use segment 0 as prototype
                int cap = proto.table.length;
                float lf = proto.loadFactor;
                int threshold = (int)(cap * lf);
                HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
                if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                    == null) { // recheck
                    Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                    while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                           == null) {
                        if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                            break;
                    }
                }
            }
            return seg;
        }

    2.5、查询元素

        public V get(Object key) {
            Segment<K,V> s; // manually integrate access methods to reduce overhead
            HashEntry<K,V>[] tab;
            int h = hash(key);
            long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
            if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
                (tab = s.table) != null) {
                for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                         (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                     e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                        return e.value;
                }
            }
            return null;
        }


    2.6、size
    先采用不加锁的方式,连续计算元素的个数,最多计算3次:
    1、如果前后两次计算结果相同,则说明计算出来的元素个数是准确的;
    2、如果前后两次计算结果都不同,则在顺序的在所有Segment实例加锁,计算,解锁,然后返回。

        public int size() {
            // Try a few times to get accurate count. On failure due to
            // continuous async changes in table, resort to locking.
            final Segment<K,V>[] segments = this.segments;
            int size;
            boolean overflow; // true if size overflows 32 bits
            long sum;         // sum of modCounts
            long last = 0L;   // previous sum
            int retries = -1; // first iteration isn't retry
            try {
                for (;;) {
                    if (retries++ == RETRIES_BEFORE_LOCK) {
                        for (int j = 0; j < segments.length; ++j)
                            ensureSegment(j).lock(); // force creation
                    }
                    sum = 0L;
                    size = 0;
                    overflow = false;
                    for (int j = 0; j < segments.length; ++j) {
                        Segment<K,V> seg = segmentAt(segments, j);
                        if (seg != null) {
                            sum += seg.modCount;
                            int c = seg.count;
                            if (c < 0 || (size += c) < 0)
                                overflow = true;
                        }
                    }
                    if (sum == last) //如果2次统计的结果相等,将统计结果返回
                        break;
                    last = sum;
                }
            } finally {
                if (retries > RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        segmentAt(segments, j).unlock();
                }
            }
            return overflow ? Integer.MAX_VALUE : size;
        }

    总结:
    ConcurrentHashMap中的get、containsKey、put、putIfAbsent、replace、Remove、clear操作
    由于前面提到Segment中对HashEntry数组以及数组项中的节点链遍历操作是线程安全的,因而get、containsKey操作只需要找到相应的Segment实例,通过Segment实例找到节点链,然后在segment内遍历HashEntry数组即可。
    而对于ConcurrentHashMap中的size、containsValue、contains、isEmpty操作
    因为这些操作需要全局扫瞄整个Map,正常情况下需要先获得所有Segment实例的锁,然后做相应的查找、计算得到结果,再解锁,返回值。然而为了竟可能的减少锁对性能的影响,Doug Lea在这里并没有直接加锁,而是先尝试的遍历查找、计算2遍,如果两遍遍历过程中整个Map没有发生修改(即两次所有Segment实例中modCount值的和一致),则可以认为整个查找、计算过程中Map没有发生改变,我们计算的结果是正确的,否则,在顺序的在所有Segment实例加锁,计算,解锁,然后返回。

    三、JDK或开源框架中使用


    四、示例

  • 相关阅读:
    视频分帧
    windows开启ssh服务
    使用geopy计算经纬度表示的坐标之间的距离
    哔站视频下载
    后缀树(Suffix Tree)
    [回滚莫队] AtCoder 歴史の研究
    [长链剖分优化dp] Codeforces 1499F
    [长链剖分优化dp] BZOJ 3522/4543 Hotel
    长链剖分O(nlogn)-O(1)求K级祖先
    [数论] Codeforces 1499D The Number of Pairs
  • 原文地址:https://www.cnblogs.com/duanxz/p/eclipse.html
Copyright © 2011-2022 走看看