zoukankan      html  css  js  c++  java
  • 浅谈ConcurrentHashMap实现原理

    我们都知道HashTable是线程安全的类,因为使用了Synchronized来锁整张Hash表来实现线程安全,让线程独占;

    ConcurrentHashMap的锁分离技术就是用多个锁来控制对Hash表的不同部分进行修改,因为我可能只需要对一小块部分进行操作,而如果锁整张表开销太大了,其内部实现就是用Semgent来控制的,每个Semgent都是一个小的HashTable,他们有自己的锁;

    然后有些方法需要访问整个表,比如Size,ContainsValue肯定是要访问整个表的数据,这个时候就需要锁整张表,ConcurrentHashMap就会按顺序锁定每个Segment,操作完毕之后再按顺序释放每个Segment,按顺序是为了防止出现死锁;

    引用一张图片解释ConcurrentHashMap的结构:

    可以看出就是在HashMap的基础上多了一个数组(暂且这样理解),数组的每个元素就是Segment;Segment<K,V> extends ReentrantLock implements Serializable;

    ConcurrentHashMap的构造函数:

    public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel) {
            if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
                throw new IllegalArgumentException();
            if (concurrencyLevel > MAX_SEGMENTS)
                concurrencyLevel = MAX_SEGMENTS;
            // Find power-of-two sizes best matching arguments
            int sshift = 0;
            int ssize = 1;
            while (ssize < concurrencyLevel) {
                ++sshift;
                ssize <<= 1;
            }
            this.segmentShift = 32 - sshift;
            this.segmentMask = ssize - 1;
            if (initialCapacity > MAXIMUM_CAPACITY)
                initialCapacity = MAXIMUM_CAPACITY;
            int c = initialCapacity / ssize;
            if (c * ssize < initialCapacity)
                ++c;
            int cap = MIN_SEGMENT_TABLE_CAPACITY;
            while (cap < c)
                cap <<= 1;
            // create segments and segments[0]
            Segment<K,V> s0 =
                new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                                 (HashEntry<K,V>[])new HashEntry[cap]);
            Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
            UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
            this.segments = ss;
        }

    ConcurrentHahsMap进行Put操作的时候,对Key需要进行3次Hash运算才能确定最终的位置:hash1(key) -> x1(得到一个值);hash2(x1) -> x2(确定Segment位置,第二次hash是为了减少hash碰撞);hash3(x2) -> x3(确定元素放入哪个HashEntry);这里只是用比较好理解的白话说,下面会说原理;

    ConcurrentHashMap中主要实体类就是三个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点);

     static final class HashEntry<K,V> {  
         final K key;  
         final int hash;  
         volatile V value;  
         final HashEntry<K,V> next;  
     } 

    可以看出,除了value不是final,其next属性都是final,说明不能从hash链的中间或尾部添加或删除节点;对于Put操作,可以一律将元素添加到hash链的头部,而对于remove操作,从中间删除一个节点,将该节点的前面所有节点复制一份并指向删除节点的下一节点;这会产生两条Hash链,这也是为了保证多线程访问的同步性;将value设置成volatile,这样避免了加锁;因为一个线程在执行get,另一个线程在执行remove,remove还没有执行完的时候,get能看到remove之前的值,如下图:

    下面说说Segment的数据结构:

    static final class Segment<K,V> extends ReentrantLock implements Serializable {    
             /** 
              * count用来统计该段数据的个数,它是volatile,如增加/删除节点,都要写count值,每次读取操作开始都要读取count的值。
              */
             transient volatileint count;  
             /** 
              * modCount统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变
              */
             transient int modCount;  
             /** 
              * rehash界限值
              */
             transient int threshold;  
             /** 
              * 就是上面的HashEntry
              */
             transient volatile HashEntry<K,V>[] table;  
             /** 
              * 负载因子
              */
             final float loadFactor;  
     }

    ConcurrentHashMap的get方法是直接调用Segment的get方法:

    public V get(Object key) {
        int hash = hash(key); // throws NullPointerException if key null
        return segmentFor(hash).get(key, hash);
    }
    V get(Object key, int hash) {  
         if (count != 0) { // read-volatile 当前桶的数据个数是否为0 
             HashEntry<K,V> e = getFirst(hash);  得到头节点
             while (e != null) {  
                 if (e.hash == hash && key.equals(e.key)) {  
                     V v = e.value;  
                     if (v != null)  
                         return v;  
                     return readValueUnderLock(e); // recheck  
                 }  
                 e = e.next;  
             }  
         }  
         return null;  
     } 
     V readValueUnderLock(HashEntry<K,V> e) {  
         lock();  
         try {  
             return e.value;  
         } finally {  
             unlock();  
         }  
     }

    get方法是不需要加锁的,因为上面已经看到了count和HashEntry都是volatile;

    源码中增加value可能为null的情况,是为了保险起见,当节点正在发生改变而又未锁定时就可能为空,而HashEntry中的value不是final的,所以个人认为是为了加此判断;

    如用于统计当前Segement大小的count字段和用于存储值的HashEntry的value。定义成volatile的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写(有一种情况可以被多线程写,就是写入的值不依赖于原值),在get操作里只需要读不需要写共享变量count和value,所以可以不用加锁。之所以不会读到过期的值,是根据java内存模型的happen before原则,对volatile字段的写入操作先于读操作,即使两个线程同时修改和获取volatile变量,get操作也能拿到最新的值,这是用volatile替换锁的经典应用场景;

    接下来看看Put操作:

    V put(K key, int hash, V value, boolean onlyIfAbsent) {  
         lock();  
         try {  
             int c = count;  
             if (c++ > threshold) // ensure capacity  
                 rehash();  
             HashEntry<K,V>[] tab = table;  
             int index = hash & (tab.length - 1);  
             HashEntry<K,V> first = tab[index];  
             HashEntry<K,V> e = first;  
             while (e != null && (e.hash != hash || !key.equals(e.key)))  
                 e = e.next;  
             V oldValue;  
             if (e != null) {  
                 oldValue = e.value;  
                 if (!onlyIfAbsent)  
                     e.value = value;  
             }  
             else {  
                 oldValue = null;  
                 ++modCount;  
                 tab[index] = new HashEntry<K,V>(key, hash, first, value);  
                 count = c; // write-volatile  
             }  
             return oldValue;  
         } finally {  
             unlock();  
         }  
     }

    先上个锁,做个预判,容量+1的时候需不需要扩容,这个判断相比于HashMap做的非常精妙,因为HashMap是在增加元素之后才判断是否需要扩容,如果我扩容之后不增加元素,就白扩容了;

    通过索引定位到HashEntry,判断如果不为空,替换节点的value,否则new一个Entry,将此Entry插入到链头,由构造函数得知,它的next则是first;

    扩容默认也是会创建两倍容量的数组,进行计算将原数组的数据插入到新数组,ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容;

    ConcurrentHashMap的size方法

    需要统计所有segment的元素总和,为了不影响性能,它会先2次尝试不锁住segment来统计大小,如果统计的过程中count发生了变化,则会把所有的segment的put和remove方法全部锁住;通过modCount来协助判断容器是否发生变化;统计size前后判断modCount是否发生变化即可;

    参考:http://www.cnblogs.com/ITtangtang/p/3948786.html

  • 相关阅读:
    js中this的用法
    js原型链与继承(初体验)
    关于C语言指针中的p++与p+i
    react todolist
    react表单提交
    react条件渲染
    react初体验
    初次接触express
    阿里内推在线编程题(返回元素的选择器)
    模块初始化
  • 原文地址:https://www.cnblogs.com/pengx/p/9602099.html
Copyright © 2011-2022 走看看