本节介绍一个常用的并发容器 - ConcurrentHashMap,它是HashMap的并发版本,与HashMap相比,它有如下特点:
- 并发安全
- 直接支持一些原子复合操作
- 支持高并发、读操作完全并行、写操作支持一定程度的并行
- 与同步容器Collections.synchronizedMap相比,迭代不用加锁,不会抛出ConcurrentModificationException
- 弱一致性
我们分别来看下。
并发安全
我们知道,HashMap不是并发安全的,在并发更新的情况下,HashMap的链表结构可能形成环,出现死循环,占满CPU,我们看个例子:
public static void unsafeConcurrentUpdate() { final Map<Integer, Integer> map = new HashMap<>(); for (int i = 0; i < 100; i++) { Thread t = new Thread() { Random rnd = new Random(); @Override public void run() { for (int i = 0; i < 100; i++) { map.put(rnd.nextInt(), 1); } } }; t.start(); } }
运行上面的代码,在我的机器上,每次都会出现死循环,占满CPU。
为什么会出现死循环呢?死循环出现在多个线程同时扩容哈希表的时候,不是同时更新一个链表的时候,那种情况可能会出现更新丢失,但不会死循环,具体过程比较复杂,我们就不解释了,感兴趣的读者可以参考这篇文章,http://coolshell.cn/articles/9606.html。
使用Collections.synchronizedMap方法可以生成一个同步容器,避免该问题,替换第一行代码即可:
final Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<Integer, Integer>());
在Java中,HashMap还有一个同步版本Hashtable,它与使用synchronizedMap生成的Map基本是一样的,也是在每个方法调用上加了synchronized,我们就不赘述了。
同步容器有几个问题:
- 每个方法都需要同步,支持的并发度比较低
- 对于迭代和复合操作,需要调用方加锁,使用比较麻烦,且容易忘记
ConcurrentHashMap没有这些问题,它同样实现了Map接口,也是基于哈希表实现的,上面的代码替换第一行即可:
final Map<Integer, Integer> map = new ConcurrentHashMap<>();
原子复合操作
除了Map接口,ConcurrentHashMap还实现了一个接口ConcurrentMap,接口定义了一些条件更新操作,具体定义为:
public interface ConcurrentMap<K, V> extends Map<K, V> { //条件更新,如果Map中没有key,设置key为value,返回原来key对应的值,如果没有,返回null V putIfAbsent(K key, V value); //条件删除,如果Map中有key,且对应的值为value,则删除,如果删除了,返回true,否则false boolean remove(Object key, Object value); //条件替换,如果Map中有key,且对应的值为oldValue,则替换为newValue,如果替换了,返回ture,否则false boolean replace(K key, V oldValue, V newValue); //条件替换,如果Map中有key,则替换值为value,返回原来key对应的值,如果原来没有,返回null V replace(K key, V value); }
如果使用同步容器,调用方必须加锁,而ConcurrentMap将它们实现为了原子操作。实际上,使用ConcurrentMap,调用方也没有办法进行加锁,它没有暴露锁接口,也不使用synchronized。
高并发
ConcurrentHashMap是为高并发设计的,它是怎么做的呢?具体实现比较复杂,我们简要介绍其思路,主要有两点:
- 分段锁
- 读不需要锁
同步容器使用synchronized,所有方法,竞争同一个锁,而ConcurrentHashMap采用分段锁技术,将数据分为多个段,而每个段有一个独立的锁,每一个段相当于一个独立的哈希表,分段的依据也是哈希值,无论是保存键值对还是根据键查找,都先根据键的哈希值映射到段,再在段对应的哈希表上进行操作。
采用分段锁,可以大大提高并发度,多个段之间可以并行读写。默认情况下,段是16个,不过,这个数字可以通过构造方法进行设置,如下所示:
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)
concurrencyLevel表示估计的并行更新的线程个数,ConcurrentHashMap会将该数转换为2的整数次幂,比如14转换为16,25转换为32。
在对每个段的数据进行读写时,ConcurrentHashMap也不是简单的使用锁进行同步,内部使用了CAS、对一些写采用原子方式,实现比较复杂,我们就不介绍了,实现的效果是,对于写操作,需要获取锁,不能并行,但是读操作可以,多个读可以并行,写的同时也可以读,这使得ConcurrentHashMap的并行度远远大于同步容器。
迭代
我们在66节介绍过,使用同步容器,在迭代中需要加锁,否则可能会抛出ConcurrentModificationException。ConcurrentHashMap没有这个问题,在迭代器创建后,在迭代过程中,如果另一个线程对容器进行了修改,迭代会继续,不会抛出异常。
问题是,迭代会反映别的线程的修改?还是像上节介绍的CopyOnWriteArrayList一样,反映的是创建时的副本?答案是,都不是!我们看个例子:
public class ConcurrentHashMapIteratorDemo { public static void test() { final ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(); map.put("a", "abstract"); map.put("b", "basic"); Thread t1 = new Thread() { @Override public void run() { for (Entry<String, String> entry : map.entrySet()) { try { Thread.sleep(1000); } catch (InterruptedException e) { } System.out.println(entry.getKey() + "," + entry.getValue()); } } }; t1.start(); // 确保线程t1启动 try { Thread.sleep(100); } catch (InterruptedException e) { } map.put("c", "call"); } public static void main(String[] args) { test(); } }
t1启动后,创建迭代器,但在迭代输出每个元素前,先睡眠1秒钟,主线程启动t1后,先睡眠一下,确保t1先运行,然后给map增加了一个元素,程序输出为:
a,abstract b,basic c,call
说明,迭代器反映了最新的更新,但我们将添加语句更改为:
map.put("g", "call");
你会发现,程序输出为:
a,abstract b,basic
这说明,迭代器没有反映最新的更新,这是怎么回事呢?我们需要理解ConcurrentHashMap的弱一致性。
弱一致性
ConcurrentHashMap的迭代器创建后,就会按照哈希表结构遍历每个元素,但在遍历过程中,内部元素可能会发生变化,如果变化发生在已遍历过的部分,迭代器就不会反映出来,而如果变化发生在未遍历过的部分,迭代器就会发现并反映出来,这就是弱一致性。
类似的情况还会出现在ConcurrentHashMap的另一个方法:
//批量添加m中的键值对到当前Map public void putAll(Map<? extends K, ? extends V> m)
该方法并非原子操作,而是调用put方法逐个元素进行添加的,在该方法没有结束的时候,部分修改效果就会体现出来。
小结
本节介绍了ConcurrentHashMap,它是并发版的HashMap,通过分段锁和其他技术实现了高并发,支持原子条件更新操作,不会抛出ConcurrentModificationException,实现了弱一致性。
Java中没有并发版的HashSet,但可以通过Collections.newSetFromMap方法基于ConcurrentHashMap构建一个。
我们知道HashMap/HashSet基于哈希,不能对元素排序,对应的可排序的容器类是TreeMap/TreeSet,并发包中可排序的对应版本不是基于树,而是基于Skip List(跳跃表)的,类分别是ConcurrentSkipListMap和ConcurrentSkipListSet,它们到底是什么呢?