zoukankan      html  css  js  c++  java
  • Java并发容器——ConcurrentSkipListMap和ConcurrentHashMap

    附录:

    https://www.cnblogs.com/ygj0930/p/6543901.html 

         一:ConcurrentSkipListMap

              TreeMap使用红黑树按照key的顺序(自然顺序、自定义顺序)来使得键值对有序存储但是只能在单线程下安全使用;多线程下想要使键值对按照key的顺序来存储,则需要使用ConcurrentSkipListMap。 

               ConcurrentSkipListMap的底层是通过跳表来实现的。跳表是一个链表,但是通过使用“跳跃式”查找的方式使得插入、读取数据时复杂度变成了O(logn)。

               跳表(SkipList):使用“空间换时间”的算法,令链表的每个结点不仅记录next结点位置,还可以按照level层级分别记录后继第level个结点。在查找时,首先按照层级查找,比如:当前跳表最高层级为3,即每个结点中不仅记录了next结点(层级1),还记录了next的next(层级2)、next的next的next(层级3)结点。现在查找一个结点,则从头结点开始先按高层级开始查:head->head的next的next的next->。。。直到找到结点或者当前结点q的值大于所查结点,则此时当前查找层级的q的前一节点p开始,在p~q之间进行下一层级(隔1个结点)的查找......直到最终迫近、找到结点。此法使用的就是“先大步查找确定范围,再逐渐缩小迫近”的思想进行的查找。

               例如:有当前的跳表存储如下:有4个层级,层级1为最下面的level,是一个包含了所有结点的普通链表。往上数就是2,3,4层级。

               (注:图来自 http://blog.csdn.net/sunxianghuang/article/details/52221913,如有冒犯,请见谅)

               现在,我们查找结点值为19的结点:

               明白了查找的原理后,插入、删除就容易理解了。为了保存跳表的有序性,所以分三步:查找合适位置——进行插入/删除——更新跳表指针,维护层级性。

        插入结点:

        删除结点:

         知道了底层所用数据结构的原理后,我们来看看concurrentskiplistmap的部分源码:

         插入:

    private V doPut(K kkey, V value, boolean onlyIfAbsent) {
        Comparable<? super K> key = comparable(kkey);
        for (;;) {
            // 找到key的前继节点
            Node<K,V> b = findPredecessor(key);
            // 设置n为“key的前继节点的后继节点”,即n应该是“插入节点”的“后继节点”
            Node<K,V> n = b.next;
            for (;;) {
                if (n != null) {
                    Node<K,V> f = n.next;
                    // 如果两次获得的b.next不是相同的Node,就跳转到”外层for循环“,重新获得b和n后再遍历。
                    if (n != b.next)
                        break;
                    // v是“n的值”
                    Object v = n.value;
                    // 当n的值为null(意味着其它线程删除了n);此时删除b的下一个节点,然后跳转到”外层for循环“,重新获得b和n后再遍历。
                    if (v == null) {               // n is deleted
                        n.helpDelete(b, f);
                        break;
                    }
                    // 如果其它线程删除了b;则跳转到”外层for循环“,重新获得b和n后再遍历。
                    if (v == n || b.value == null) // b is deleted
                        break;
                    // 比较key和n.key
                    int c = key.compareTo(n.key);
                    if (c > 0) {
                        b = n;
                        n = f;
                        continue;
                    }
                    if (c == 0) {
                        if (onlyIfAbsent || n.casValue(v, value))
                            return (V)v;
                        else
                            break; // restart if lost race to replace value
                    }
                    // else c < 0; fall through
                }
    
                // 新建节点(对应是“要插入的键值对”)
                Node<K,V> z = new Node<K,V>(kkey, value, n);
                // 设置“b的后继节点”为z
                if (!b.casNext(n, z))
                    break;         // 多线程情况下,break才可能发生(其它线程对b进行了操作)
                // 随机获取一个level
                // 然后在“第1层”到“第level层”的链表中都插入新建节点
                int level = randomLevel();
                if (level > 0)
                    insertIndex(z, level);
                return null;
            }
        }
    }
    

    删除:

    final V doRemove(Object okey, Object value) {
        Comparable<? super K> key = comparable(okey);
        for (;;) {
            // 找到“key的前继节点”
            Node<K,V> b = findPredecessor(key);
            // 设置n为“b的后继节点”(即若key存在于“跳表中”,n就是key对应的节点)
            Node<K,V> n = b.next;
            for (;;) {
                if (n == null)
                    return null;
                // f是“当前节点n的后继节点”
                Node<K,V> f = n.next;
                // 如果两次读取到的“b的后继节点”不同(其它线程操作了该跳表),则返回到“外层for循环”重新遍历。
                if (n != b.next)                    // inconsistent read
                    break;
                // 如果“当前节点n的值”变为null(其它线程操作了该跳表),则返回到“外层for循环”重新遍历。
                Object v = n.value;
                if (v == null) {                    // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                // 如果“前继节点b”被删除(其它线程操作了该跳表),则返回到“外层for循环”重新遍历。
                if (v == n || b.value == null)      // b is deleted
                    break;
                int c = key.compareTo(n.key);
                if (c < 0)
                    return null;
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }
    
                // 以下是c=0的情况
                if (value != null && !value.equals(v))
                    return null;
                // 设置“当前节点n”的值为null
                if (!n.casValue(v, null))
                    break;
                // 设置“b的后继节点”为f
                if (!n.appendMarker(f) || !b.casNext(n, f))
                    findNode(key);                  // Retry via findNode
                else {
                    // 清除“跳表”中每一层的key节点
                    findPredecessor(key);           // Clean index
                    // 如果“表头的右索引为空”,则将“跳表的层次”-1。
                    if (head.right == null)
                        tryReduceLevel();
                }
                return (V)v;
            }
        }
    }
    

        查找:

    private Node<K,V> findNode(Comparable<? super K> key) {
        for (;;) {
            // 找到key的前继节点
            Node<K,V> b = findPredecessor(key);
            // 设置n为“b的后继节点”(即若key存在于“跳表中”,n就是key对应的节点)
            Node<K,V> n = b.next;
            for (;;) {
                // 如果“n为null”,则跳转中不存在key对应的节点,直接返回null。
                if (n == null)
                    return null;
                Node<K,V> f = n.next;
                // 如果两次读取到的“b的后继节点”不同(其它线程操作了该跳表),则返回到“外层for循环”重新遍历。
                if (n != b.next)                // inconsistent read
                    break;
                Object v = n.value;
                // 如果“当前节点n的值”变为null(其它线程操作了该跳表),则返回到“外层for循环”重新遍历。
                if (v == null) {                // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (v == n || b.value == null)  // b is deleted
                    break;
                // 若n是当前节点,则返回n。
                int c = key.compareTo(n.key);
                if (c == 0)
                    return n;
                // 若“节点n的key”小于“key”,则说明跳表中不存在key对应的节点,返回null
                if (c < 0)
                    return null;
                // 若“节点n的key”大于“key”,则更新b和n,继续查找。
                b = n;
                n = f;
            }
        }
    }
    

      

    通过上面的源码可以发现:ConcurrentSkipListMap线程安全的原理与非阻塞队列ConcurrentBlockingQueue的原理一样:利用底层的插入、删除的CAS原子性操作,通过死循环不断获取最新的结点指针来保证不会出现竞态条件。

        二:ConcurrentHashMap【本文concurrentHashMap是jdk1.7中的实现,jdk1.8中使用的不是Segment,特此说明】

            快速存取<Key, Value>键值对使用HashMap;多线程并发存取<Key, Value>键值对使用ConcurrentHashMap;

            我们知道,HashTable和和Collections类中提供的同步HashTable是线程安全的,但是他们线程安全是通过在进行读写操作时对整个map加锁来实现的,故此性能比较低。那既然是由于锁粒度(加锁的范围叫锁粒度)太大造成的性能低下,可不可以从锁粒度着手去改良呢?由此,就引申出了ConcurrentHashMap。

            ConcurrentHashMap采取了“锁分段”技术来细化锁的粒度:把整个map划分为一系列被成为segment的组成单元,一个segment相当于一个小的hashtable。这样,加锁的对象就从整个map变成了一个更小的范围——一个segment。ConcurrentHashMap线程安全并且提高性能原因就在于:对map中的读是并发的,无需加锁;只有在put、remove操作时才加锁,而加锁仅是对需要操作的segment加锁,不会影响其他segment的读写,由此,不同的segment之间可以并发使用,极大地提高了性能。

            1:结构分析

       Segment的结构:

    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        transient volatile int count;
        transient int modCount;
        transient int threshold;
        transient volatile HashEntry<K,V>[] table;
        final float loadFactor;
    }
    
    • count:Segment中元素的数量,用于map.size()时统计整个map的大小使用
    • modCount:对table的大小造成影响的操作的数量(比如put或者remove操作),用于统计size时验证结果的正确性
    • threshold:阈值,Segment里面元素的数量超过这个值依旧就会对Segment进行扩容,concurrenthashmap自身不会扩容(segment的数量在map创建后不会再增加,在容量不足时只会增加segment的容量
    • table:链表数组,数组中的每一个元素代表了一个链表的头部,一个链表用于存储相同hash值的不同元素们
    • loadFactor:负载因子,用于确定threshold,决定扩容的时机

       

        2:查询

    static final class HashEntry<K,V> {
        final K key;
        final int hash;
        volatile V value;
        final HashEntry<K,V> next;
    }
    
    final Segment<K,V> segmentFor(int hash) {
        return segments[(hash >>> segmentShift) & segmentMask];
    }
    
    V get(Object key, int hash) {
        if (count != 0) { // read-volatile
            HashEntry<K,V> e = getFirst(hash);
            while (e != null) {
                if (e.hash == hash && key.equals(e.key)) {
                    V v = e.value;
                    if (v != null)
                        return v;
                    return readValueUnderLock(e); // recheck
                }
                e = e.next;
            }
        }
        return null;
    }
    
    HashEntry<K,V> getFirst(int hash) {
        HashEntry<K,V>[] tab = table;
        return tab[hash & (tab.length - 1)];
    }
    

        由上面可以看到:concurrenthashmap的查询操作经过三步:第一次hash确定key在哪个segment中;第二次hash在segment中确定key在链表数组的哪个链表中;第三步遍历这个链表,调用equals()进行比对,找到与所查找key相等的结点并读取。

        3:插入

    V put(K key, int hash, V value, boolean onlyIfAbsent) {
        lock();
        try {
            int c = count;
            if (c++ > threshold) // ensure capacity
                rehash();
            HashEntry<K,V>[] tab = table;
            int index = hash & (tab.length - 1);
            HashEntry<K,V> first = tab[index];
            HashEntry<K,V> e = first;
            while (e != null && (e.hash != hash || !key.equals(e.key)))
                e = e.next;
      
            V oldValue;
            if (e != null) {
                oldValue = e.value;
                if (!onlyIfAbsent)
                    e.value = value;
            }
            else {
                oldValue = null;
                ++modCount;
                tab[index] = new HashEntry<K,V>(key, hash, first, value);
                count = c; // write-volatile
            }
            return oldValue;
        } finally {
            unlock();
        }
    }
    

           插入过程也分三步:首先由key值经过hash计算得到是哪个segment,如果segment大小以及到达阀值则扩容;然后再次hash确定key所在链表的数组下标,获取链表头;最后遍历链表,如果找到相同的key的结点则更新value值,如果没有则插入新结点;

        4:删除

           segment的链表数组中的链表结构如下:

    static final class HashEntry<K,V> {
        final K key;
        final int hash;
        volatile V value;
        final HashEntry<K,V> next;
    }
    

          我们可以看到,链表中结点只有value是可修改的,因此,如果我们需要删除结点时,是不能简单地由前继结点指向被删结点的后继结点来实现。所以,我们只能重构链表。

    V remove(Object key, int hash, Object value) {
        lock();
        try {
            int c = count - 1;
            HashEntry<K,V>[] tab = table;
            int index = hash & (tab.length - 1);
            HashEntry<K,V> first = tab[index];
            HashEntry<K,V> e = first;
            while (e != null && (e.hash != hash || !key.equals(e.key)))
                e = e.next;
      
            V oldValue = null;
            if (e != null) {
                V v = e.value;
                if (value == null || value.equals(v)) {
                    oldValue = v;
                    // All entries following removed node can stay
                    // in list, but all preceding ones need to be
                    // cloned.
                    ++modCount;
                    HashEntry<K,V> newFirst = e.next;
                    for (HashEntry<K,V> p = first; p != e; p = p.next)
                        newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                      newFirst, p.value);
                    tab[index] = newFirst;
                    count = c; // write-volatile
                }
            }
            return oldValue;
        } finally {
            unlock();
        }
    }
    

           删除过程:首先由key经过hash确定所在segment;然后再hash确定具体的数组下标,获得链表头;最后遍历链表,找到被删除结点后,以被删除结点的next结点开始建立新的链表,然后再把原链表头直到被删结点的前继结点依次复制、插入新链表,最后把新链表头设置为当前数组下标元素取代旧链表。

        5:统计大小—Size()

           统计整个map的大小时,如果在统计过程中把整个map锁住,则会造成影响读写。ConcurrentHashMap通过采用segment中的属性成员来优化这个过程。

     
    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        transient volatile int count;
        transient int modCount;
       ....
    }

           我们看到,每个segment中有一个count记录当前segment的元素数量,每当put/remove成功就会把这个值+1/-1。因此,在统计map的大小时,我们把每个segment的count加起来就是了。但是,如果在加的过程中,发生了修改怎么办呢?比如:把segment[2]的count加到total后,segment[2]发生了remove操作,这样就会造成统计结果不正确。此时就需要用modCount,modCount记录了segment的修改次数,这个值只增不减,无论是插入、删除都会导致该值+1.

           ConcurrentHashMap在统计size时,经历了两次遍历:第一次不加锁地遍历所以segment,统计count和modCount的总和得到C1和M1;然后再次不加锁地遍历,得到C2和M2,比较M1和M2,如果修改次数没有发生变化则说明两次遍历期间map没有发生数量变化,那么C1就是可用的。如果M1不等于M2,则说明在统计过程中map的数量发生了变化,此时才采取最终手段——锁住整个map进行统计。

  • 相关阅读:
    对MVC HtmlHepler控件扩展(转载)
    通过源代码研究ASP.NET MVC中的Controller和View(一)(转载)
    rasmol使用方法
    经典笑话
    直线回归的概念
    众数
    Python ImportError: No module named Tkinter
    小干扰RNA
    complementary DNA, cDNA
    反义RNA
  • 原文地址:https://www.cnblogs.com/williamjie/p/11089593.html
Copyright © 2011-2022 走看看