我们之前了解的集合大多是线程不安全的,比如说ArrayList,HashSet,HashMap,但它们往往高效率;
也有一些线程安全的集合,如Vector,HashTable,但大多都是基于sychronized锁控制机制,性能很低。
如果既保证线程安全,执行效率又高,就可考虑下JUC下的集合类了,如:
ArrayList对应的高并发类是CopyOnWriteArrayList,
HashSet对应的高并发类是 CopyOnWriteArraySet,
HashMap对应的高并发类是ConcurrentHashMap。
它们鱼与熊掌可兼得。但它们为什么在保证效率的同时还能保证线程安全呢?
那么我们先来看看CopyOnWriteArrayList。
CopyOnWriteArrayList提供高效地读取操作,使用在读多写少的场景。CopyOnWriteArrayList读取操作不用加锁,且是安全的;
写操作时,先copy一份原有数据数组,再对复制数据进行写入操作,最后将复制数据替换原有数据,从而保证写操作不影响读操作。
同时注意:其copy的整个过程是上了锁的,所以不会产生多线程安全问题。
我们来看看CopyOnWriteArrayList的核心的源码:
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;//独占锁,写读互斥
lock.lock();
try {
Object[] elements = getArray();//创建一个数组
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);//copy原有的数据数组
newElements[len] = e; //对新的数据数组进行修改操作
setArray(newElements); //将修改后的数据替换原有的数据
return true;
} finally {
lock.unlock();
}
}
由于ReentrantLock是java.util.concurrent包下提供的一套互斥锁,相比Synchronized,ReentrantLock类提供了一些高级功能:
1.等待可中断,持有锁的线程长期不释放的时候,正在等待的线程可以选择放弃等待,这相当于Synchronized来说可以避免出现死锁的情况。
2.公平锁,多个线程等待同一个锁时,必须按照申请锁的时间顺序获得锁
3.锁绑定多个条件,一个ReentrantLock对象可以同时绑定多个对象。ReenTrantLock提供了一个Condition(条件)类,用来实现分组唤醒需要唤醒的线程们,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程。
简单来说,ReenTrantLock的实现是一种自旋锁,通过循环调用CAS操作来实现加锁。它的性能比较好也是因为避免了使线程进入内核态的阻塞状态。
而 CopyOnWriteArraySet操作原理与CopyOnWriteArrayList类似,只是Set是无序不可重复,List是有序可重复。这里就不多讲。
接下来来看看ConcurrentHashMap。
ConcurrentHashMap实现了HashTable的所有功能,线程安全,但却在检索元素时不需要锁定,因此效率更高。
之前是分段锁的思想,通过采用分段锁Segment减少热点域来提高并发效率。
1.8之后利用CAS+Synchronized来保证并发更新的安全,底层采用数组+链表+红黑树的存储结构。
我们先来了解下CAS:
CAS:Compare and Swap, 翻译成中文就是比较并交换。
CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。
这里来看看ConcurrentHashMap的put源码:
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
//和hashMap不同的是,concurrentHashMap的key和value都不允许为null
//concurrenthashmap它们是用于多线程的,并发的 ,如果map.get(key)得到了null,
// 不能判断到底是映射的value是null,还是因为没有找到对应的key而为空,
// 而用于单线程状态的hashmap却可以用containKey(key) 去判断到底是否包含了这个null。
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果是第一次put,进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//根据(tab.length - 1) & hash 计算目标节点在数组中的位置
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果为空,则通过cas 添加一个新建一个头节点
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//hash为-1 说明是一个forwarding nodes节点,表明正在扩容
else if ((fh = f.hash) == MOVED)
//帮助扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//对上面计算出来的节点进行加锁
synchronized (f) {
//这里判断下有没有线程对数组进行了修改
if (tabAt(tab, i) == f) {
//这里如果hash值是大于等于0的说明是链表
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果找到了目标节点,那么进行值替换
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
//如果循环到链表结尾还没发现,那么进行插入操作
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果是树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//如果链表数量大于TREEIFY_THRESHOLD(8),开始执行转换树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//进行元素数量统计,和决定是否扩容
addCount(1L, binCount);
return null;
}
ConcurrentHashMap 是设计为非阻塞的。就是一个线程的失败或者挂起不应该影响其他线程的失败或挂起的算法。
在更新时会局部锁住某部分数据,但不会把整个表都锁住。同步读取操作则是完全非阻塞的。好处是在保证合理的同步前提下,效率很高。