5.1 同步容器类
Vector,Hashtable。对所有公有方法都是同步方法,使用synchronized修饰
5.1.1 同步容器类的问题
特殊情况还是需要客户端加锁。比如线程A遍历容器元素,线程B对同一个容器删除元素,会出现并发修改异常。解决办法是在客户端封装一层加锁方法。
public static Integer getValue(Vector<Integer> vector, int i) { synchronized (vector) { return vector.get(i); } } public static void deleteValue(Vector<Integer> vector, int i) { synchronized (vector) { vector.remove(i); } }
5.1.2 迭代器域ConcurrentModificationException
产生的原因,迭代器遍历会有校验,当遍历过程中发现集合元素被修改时,抛出异常
public static void main(String[] args) { List<Integer> list = (List)IntStream.range(1, 11).boxed().collect(Collectors.toList()); Iterator var2 = list.iterator(); while(var2.hasNext()) { Integer i = (Integer)var2.next(); if (i == 10) { list.remove(i); } } }
其中Iterator#next方法底层调用checkForComodification方法验证两个参数是否相等,一个是modCount,一个是expectedModCount。在迭代器初始化时,这两个参数设置成一样的。当集合修改元素时,modCount++,但是迭代器本身并不感知,expectedModCount无变化,导致下次遍历时验证失败抛出异常。解决办法是使用迭代器本身的remove方法,内里删除完后会重置使两者相等。
5.1.3 隐藏迭代器
有些方法隐式调用迭代器遍历,比如打印集合元素,foreach等。
5.2 并发容器JUC
同步容器不适合高并发场景使用,对集合的写入读取都会加锁。Java提供了并发容器改善性能。
5.2.1 ConcurrentHashMap
见数据结构的ConcurrentHashMap章节
5.2.3 CopyOnWriteArrayList
替代同步List,写时复制。每次修改都会加锁,复制原列表,在快照上修改,修改完成后替换引用并释放锁。读取是不用加锁的,读取的是历史版本。
5.3 阻塞队列和生产者-消费者模式
阻塞队列提供了可阻塞的put和take方法,支持定时的offer和poll方法。队列可以是有界和无界的,有界队列,当队列满时,put方法会阻塞;当队列空时,take方法会阻塞。
BlockingQueue有多种实现,其中LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,PriorityBlockingQueue支持按自定义优先级排序。
5.3.3 双端队列和工作密取
双端队列可以从头部和尾部插入和删除。工作密取:每个消费者对应一个各自的队列,从头部开始消费,当有新任务时插入到尾部,当自身的队列消费完毕后,会选择其他队列从尾部元素开始消费。
5.4 阻塞方法和中断
阻塞方法:在方法的执行过程中,线程会被阻塞进入等待。每个线程有一个中断标识,想要中断某个线程正在进行的操作可以设置这个标识。这时一种协作机制,是否真正中断是由目标线程自己选择的。当某个方法抛出InterruptedException时,这个方法的调用者必须要处理中断。有两种基本选择:
1.传递异常:重新抛出InterruptedException异常,交由更上层的调用者解决
2.恢复中断:有时候不能抛出异常,这个时候需要重新设置线程的中断状态,让上层知道。
见:https://www.cnblogs.com/walker993/p/14325458.html
5.5 同步工具类
根据自身的状态协调线程的运行逻辑。比如阻塞队列、信号量、栅栏、闭锁。
5.5.1 闭锁
相当于一扇门,到达结束状态前,门是锁着的,线程无法通过,到达结束状态后,门打开并且不会再关闭。
import java.util.concurrent.CountDownLatch; public class CountDownLatchTest { private static CountDownLatch latch = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { new Thread(() -> { System.out.println("线程" + Thread.currentThread().getId() + "初始化线程池完毕"); latch.countDown(); }).start(); new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println("线程" + Thread.currentThread().getId() + "初始化数据库连接池完毕"); latch.countDown(); }).start(); latch.await(); System.out.println("启动完成"); } }
5.5.2 FutureTask
FutureTask是RunnableFuture的实现,RunnableFuture接口继承了Runnable和Future
1. Runnable,定义了run方法无返回值,线程执行的任务
2.Future,定义了一些跟踪任务执行的状态,阻塞获取执行结果的方法
所以说FutureTask可以看成是一个可以阻塞获得结果的任务。可能会问Runnable不是没有返回值吗?那获取什么返回值呢?
FutureTask有两个构造方法:
public FutureTask(Runnable runnable, V result)
和 public FutureTask(Callable<V> callable)
其中Callable定义了call方法有返回值,FutureTask可以阻塞获得执行的结果。通过Runnable定义的FutureTask可以指定执行完成的返回结果。
5.5.3 信号量
用来控制一个资源的访问量,获得了许可的线程才能访问。
1.跟锁不同的时,一个线程获取了访问许可,可以由另一个线程中释放,也就是许可跟线程没有绑定关系。
2.不支持重入
public class SemaphoreTest { public static void main(String[] args) throws InterruptedException { System.out.println("---演示线程无关性---"); ConnectionPool pool = new ConnectionPool(Lists.newArrayList(1)); new Thread(pool::borrow).start(); new Thread(pool::borrow).start(); Thread.sleep(1000); new Thread(() -> pool.release(1)).start(); Thread.sleep(1000); System.out.println("---演示不支持重入---"); ConnectionPool pool2 = new ConnectionPool(Lists.newArrayList(1)); new Thread(() -> { pool2.borrow(); pool2.borrow(); }).start(); } static class ConnectionPool { List<Object> pool; Semaphore semaphore; ConnectionPool(List<Object> conn) { this.pool = conn; semaphore = new Semaphore(conn.size()); } private void borrow() { System.out.println("线程" + Thread.currentThread().getId() + "尝试获取了连接"); try { semaphore.acquire(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } pool.remove(pool.size() - 1); System.out.println("线程" + Thread.currentThread().getId() + "获取了连接"); } private void release(Object conn) { pool.add(conn); semaphore.release(); System.out.println("线程" + Thread.currentThread().getId() + "释放了连接"); } } }
实验结果:
5.5.4 栅栏
类似于闭锁,但是可以重置。
public class CyclicBarrierTest { public static void main(String[] args) { testWait(); } /** * 多线程任务等待测试 * 场景: * 两阶段任务,3个线程一起执行,每阶段都需要全部执行完毕,才能开始下阶段 */ public static void testWait() { int allWorker = 3; CyclicBarrier cyclicBarrier = new CyclicBarrier(allWorker); new Thread(() -> { try { //阶段1 Thread.sleep(100); int await = cyclicBarrier.await();//还需要等待的线程数 System.out.println(Thread.currentThread().getName() + "第" + (allWorker - await) + "个完成阶段1工作"); //尝试重置栅栏 tryReset(await, cyclicBarrier); //阶段2 Thread.sleep(1000); await = cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "第" + (allWorker - await) + "个完成阶段2工作"); } catch (Exception ignore) { } }, "线程a").start(); new Thread(() -> { try { //阶段1 Thread.sleep(400); int await = cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "第" + (allWorker - await) + "个完成阶段1工作"); //尝试重置栅栏 tryReset(await, cyclicBarrier); //阶段2 Thread.sleep(600); await = cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "第" + (allWorker - await) + "个完成阶段2工作"); } catch (Exception ignore) { } }, "线程b").start(); new Thread(() -> { try { //阶段1 Thread.sleep(300); int await = cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "第" + (allWorker - await) + "个完成阶段1工作"); //尝试重置栅栏 tryReset(await, cyclicBarrier); //阶段2 Thread.sleep(100); await = cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "第" + (allWorker - await) + "个完成阶段2工作"); } catch (Exception ignore) { } }, "线程c").start(); } /** * 当最后一个完成的会还原栅栏 * * @param await 剩余需要等待的线程数 * @param cyclicBarrier 栅栏 */ private static void tryReset(int await, CyclicBarrier cyclicBarrier) { if (await == 0) { cyclicBarrier.reset(); } } }
5.5.5 Exchanger
线程间交换数据
public class ExchangerTest { public static void main(String[] args) { testDataExchange(); } /** * 多线程数据交换测试 * 场景:两个线程交换本地数据 */ public static void testDataExchange() { Exchanger<Goods> exchanger = new Exchanger<>(); new Thread(() -> { Goods goods = new Goods("线程1-商品1"); System.out.println("线程1生产了" + goods); try { goods = exchanger.exchange(goods); } catch (InterruptedException e) { // } System.out.println("线程1交换完毕,目前商品:" + goods); }).start(); new Thread(() -> { Goods goods = new Goods("线程2-商品2"); System.out.println("线程2生产了" + goods); try { goods = exchanger.exchange(goods); } catch (InterruptedException e) { // } System.out.println("线程2交换完毕,目前商品:" + goods); }).start(); } @AllArgsConstructor static class Goods { private String name; @Override public String toString() { return name; } } }