zoukankan      html  css  js  c++  java
  • ConcurrentHashMap如何保证线程安全

    HashMap的put,get,size等方法都不是线程安全的,而HashTable虽然保证了线程安全,但却是用了效率极低的方法,在put,get,size等方法上加上了synchronized,这就导致所有的并发进程都要竞争同一把锁,一个线程在进行同步操作时,其他线程都需要等待。

    为了保证集合的线程安全性,Jdk提供了同步包装器,比如说Collections.synchronizedMap,不过它们都是利用粗粒度的同步方式,高并发情况下性能较差。

    可以看看Collections.synchronizedMap的实现。

    image.png

    image.png

    与并发安全有关的代码都使用了synchronized关键字进行了修饰,使用的锁mutex可以在构造方法中看到就是this。

    虽然说不在是用synchronized来修饰方法,但是还是使用了this做mutex锁,治标不治本。

    更加普遍的选择是利用并发包提供的线程安全容器类,它提供了各种并发容器,比如ConcurrentHashMap、CopyOnWriteArrayList。

    各种线程安全队列,如ArrayBlockingQueue、SynchronousQueue。

    ConcurrentHashMap分析

    ConcurrentHashMap和HashMap一样,在Java8时结构上发生了很大变化。

    之前的ConcurrentHashMap的实现是基于分离锁,也就是内部进行分段,每一个段中都拥有HashEntry数组,hashcode相同的key也是按照链表的方式存储的。

    image.png

    HashEntry内部使用volatile修饰的value字段来保证可见性,也利用了不可变对象的机制以改进利用Unsafe提供的底层能力,比如volatile access,去直接完成部分操作,以最优化性能,毕竟Unsafe中的很多操作都是JVM intrinsic优化过的。

    Segment的初始大小由DEFAULT_CONCURRENCY_LEVEL来确定,默认是16,在构造方法中可以自定义大小,不过必须是2的幂,如果输入7,会自动创建8。

    image.png

    对于get方法来说,只需要保证可见性,无需其他同步操作。

    关键是put方法,首先是通过二次哈希避免哈希冲突,然后以Unsafe调用方式,直接获取相应的Segment,然后进行线程安全的put操作。

     public V put(K key, V value) {
     	Segment<K,V> s;
     	if (value == null)
    		throw new NullPointerException();
     	// 二次哈希,以保证数据的分散性,避免哈希冲突
     	int hash = hash(key.hashCode());
     	int j = (hash >>> segmentShift) & segmentMask;
     	if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
     	(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
     		s = ensureSegment(j);
     	return s.put(key, hash, value, false);
     }
    

    put的核心实现如下。

    fnal V put(K key, int hash, V value, boolean onlyIfAbsent) {
     	// scanAndLockForPut会去查找是否有key相同Node
     	// 无论如何,确保获取锁
     	HashEntry<K,V> node = tryLock() ? null :
     	scanAndLockForPut(key, hash, value);
     	V oldValue;
    	try {
     		HashEntry<K,V>[] tab = table;
     		int index = (tab.length - 1) & hash;
     		HashEntry<K,V> frs = entryAt(tab, index);
     		for (HashEntry<K,V> e = frs;;) {
     			if (e != null) {
    				K k;
    				// 更新已有value...
     			} else {
     			// 放置HashEntry到特定位置,如果超过阈值,进行rehash
     			// ...
     			}
     		}
    	} fnally {
     		unlock();
     	}
    	return oldValue;
    }
    

    首先也是通过 Key 的 Hash 定位到具体的 Segment,在 put 之前会进行一次扩容校验。这里比 HashMap 要好的一点是:HashMap 是插入元素之后再看是否需要扩容,有可能扩容之后后续就没有插入就浪费了本次扩容(扩容非常消耗性能)。而 ConcurrentHashMap 不一样,它是在将数据插入之前检查是否需要扩容,之后再做插入操作。

    ConcurrentHashMap会获取再入锁,以保证数据一致性,Segment本身就是基于ReentrantLock的扩展实现,所以,在并发修改期间,相应Segment是被锁定的。

    在最初阶段,进行重复性的扫描,以确定相应key值是否已经在数组里面,进而决定是更新还是放置操作,你可以在代码里看到相应的注释。重复扫描、检测冲突
    是ConcurrentHashMap的常见技巧。

    另外一个Map的size方法同样需要关注,它的实现涉及分离锁的一个副作用。
    试想,如果不进行同步,简单的计算所有Segment的总值,可能会因为并发put,导致结果不准确,但是直接锁定所有Segment进行计算,就会变得非常昂贵。

    所以,ConcurrentHashMap的实现是通过重试机制(RETRIES_BEFORE_LOCK,指定重试次数2),来试图获得可靠值。如果没有监控到发生变化(通过对
    比Segment.modCount),就直接返回,否则获取锁进行操作。

    每个Segment都有一个volatile修饰的全局变量count,求整个 ConcurrentHashMap的size时很明显就是将所有的count累加即可。但是volatile修饰的变量却不能保证多线程的原子性,所有直接累加很容易出现并发问题。

    通过尝试两次将count累加,如果容器的count发生了变化再加锁来统计size,可以有效的避免加锁的问题。

    以上是JDK1.7及之前的ConcurrentHashMap的结构,在1.8之后,ConcurrentHashMap结构发生了很大变化,由之前的分段锁变为了CAS+synchronized。

    我在网上找到了一幅结构图。

    image.png

    总体结构上,它的内部存储和HashMap结构非常相似,同样是大的桶(bucket)数组,然后内部也是一个个所谓的链表结构(bin),同步的粒度要更细致一些。

    其内部仍然有Segment定义,但仅仅是为了保证序列化时的兼容性而已,不再有任何结构上的用处。

    因为不再使用Segment,初始化操作大大简化,修改为lazy-load形式,这样可以有效避免初始开销,解决了老版本很多人抱怨的这一点。

    将1.7中存放数据的HashEntry改为Node,但作用都是相同的。其中的val next 都用了volatile修饰,保证了可见性。

     satic class Node<K,V> implements Map.Entry<K,V> {
     	fnal int hash;
     	fnal K key;
     	volatile V val;
     	volatile Node<K,V> next;
     	// …
     }
    

    在Node中,key被定义为了final,因为照常理来说,key是不会改变了,而是value会发生改变,因此在val中添加了volatile来修饰。

    来看看它的put操作。

    fnal V putVal(K key, V value, boolean onlyIfAbsent) { 
    	if (key == null || value == null) throw new NullPointerException();
     		int hash = spread(key.hashCode());
     		int binCount = 0;
     		for (Node<K,V>[] tab = table;;) {
     			Node<K,V> f; int n, i, fh; K fk; V fv;
     			if (tab == null || (n = tab.length) == 0)
     				tab = initTable();
     			else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
     			// 利用CAS去进行无锁线程安全操作,如果bin是空的
     				if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
     					break;
     			}
    			else if ((fh = f.hash) == MOVED)
     tab = helpTransfer(tab, f);
     			else if (onlyIfAbsent // 不加锁,进行检查
     && fh == hash
     && ((fk = f.key) == key || (fk != null && key.equals(fk)))
     && (fv = f.val) != null)
     				return fv;
     			else {
     				V oldVal = null;
     				synchronized (f) {
     				// 细粒度的同步修改操作...
     				}
     			}
    			// Bin超过阈值,进行树化
    			if (binCount != 0) {
     				if (binCount >= TREEIFY_THRESHOLD)
     					treeifyBin(tab, i);
    				if (oldVal != null)
     					return oldVal;
     				break;
     			}
     		}
     	}
     	addCount(1L, binCount);
     	return null;
    }
    
    

    具体流程如下:

    1. 根据key计算出hashcode 。
    2. 判断是否需要进行初始化initTable。
    3. f即为当前key定位出的Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。
    4. 如果当前位置的 hashcode == MOVED == -1,则需要进行扩容。
    5. 如果都不满足,则利用 synchronized 锁写入数据。
    6. 如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树。

    可以发现1.8以后的锁的颗粒度,是加在链表头上的,这是一个很重要的突破。

    1.8中不依赖与segment加锁,segment数量与桶数量一致。

    首先判断容器是否为空,为空则进行初始化利用volatile的sizeCtl作为互斥手段,如果发现竞争性的初始化,就暂停在那里,等待条件恢复,否则利用CAS设置排他标志(U.compareAndSwapInt(this, SIZECTL, sc, -1)),否则重试
    对key hash计算得到该key存放的桶位置,判断该桶是否为空,为空则利用CAS设置新节点,否则使用synchronize加锁,遍历桶中数据,替换或新增加点到桶中。最后判断是否需要转为红黑树,转换之前判断是否需要扩容。

    最后来看看1.8 ConcurrentHashMap的size方法实现。

    image.png
    image.png

    在sumCount方法中使用到了CounterCell,对于CounterCell的操作,是基于java.util.concurrent.atomic.LongAdder进行的,是一种JVM利用空间换取更高效率的方法,利用了Striped64内部的复杂逻辑。

    // 一种用于分配计数的填充单元。改编自LongAdder和Striped64。请查看他们的内部文档进行解释。
    @sun.misc.Contended 
    static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }
    
  • 相关阅读:
    项目经理成长之路-初入职场(二)
    项目经理成长之路-我的大学(一)
    别了郑州,2020再出发
    RPC协议实践入门
    Spark学习进度11-Spark Streaming&Structured Streaming
    使用Python自动填写问卷星(pyppeteer反爬虫版)
    All mirror URLs are not using ftp, http[s] or file.
    2018蓝桥杯A组省赛A,B,C,D
    Spark学习进度10-DS&DF基础操作
    SparkSQL学习进度9-SQL实战案例
  • 原文地址:https://www.cnblogs.com/LexMoon/p/concurrenthashmap.html
Copyright © 2011-2022 走看看