zoukankan      html  css  js  c++  java
  • 并发容器-ConcurrentHashMap,CopyOnWriteArrayList

    ConcurrentHashMap
    HashMap是线程非安全的,在多线程环境下,采用的是Fail-Fast快速失败机制,即当A线程在访问容器的时候,如果此时B线程修改了HashMap的结构,那么就会抛出并发修改异常。且当A线程添加一个Entry的时候,它会首先获得头节点,如果此时B线程也要添加一个Entry的时候,它会获得同样的头节点,那么当A线程添加完newEntry之后,B线程实际上会将A的覆盖掉。
    HashTable是一个线程安全的类,它使用synchronized来锁住整张Hash表来实现线程安全,即每次锁住整张表让线程独占。
    Collections.synchronizedMap()是对方法加锁
    public V get(Object key) {
                synchronized (mutex) {return m.get(key);}
            }
    能保证线程安全性,却影响并发条件下的性能,因为必须当A线程释放锁之后,B线程才能够获得锁并对map进行操作。
     
    那么要如何保证在多线程环境下的线程安全性并保证性能呢?使用ConcurrentHashMap
    ConcurrentHashMap采用的是分段锁技术,它的底层是一个Segment[]的数组(数组默认大小为16,即并发数为16),每个segment称为一个段,在每一段上又是一个类似于HashMap的数据结构,每个segment上都有一个锁。
     
    ConcurrentHashMap允许多个修改操作并发执行,只要多个修改操作发生在不同的段上。有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。

    ConcurrentHashMap给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。如下图是ConcurrentHashMap的内部结构图:

    每个Segment都继承了ReentrantLock

    static class Segment<K,V> extends ReentrantLock 

     

    如何在ConcurrentHashMap中put一个Entry呢?

    public V put(K key, V value) {
          Segment<K,V> s;
          if (value == null)      //判断value是否为null,如果为null直接抛出空指针异常
              thrownew NullPointerException();
          int hash = hash(key);   //第一次计算hash值
          int j = (hash >>> segmentShift) & segmentMask;    //第二次计算hash值,这个值确定Segment的索引
          if ((s = (Segment<K,V>)UNSAFE.getObject          //获得Segment对象
               (segments, (j << SSHIFT) + SBASE)) == null) 
              s = ensureSegment(j);                         //采用的是延迟初始化机制return s.put(key, hash, value, false);            //真正的put,put操作是需要加锁的
      }
    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
                HashEntry<K,V> node = tryLock() ? null :
                    scanAndLockForPut(key, hash, value);
                V oldValue;
                try {
                    HashEntry<K,V>[] tab = table;
                    int index = (tab.length - 1) & hash;   //第三次hash操作,获得table中的具体index
                    HashEntry<K,V> first = entryAt(tab, index);
                    for (HashEntry<K,V> e = first;;) {
                        if (e != null) {
                            K k;
                            if ((k = e.key) == key ||
                                (e.hash == hash && key.equals(k))) {
                                oldValue = e.value;
                                if (!onlyIfAbsent) {
                                    e.value = value;
                                    ++modCount;
                                }
                                break;
                            }
                            e = e.next;
                        }
                        else {
                            if (node != null)
                                node.setNext(first);
                            else
                                node = new HashEntry<K,V>(hash, key, value, first);
                            int c = count + 1;
                            if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                                rehash(node);
                            else
                                setEntryAt(tab, index, node);
                            ++modCount;
                            count = c;
                            oldValue = null;
                            break;
                        }
                    }
                } finally {
                    unlock();
                }
                return oldValue;
            }

     

    如何从ConcurrentHashMap中根据key获取value呢?

    public V get(Object key) {
            Segment<K,V> s; // manually integrate access methods to reduce overhead
            HashEntry<K,V>[] tab;
            int h = hash(key);
            long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
            if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
                (tab = s.table) != null) {
                for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                         (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                     e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                        return e.value;
                }
            }
            return null;
        }

    值得注意的是,get操作是不需要加锁的,而是通过Unsafe对象的getObjectVolatile()方法提供的原子读语义,来获得Segment以及对应的链表,然后对链表遍历判断是否存在key相同的节点以及获得该节点的value。但由于遍历过程中其他线程可能对链表结构做了调整,因此get和containsKey返回的可能是过时的数据,这一点是ConcurrentHashMap在弱一致性上的体现。

    size操作需要遍历所有的Segment才能算出整个Map的大小。假设我们当前遍历的Segment为S1,那么在遍历S1过程中其他的Segment比如S2可能会被修改,于是这一次运算出来的size值可能并不是Map当前的真正大小。所以一个比较简单的办法就是计算Map大小的时候所有的Segment都Lock住,不能更新(包含put,remove等等)数据,计算完之后再Unlock。这是普通人能够想到的方案,但是牛逼的作者还有一个更好的Idea:先给3次机会,不lock所有的Segment,遍历所有Segment,累加各个Segment的大小得到整个Map的大小,如果某相邻的两次计算获取的所有Segment的更新的次数(每个Segment都有一个modCount变量,这个变量在Segment中的Entry被修改时会加一,通过这个值可以得到每个Segment的更新操作的次数)是一样的,说明计算过程中没有更新操作,则直接返回这个值。如果这三次不加锁的计算过程中Map的更新次数有变化,则之后的计算先对所有的Segment加锁,再遍历所有Segment计算Map大小,最后再解锁所有Segment。源代码如下:

    public int size() {
            // Try a few times to get accurate count. On failure due to
            // continuous async changes in table, resort to locking.
            final Segment<K,V>[] segments = this.segments;
            int size;
            boolean overflow; 
            long sum;         // 总的修改次数
            long last = 0L;   // 前一次的修改次数
            int retries = -1; 
            try {
                for (;;) {
                    if (retries++ == RETRIES_BEFORE_LOCK) {
                        for (int j = 0; j < segments.length; ++j)
                            ensureSegment(j).lock(); // 如果三次还不行,则需要强制给所有segment加锁
                    }
                    sum = 0L;
                    size = 0;
                    overflow = false;
                    for (int j = 0; j < segments.length; ++j) {
                        Segment<K,V> seg = segmentAt(segments, j);
                        if (seg != null) {
                            sum += seg.modCount;
                            int c = seg.count;
                            if (c < 0 || (size += c) < 0)
                                overflow = true;
                        }
                    }
                    if (sum == last)
                        break;
                    last = sum;
                }
            } finally {
                if (retries > RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        segmentAt(segments, j).unlock();
                }
            }
            return overflow ? Integer.MAX_VALUE : size;
        }

     

    containsValue操作采用了和size操作一样的想法:

    public boolean containsValue(Object value) {
            // Same idea as size()
            if (value == null)
                throw new NullPointerException();
            final Segment<K,V>[] segments = this.segments;
            boolean found = false;
            long last = 0;
            int retries = -1;
            try {
                outer: for (;;) {
                    if (retries++ == RETRIES_BEFORE_LOCK) {
                        for (int j = 0; j < segments.length; ++j)
                            ensureSegment(j).lock(); // force creation
                    }
                    long hashSum = 0L;
                    int sum = 0;
                    for (int j = 0; j < segments.length; ++j) {
                        HashEntry<K,V>[] tab;
                        Segment<K,V> seg = segmentAt(segments, j);
                        if (seg != null && (tab = seg.table) != null) {
                            for (int i = 0 ; i < tab.length; i++) {
                                HashEntry<K,V> e;
                                for (e = entryAt(tab, i); e != null; e = e.next) {
                                    V v = e.value;
                                    if (v != null && value.equals(v)) {
                                        found = true;
                                        break outer;
                                    }
                                }
                            }
                            sum += seg.modCount;
                        }
                    }
                    if (retries > 0 && sum == last)
                        break;
                    last = sum;
                }
            } finally {
                if (retries > RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        segmentAt(segments, j).unlock();
                }
            }
            return found;
        }

     

     

     
    ArrayList是非线程安全的,Vector是线程安全的,因为它的方法都加了synchronized关键字,但是Vector效率很低。
        public synchronized int size() {
            return elementCount;
        }
     
    CopyOnWriteArrayList:写时复制ArrayList,在读的时候不加锁,在写的时候,首先会对原数组a进行拷贝a1,修改操作都是在a1上进行的,当修改完成之后,再将a指针指向拷贝的数组。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
     
    向List中添加一个元素,在添加的时候是需要加锁的,否则多线程写的时候会Copy出N个副本出来。
    /**
         * Appends the specified element to the end of this list.
         */
        public boolean add(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                Object[] elements = getArray();
                int len = elements.length;
                Object[] newElements = Arrays.copyOf(elements, len + 1);
                newElements[len] = e;
                setArray(newElements);
                return true;
            finally {
                lock.unlock();
            }
        }
     
     
    读的时候不需要加锁,如果读的时候有多个线程正在向CopyOnWriteArrayList添加数据,读还是会读到旧的数据,因为写的时候不会锁住旧的List。
    public E get(int index) {
        return get(getArray(), index);
    }
     
     
    CopyOnWriteArrayList用于读多写少的并发场景,它只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。
     
     
     
     
  • 相关阅读:
    echarts雷达图点击事件 包含(2.x,3.85,4.02)测试
    字体图标制作
    通过php动态传数据到highcharts
    smartgit试用到期不用序列号怎么继续使用
    项目的ip地址更改,用git从远程提取代码出现错误,提示为 network error connection timed out
    OC学习4——OC新特性之块(Block)
    OC学习3——C语言特性之指针
    OC学习2——C语言特性之函数
    OC学习1——基本数据类型
    JVM 内存的那些事
  • 原文地址:https://www.cnblogs.com/james111/p/6995756.html
Copyright © 2011-2022 走看看