同步容器
同步容器可以简单地理解为通过synchronized来实现同步的容器,如果有多个线程调用同步容器的方法,它们将会串行执行。
同步容器将它们的状态封装起来,并对每一个公有方法进行同步。主要包括:
- Vector
- Stack
- HashTable
- Collections.synchronized方法生成,例如:
- Collectinons.synchronizedList()
- Collections.synchronizedSet()
- Collections.synchronizedMap()
- Collections.synchronizedSortedSet()
- Collections.synchronizedSortedMap()
其中Vector(同步的ArrayList)和Stack(继承自Vector,先进后出)、HashTable(继承自Dictionary,实现了Map接口)是比较老的容器,Thinking in Java中明确指出,这些容器现在仍然存在于JDK中是为了向以前老版本的程序兼容,在新的程序中不应该在使用。Collections的方法时将非同步的容器包裹生成对应的同步容器。
同步容器在单线程的环境下能够保证线程安全,但是通过synchronized同步方法将访问操作串行化,导致并发环境下效率低下。而且同步容器在多线程环境下的复合操作(迭代、条件运算如没有则添加等)是非线程安全,需要客户端代码来实现加锁。
1.同一接口,不同实现的线程安全类
vector的所有方法都是有synchronized关键字保护的,stack继承了vector,并且提供了栈操作(先进后出),hashtable也是由synchronized关键字保护
package com.mmall.concurrency.example.syncContainer;
import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
@ThreadSafe
public class VectorExample1 {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static List<Integer> list = new Vector<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", list.size());
}
private static void update(int i) {
list.add(i);
}
}
注意:1.同步容器并不一定线程安全
/**
* 并发测试
* 同步容器不一定线程安全
* @author gaowenfeng
*/
@Slf4j
@NotThreadSafe
public class VectorExample2 {
/** 请求总数 */
public static int clientTotal = 5000;
/** 同时并发执行的线程数 */
public static int threadTotal = 50;
public static List<Integer> list = new Vector<>();
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
list.add(i);
}
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
list.remove(i);
}
});
Thread thread2 = new Thread(() -> {
// thread2想获取i=9的元素的时候,thread1将i=9的元素移除了,导致数组越界
for (int i = 0; i < 10; i++) {
list.get(i);
}
});
thread1.start();
thread2.start();
}
}
2. Collections.synchronizedXXX (list,set,map)
package com.mmall.concurrency.example.syncContainer;
import com.google.common.collect.Lists;
import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
@ThreadSafe
public class CollectionsExample1 {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList());
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", list.size());
}
private static void update(int i) {
list.add(i);
}
}
注意:2.在foreach或迭代器遍历的过程中不要做删除操作,应该先标记,然后最后再统一删除
public class VectorExample3 {
// java.util.ConcurrentModificationException
// 在遍历的同时进行了删除的操作,导致抛出了并发修改的异常
private static void test1(Vector<Integer> v1) { // foreach
for(Integer i : v1) {
if (i.equals(3)) {
v1.remove(i);
}
}
}
// java.util.ConcurrentModificationException
private static void test2(Vector<Integer> v1) { // iterator
Iterator<Integer> iterator = v1.iterator();
while (iterator.hasNext()) {
Integer i = iterator.next();
if (i.equals(3)) {
v1.remove(i);
}
}
}
// success
private static void test3(Vector<Integer> v1) { // for
for (int i = 0; i < v1.size(); i++) {
if (v1.get(i).equals(3)) {
v1.remove(i);
}
}
}
public static void main(String[] args) {
Vector<Integer> vector = new Vector<>();
vector.add(1);
vector.add(2);
vector.add(3);
test1(vector);
}
}
同步容器性能不是特别好,并且不能保证完全线程安全。我们可以使用并发容器进行取代它。同步容器已经使用的越来越少了,都是使用并发容器替代。
并发容器
Java并发容器JUC是三个单词的缩写。是JDK下面的一个包名。即Java.util.concurrency。
什么是CopyOnWrite容器
CopyOnWrite容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器,它的写是需要加锁的。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
缺点:
1.写操作时复制消耗内存,如果元素比较多时候,容易导致young gc 和full gc。
2.不能用于实时读的场景.由于复制和add操作等需要时间,故读取时可能读到旧值。
能做到最终一致性,但无法满足实时性的要求,更适合读多写少的场景。
如果无法知道数组有多大,或者add,set操作有多少,慎用此类,在大量的复制副本的过程中很容易出错。
设计思想:
1.读写分离
2.最终一致性
3.使用时另外开辟空间,防止并发冲突
源码分析:
//构造方法
public CopyOnWriteArrayList(Collection<? extends E> c) {
Object[] elements;//使用对象数组来承载数据
if (c.getClass() == CopyOnWriteArrayList.class)
elements = ((CopyOnWriteArrayList<?>)c).getArray();
else {
elements = c.toArray();
// c.toArray might (incorrectly) not return Object[] (see 6260652)
if (elements.getClass() != Object[].class)
elements = Arrays.copyOf(elements, elements.length, Object[].class);
}
setArray(elements);
}
//添加数据方法
public boolean add(E e) {
final ReentrantLock lock = this.lock;//使用重入锁,保证线程安全
lock.lock();
try {
Object[] elements = getArray();//获取当前数组数据
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);//复制当前数组并且扩容+1
newElements[len] = e;//将要添加的数据放入新数组
setArray(newElements);//将原来的数组指向新的数组
return true;
} finally {
lock.unlock();
}
}
//获取数据方法,与普通的get没什么差别
private E get(Object[] a, int index) {
return (E) a[index];
}
HashSet –> CopyOnWriteArraySet
- 它是线程安全的,底层实现使用的是CopyOnWriteArrayList,因此它也适用于大小很小的set集合,只读操作远大于可变操作。因为他需要copy整个数组,所以包括add、remove、set它的开销相对于大一些。
- 迭代器不支持可变的remove操作。使用迭代器遍历的时候速度很快,而且不会与其他线程发生冲突。
源码分析:
//构造方法
public CopyOnWriteArraySet() {
al = new CopyOnWriteArrayList<E>();//底层使用CopyOnWriteArrayList
}
//添加元素方法,基本实现原理与CopyOnWriteArrayList相同
private boolean addIfAbsent(E e, Object[] snapshot) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] current = getArray();
int len = current.length;
if (snapshot != current) {//添加了元素去重操作
// Optimize for lost race to another addXXX operation
int common = Math.min(snapshot.length, len);
for (int i = 0; i < common; i++)
if (current[i] != snapshot[i] && eq(e, current[i]))
return false;
if (indexOf(e, current, common, len) >= 0)
return false;
}
Object[] newElements = Arrays.copyOf(current, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
TreeSet –> ConcurrentSkipListSet
它是JDK6新增的类,同TreeSet一样支持自然排序,并且可以在构造的时候自己定义比较器。
- 同其他set集合,是基于map集合的(基于ConcurrentSkipListMap),在多线程环境下,里面的contains、add、remove操作都是线程安全的。
- 多个线程可以安全的并发的执行插入、移除、和访问操作。但是对于批量操作addAll、removeAll、retainAll和containsAll并不能保证以原子方式执行,原因是addAll、removeAll、retainAll底层调用的还是contains、add、remove方法,只能保证每一次的执行是原子性的,代表在单一执行操纵时不会被打断,但是不能保证每一次批量操作都不会被打断。在使用批量操作时,还是需要手动加上同步操作的。
- 不允许使用null元素的,它无法可靠的将参数及返回值与不存在的元素区分开来。
源码分析:
//构造方法
public ConcurrentSkipListSet() {
m = new ConcurrentSkipListMap<E,Object>();//使用ConcurrentSkipListMap实现
}
HashMap –> ConcurrentHashMap
- 不允许空值,在实际的应用中除了少数的插入操作和删除操作外,绝大多数我们使用map都是读取操作。而且读操作大多数都是成功的。基于这个前提,它针对读操作做了大量的优化。因此这个类在高并发环境下有特别好的表现。
- ConcurrentHashMap作为Concurrent一族,其有着高效地并发操作,相比Hashtable的笨重,ConcurrentHashMap则更胜一筹了。
- 在1.8版本以前,ConcurrentHashMap采用分段锁的概念,使锁更加细化,但是1.8已经改变了这种思路,而是利用CAS+Synchronized来保证并发更新的安全,当然底层采用数组+链表+红黑树的存储结构。
- HashMap与ConcurrentHashMap、HashTable
TreeMap –> ConcurrentSkipListMap
底层实现采用SkipList跳表
曾经有人用ConcurrentHashMap与ConcurrentSkipListMap做性能测试,在4个线程1.6W的数据条件下,前者的数据存取速度是后者的4倍左右。但是后者有几个前者不能比拟的优点:
1、Key是有序的
2、支持更高的并发,存储时间与线程数无关
总结
安全共享对象策略
※线程限制:一个被线程限制的对象,由线程独占,并且只能被占有他的线程修改。
※共享只读:一个共享只读的对象,在没有额外同步的情况下,可以被多个线程并发访问,但是任何线程都不能修改他
※线程安全对象:一个线程安全的对象或者容器,在内部通过同步机制来保证线程安全,所以其他线程无需额外的同步就可以通过公共接口随意访问它
※被守护对象:被守护对象只能通过获取特定的锁来访问。
j.u.c的类图