zoukankan      html  css  js  c++  java
  • Java ConcurrentHashMap

     通过分析Hashtable就知道,synchronized是针对整张Hash表的,即每次锁住整张表让线程独占,

    ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不同部分进行的修改。

    ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的hash table,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。
    有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。这里“按顺序”是很重要的,否则极有可能出现死锁,

    在ConcurrentHashMap内部,段数组是final的,并且其成员变量实际上也是final的,但是,仅仅是将数组声明为final的并不保证数组成员也是final的,这需要实现上的保证。这可以确保不会出现死锁,因为获得锁的顺序是固定的。

    ConcurrentHashMap和Hashtable主要区别就是围绕着锁的粒度以及如何锁,可以简单理解成把一个大的HashTable分解成多个,形成了锁分离。

     ConcurrentHashMap中主要实体类就是三个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点)

     

    ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。

    Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。

    一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构,

    一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,

    每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

    Get方法:
    1.为输入的Key做Hash运算,得到hash值。
    2.通过hash值,定位到对应的Segment对象
    3.再次通过hash值,定位到Segment当中数组的具体位置。
    Put方法:
    1.为输入的Key做Hash运算,得到hash值。
    2.通过hash值,定位到对应的Segment对象
    3.获取可重入锁
    4.再次通过hash值,定位到Segment当中数组的具体位置。
    5.插入或覆盖HashEntry对象。
    6.释放锁。


    ConcurrentHashMap的Size方法是一个嵌套循环,大体逻辑如下:
    1.遍历所有的Segment。
    2.把Segment的元素数量累加起来。
    3.把Segment的修改次数累加起来。
    4.计算多次Size,判断所有Segment的总修改次数是否大于上一次的总修改次数。如果大于,说明统计过程中有修改,重新统计,尝试次数+1;如果不是。说明没有修改,统计结束。
    5.如果尝试次数超过阈值,则对每一个Segment加锁,再重新统计。
    6.再次判断所有Segment的总修改次数是否大于上一次的总修改次数。由于已经加锁,次数一定和上次相等。
    7.释放锁,统计结束。

    为了尽量不锁住所有Segment,首先乐观地假设Size过程中不会有修改。当尝试一定次数,才无奈转为悲观锁,锁住所有Segment保证强一致性。

     JDK1.7版本:

     public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel) {
            if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
                throw new IllegalArgumentException();
            if (concurrencyLevel > MAX_SEGMENTS)
                concurrencyLevel = MAX_SEGMENTS;
            // Find power-of-two sizes best matching arguments
            int sshift = 0;
            int ssize = 1;
            while (ssize < concurrencyLevel) {
                ++sshift;
                ssize <<= 1;
            }
            this.segmentShift = 32 - sshift;
            this.segmentMask = ssize - 1;
            if (initialCapacity > MAXIMUM_CAPACITY)
                initialCapacity = MAXIMUM_CAPACITY;
            int c = initialCapacity / ssize;
            if (c * ssize < initialCapacity)
                ++c;
            int cap = MIN_SEGMENT_TABLE_CAPACITY;
            while (cap < c)
                cap <<= 1;
            // create segments and segments[0]
            Segment<K,V> s0 =
                new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                                 (HashEntry<K,V>[])new HashEntry[cap]);
            Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
            UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
            this.segments = ss;
        }

    1.concurrencyLevel,含义,并发级别,并发度,在键不冲突的情况下,最多允许多少个线程同时访问数据不需要阻塞(理想情况下),我们应该知道,ConcurrentHashMap的基本实现原理就是引入Segment数据结构,将锁的粒度细化到Segment,也就是说,如果多个线程,同时操作多个key,如果这些key,分布在不同的Segment,那这些线程的操作互不影响,当然不需要加锁,提高性能。所以concurrencyLevel,就是要求告诉ConcurrentHashMap,我需要这么过个线程同时访问你而不产生锁冲突。
    2.ssize,该变量的值等于ConcurrentHashMap中segment的长度,也就是 Segment[]的长度。该值取决于concurrencyLevel,其实就是小于concurrencyLevel的最大的2的幂,,比如concurrencyLevel=16,那 ssize=16,
    如果 concurrencyLevel=12,ssize=8,因为ssize的长度为2的幂。
    3.shift的值,看出来了没,其实就是 ssize 2 ^ shift,其实就是表示ssize需要的二进制位。
    4.segmentMask、segmentShift ,这两个属性在该表达式中使用:(h >>> segmentShift) & segmentMask),很明显,就是用来算Segment[]数组中的下标来的。意图segmentShift = 32 - sshift,也就是利用hash的高位与代表(ssize-1)来定位下标。// 如果默认,初始容量16,那么ssize=16, sshift=4 定位端 hash 无符号向右移多少28位,(总共32位),那就是使原本32-29位参与运算(高位)
    5.cap,就是每个Segment中HashEntity[]的长度,大于【初始容量/segment长度】的最小2的幂。

    分析到这里,ConcurrentHashMap就构建成功了,我们先重点关注一下Segment的数据结构。
    Segment段的内部数据结构如下:
    1)类的声明:static final class Segment<K,V> extends ReentrantLock implements Serializable
    2)数据结构:
    transient volatile HashEntry<K,V>[] table; // 内部键值对
    transient int count; // 元素数量
    transient int modCount; // 结构发生变化的次数
    transient int threshold; // 扩容时的阔值
    final float loadFactor; // 扩容因子,主要影响threshold,影响什么时候扩容
    对上述结构,是否似曾相识,对了,就是它,HashMap;每个Segment其实就是一个HashMap;还有一个很关键点:Segment继承自ReentrantLock,也就是Segment本身就是一把锁。

    get方法:

     public V get(Object key) {
            Segment<K,V> s; // manually integrate access methods to reduce overhead
            HashEntry<K,V>[] tab;
            int h = hash(key);
            long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
            if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
                (tab = s.table) != null) {
                for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                         (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                     e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                        return e.value;
                }
            }
            return null;
        }

    put方法:

        public V put(K key, V value) {
            Segment<K,V> s;
            if (value == null)
                throw new NullPointerException();
            int hash = hash(key);
            int j = (hash >>> segmentShift) & segmentMask;
            if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
                 (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
                s = ensureSegment(j);
            return s.put(key, hash, value, false);
        }

    containsKey方法:

     public boolean containsKey(Object key) {
            Segment<K,V> s; // same as get() except no need for volatile value read
            HashEntry<K,V>[] tab;
            int h = hash(key);
            long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
            if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
                (tab = s.table) != null) {
                for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                         (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                     e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                        return true;
                }
            }
            return false;
        }

    containsValue方法:

     public boolean containsValue(Object value) {
            // Same idea as size()
            if (value == null)
                throw new NullPointerException();
            final Segment<K,V>[] segments = this.segments;
            boolean found = false;
            long last = 0;
            int retries = -1;
            try {
                outer: for (;;) {
                    if (retries++ == RETRIES_BEFORE_LOCK) {
                        for (int j = 0; j < segments.length; ++j)
                            ensureSegment(j).lock(); // force creation
                    }
                    long hashSum = 0L;
                    int sum = 0;
                    for (int j = 0; j < segments.length; ++j) {
                        HashEntry<K,V>[] tab;
                        Segment<K,V> seg = segmentAt(segments, j);
                        if (seg != null && (tab = seg.table) != null) {
                            for (int i = 0 ; i < tab.length; i++) {
                                HashEntry<K,V> e;
                                for (e = entryAt(tab, i); e != null; e = e.next) {
                                    V v = e.value;
                                    if (v != null && value.equals(v)) {
                                        found = true;
                                        break outer;
                                    }
                                }
                            }
                            sum += seg.modCount;
                        }
                    }
                    if (retries > 0 && sum == last)
                        break;
                    last = sum;
                }
            } finally {
                if (retries > RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        segmentAt(segments, j).unlock();
                }
            }
            return found;
        }

    size方法:

       public int size() {
            // Try a few times to get accurate count. On failure due to
            // continuous async changes in table, resort to locking.
            final Segment<K,V>[] segments = this.segments;
            int size;
            boolean overflow; // true if size overflows 32 bits
            long sum;         // sum of modCounts
            long last = 0L;   // previous sum
            int retries = -1; // first iteration isn't retry
            try {
                for (;;) {
                    if (retries++ == RETRIES_BEFORE_LOCK) {
                        for (int j = 0; j < segments.length; ++j)
                            ensureSegment(j).lock(); // force creation
                    }
                    sum = 0L;
                    size = 0;
                    overflow = false;
                    for (int j = 0; j < segments.length; ++j) {
                        Segment<K,V> seg = segmentAt(segments, j);
                        if (seg != null) {
                            sum += seg.modCount;
                            int c = seg.count;
                            if (c < 0 || (size += c) < 0)
                                overflow = true;
                        }
                    }
                    if (sum == last)
                        break;
                    last = sum;
                }
            } finally {
                if (retries > RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        segmentAt(segments, j).unlock();
                }
            }
            return overflow ? Integer.MAX_VALUE : size;
        }

    Segment类:

       static final class Segment<K,V> extends ReentrantLock implements Serializable {
            /*
             * Segments maintain a table of entry lists that are always
             * kept in a consistent state, so can be read (via volatile
             * reads of segments and tables) without locking.  This
             * requires replicating nodes when necessary during table
             * resizing, so the old lists can be traversed by readers
             * still using old version of table.
             *
             * This class defines only mutative methods requiring locking.
             * Except as noted, the methods of this class perform the
             * per-segment versions of ConcurrentHashMap methods.  (Other
             * methods are integrated directly into ConcurrentHashMap
             * methods.) These mutative methods use a form of controlled
             * spinning on contention via methods scanAndLock and
             * scanAndLockForPut. These intersperse tryLocks with
             * traversals to locate nodes.  The main benefit is to absorb
             * cache misses (which are very common for hash tables) while
             * obtaining locks so that traversal is faster once
             * acquired. We do not actually use the found nodes since they
             * must be re-acquired under lock anyway to ensure sequential
             * consistency of updates (and in any case may be undetectably
             * stale), but they will normally be much faster to re-locate.
             * Also, scanAndLockForPut speculatively creates a fresh node
             * to use in put if no node is found.
             */
    
            private static final long serialVersionUID = 2249069246763182397L;
    
            /**
             * The maximum number of times to tryLock in a prescan before
             * possibly blocking on acquire in preparation for a locked
             * segment operation. On multiprocessors, using a bounded
             * number of retries maintains cache acquired while locating
             * nodes.
             */
            static final int MAX_SCAN_RETRIES =
                Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
    
            /**
             * The per-segment table. Elements are accessed via
             * entryAt/setEntryAt providing volatile semantics.
             */
            transient volatile HashEntry<K,V>[] table;
    
            /**
             * The number of elements. Accessed only either within locks
             * or among other volatile reads that maintain visibility.
             */
            transient int count;
    
            /**
             * The total number of mutative operations in this segment.
             * Even though this may overflows 32 bits, it provides
             * sufficient accuracy for stability checks in CHM isEmpty()
             * and size() methods.  Accessed only either within locks or
             * among other volatile reads that maintain visibility.
             */
            transient int modCount;
    
            /**
             * The table is rehashed when its size exceeds this threshold.
             * (The value of this field is always <tt>(int)(capacity *
             * loadFactor)</tt>.)
             */
            transient int threshold;
    
            /**
             * The load factor for the hash table.  Even though this value
             * is same for all segments, it is replicated to avoid needing
             * links to outer object.
             * @serial
             */
            final float loadFactor;
    
            Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
                this.loadFactor = lf;
                this.threshold = threshold;
                this.table = tab;
            }
    
            final V put(K key, int hash, V value, boolean onlyIfAbsent) {
                HashEntry<K,V> node = tryLock() ? null :
                    scanAndLockForPut(key, hash, value);
                V oldValue;
                try {
                    HashEntry<K,V>[] tab = table;
                    int index = (tab.length - 1) & hash;
                    HashEntry<K,V> first = entryAt(tab, index);
                    for (HashEntry<K,V> e = first;;) {
                        if (e != null) {
                            K k;
                            if ((k = e.key) == key ||
                                (e.hash == hash && key.equals(k))) {
                                oldValue = e.value;
                                if (!onlyIfAbsent) {
                                    e.value = value;
                                    ++modCount;
                                }
                                break;
                            }
                            e = e.next;
                        }
                        else {
                            if (node != null)
                                node.setNext(first);
                            else
                                node = new HashEntry<K,V>(hash, key, value, first);
                            int c = count + 1;
                            if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                                rehash(node);
                            else
                                setEntryAt(tab, index, node);
                            ++modCount;
                            count = c;
                            oldValue = null;
                            break;
                        }
                    }
                } finally {
                    unlock();
                }
                return oldValue;
            }
    
            /**
             * Doubles size of table and repacks entries, also adding the
             * given node to new table
             */
            @SuppressWarnings("unchecked")
            private void rehash(HashEntry<K,V> node) {
                /*
                 * Reclassify nodes in each list to new table.  Because we
                 * are using power-of-two expansion, the elements from
                 * each bin must either stay at same index, or move with a
                 * power of two offset. We eliminate unnecessary node
                 * creation by catching cases where old nodes can be
                 * reused because their next fields won't change.
                 * Statistically, at the default threshold, only about
                 * one-sixth of them need cloning when a table
                 * doubles. The nodes they replace will be garbage
                 * collectable as soon as they are no longer referenced by
                 * any reader thread that may be in the midst of
                 * concurrently traversing table. Entry accesses use plain
                 * array indexing because they are followed by volatile
                 * table write.
                 */
                HashEntry<K,V>[] oldTable = table;
                int oldCapacity = oldTable.length;
                int newCapacity = oldCapacity << 1;
                threshold = (int)(newCapacity * loadFactor);
                HashEntry<K,V>[] newTable =
                    (HashEntry<K,V>[]) new HashEntry[newCapacity];
                int sizeMask = newCapacity - 1;
                for (int i = 0; i < oldCapacity ; i++) {
                    HashEntry<K,V> e = oldTable[i];
                    if (e != null) {
                        HashEntry<K,V> next = e.next;
                        int idx = e.hash & sizeMask;
                        if (next == null)   //  Single node on list
                            newTable[idx] = e;
                        else { // Reuse consecutive sequence at same slot
                            HashEntry<K,V> lastRun = e;
                            int lastIdx = idx;
                            for (HashEntry<K,V> last = next;
                                 last != null;
                                 last = last.next) {
                                int k = last.hash & sizeMask;
                                if (k != lastIdx) {
                                    lastIdx = k;
                                    lastRun = last;
                                }
                            }
                            newTable[lastIdx] = lastRun;
                            // Clone remaining nodes
                            for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                                V v = p.value;
                                int h = p.hash;
                                int k = h & sizeMask;
                                HashEntry<K,V> n = newTable[k];
                                newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                            }
                        }
                    }
                }
                int nodeIndex = node.hash & sizeMask; // add the new node
                node.setNext(newTable[nodeIndex]);
                newTable[nodeIndex] = node;
                table = newTable;
            }
    
            /**
             * Scans for a node containing given key while trying to
             * acquire lock, creating and returning one if not found. Upon
             * return, guarantees that lock is held. UNlike in most
             * methods, calls to method equals are not screened: Since
             * traversal speed doesn't matter, we might as well help warm
             * up the associated code and accesses as well.
             *
             * @return a new node if key not found, else null
             */
            private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
                HashEntry<K,V> first = entryForHash(this, hash);
                HashEntry<K,V> e = first;
                HashEntry<K,V> node = null;
                int retries = -1; // negative while locating node
                while (!tryLock()) {
                    HashEntry<K,V> f; // to recheck first below
                    if (retries < 0) {
                        if (e == null) {
                            if (node == null) // speculatively create node
                                node = new HashEntry<K,V>(hash, key, value, null);
                            retries = 0;
                        }
                        else if (key.equals(e.key))
                            retries = 0;
                        else
                            e = e.next;
                    }
                    else if (++retries > MAX_SCAN_RETRIES) {
                        lock();
                        break;
                    }
                    else if ((retries & 1) == 0 &&
                             (f = entryForHash(this, hash)) != first) {
                        e = first = f; // re-traverse if entry changed
                        retries = -1;
                    }
                }
                return node;
            }
    
            /**
             * Scans for a node containing the given key while trying to
             * acquire lock for a remove or replace operation. Upon
             * return, guarantees that lock is held.  Note that we must
             * lock even if the key is not found, to ensure sequential
             * consistency of updates.
             */
            private void scanAndLock(Object key, int hash) {
                // similar to but simpler than scanAndLockForPut
                HashEntry<K,V> first = entryForHash(this, hash);
                HashEntry<K,V> e = first;
                int retries = -1;
                while (!tryLock()) {
                    HashEntry<K,V> f;
                    if (retries < 0) {
                        if (e == null || key.equals(e.key))
                            retries = 0;
                        else
                            e = e.next;
                    }
                    else if (++retries > MAX_SCAN_RETRIES) {
                        lock();
                        break;
                    }
                    else if ((retries & 1) == 0 &&
                             (f = entryForHash(this, hash)) != first) {
                        e = first = f;
                        retries = -1;
                    }
                }
            }
    
            /**
             * Remove; match on key only if value null, else match both.
             */
            final V remove(Object key, int hash, Object value) {
                if (!tryLock())
                    scanAndLock(key, hash);
                V oldValue = null;
                try {
                    HashEntry<K,V>[] tab = table;
                    int index = (tab.length - 1) & hash;
                    HashEntry<K,V> e = entryAt(tab, index);
                    HashEntry<K,V> pred = null;
                    while (e != null) {
                        K k;
                        HashEntry<K,V> next = e.next;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            V v = e.value;
                            if (value == null || value == v || value.equals(v)) {
                                if (pred == null)
                                    setEntryAt(tab, index, next);
                                else
                                    pred.setNext(next);
                                ++modCount;
                                --count;
                                oldValue = v;
                            }
                            break;
                        }
                        pred = e;
                        e = next;
                    }
                } finally {
                    unlock();
                }
                return oldValue;
            }
    
            final boolean replace(K key, int hash, V oldValue, V newValue) {
                if (!tryLock())
                    scanAndLock(key, hash);
                boolean replaced = false;
                try {
                    HashEntry<K,V> e;
                    for (e = entryForHash(this, hash); e != null; e = e.next) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            if (oldValue.equals(e.value)) {
                                e.value = newValue;
                                ++modCount;
                                replaced = true;
                            }
                            break;
                        }
                    }
                } finally {
                    unlock();
                }
                return replaced;
            }
    
            final V replace(K key, int hash, V value) {
                if (!tryLock())
                    scanAndLock(key, hash);
                V oldValue = null;
                try {
                    HashEntry<K,V> e;
                    for (e = entryForHash(this, hash); e != null; e = e.next) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            e.value = value;
                            ++modCount;
                            break;
                        }
                    }
                } finally {
                    unlock();
                }
                return oldValue;
            }
    
            final void clear() {
                lock();
                try {
                    HashEntry<K,V>[] tab = table;
                    for (int i = 0; i < tab.length ; i++)
                        setEntryAt(tab, i, null);
                    ++modCount;
                    count = 0;
                } finally {
                    unlock();
                }
            }
        }
    View Code

    ConcurrentHashMap中的可以同时多个get,一个put,在get的时候可能会与put的有冲突,在put赋值的时候,value可能为null,也可能读取到修改前,或后的值

    如果读取到null,就进行加锁重新读取(readValueUnderLock),读取到修改前后的值是可以允许的。

    http://www.jasongj.com/java/concurrenthashmap/

    http://blog.csdn.net/prestigeding/article/details/53391264

    http://www.infoq.com/cn/articles/ConcurrentHashMap/

    http://www.cnblogs.com/ITtangtang/p/3948786.html

    http://blog.csdn.net/xuefeng0707/article/details/40834595

    http://mp.weixin.qq.com/s?__biz=MzIxMjE5MTE1Nw==&mid=2653192083&idx=1&sn=5c4becd5724dd72ad489b9ed466329f5&chksm=8c990d49bbee845f69345e4121888ec967df27988bc66afd984a25331d2f6464a61dc0335a54&scene=21#wechat_redirect

  • 相关阅读:
    Jenkins Install
    提高C#代码质量的22条准则
    游戏程序员英文指南
    苹果设备内存指南
    Unity符号表
    UI优化策略-UI性能优化技巧
    C# 语言历史版本特性
    CPU SIMD介绍
    Unity渲染性能指标
    关于JMeter线程组中线程数,Ramp-Up Period,循环次数之间的设置概念
  • 原文地址:https://www.cnblogs.com/hongdada/p/6041207.html
Copyright © 2011-2022 走看看