Java 多线程进阶-并发协作控制
-
线程协作对比
- Thread/Executor/Fork-Join
- 线程启动, 运行, 结束.
- 线程之间缺少协作.
- synchronized 同步
- 互斥, 限定只有一个线程才能进入关键区.
- 简单粗暴, 性能损失有点大>_<.
- Thread/Executor/Fork-Join
-
Lock 锁
- Lock 也可以实现同步的效果
- 实现更复杂的临界区结构.
- tryLock 方法可以预判锁是否空闲.
- 允许分离读写的操作, 多读单写.
- 性能更好.
- ReentrantLock 类, 可重入的互斥锁.
- RenntrantReadWriteLock 类, 可重入的读写锁.
- tryLock()/lock()/unlock() 函数.
package concurrentDemo0421; import java.time.LocalDateTime; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class LockExample { private static final ReentrantLock queueLock111 = new ReentrantLock(); // 可重入锁 private static final ReentrantReadWriteLock orderLock111 = new ReentrantReadWriteLock(); // 可重入读写锁 /** * 学校门口有家奶茶店, 学生们点单有时需要排队 * 1. 买奶茶 * 假设想买奶茶的同学如果看到需要排队, 就决定不买了 * (一次只有一个买) * <p> * 2. 操作奶茶账本 * 假设奶茶店有老板和多名员工, 记录方式比较原始, 只有一个订单本 * (多个读, 一个写) * 老板负责写新订单, 员工不断查看订单本得到信息来制作奶茶, 在老板写新订单的时候员工不能查看订单本 * (写时, 不能读) * 多个员工可以同时查看订单本, 此时老板不能写新订单 * (读时, 不能写) * * @param args 1 */ public static void main(String[] args) throws InterruptedException { // 1. 买奶茶的例子 buyMilkTea(); // 使用可重入锁 // 2. 操作奶茶账本的例子 handleOrder(); // 使用读写锁 } public static void buyMilkTea() throws InterruptedException { LockExample lockExample = new LockExample(); int STUDENTS_COUNT = 10; Thread[] students = new Thread[STUDENTS_COUNT]; for (int i = 0; i < students.length; i++) { students[i] = new Thread(new Runnable() { // 匿名的线程类, 没有名字的 @Override public void run() { try { long walkingTime = (long) (Math.random() * 1000); Thread.sleep(walkingTime); lockExample.tryToBuyMilk(); } catch (InterruptedException e) { e.printStackTrace(); } } }); students[i].start(); } for (Thread student : students) { student.join(); } } private void tryToBuyMilk() throws InterruptedException { boolean flag = true; while (flag) { if (queueLock111.tryLock()) { // 查一下现在是否锁住, 锁住了在下面的flag地方等一下再来操作. // tryLock()实际包含了两个操作, 先 try 再 lock; 如果没有锁再锁住. long thinkingTime = (long) (Math.random() * 500); Thread.sleep(thinkingTime); System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 来一杯珍珠奶茶, 不要珍珠"); flag = false; queueLock111.unlock(); } else { System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 再等等"); } if (flag) { Thread.sleep(1000); } } } /** * 处理订单 */ static void handleOrder() { LockExample lockExample = new LockExample(); Thread boss = new Thread(() -> { while (true) { try { lockExample.addOrder(); // 老板加新单子 long waitingTime = (long) (Math.random() * 1000); Thread.sleep(waitingTime); } catch (InterruptedException e) { e.printStackTrace(); } } }); boss.start(); int workerCount = 3; Thread[] workers = new Thread[workerCount]; for (int i = 0; i < workerCount; i++) { workers[i] = new Thread(new Runnable() { @Override public void run() { while (true) { try { lockExample.viewOrder(); // 员工取出单子 long workingTime = (long) (Math.random() * 5000); Thread.sleep(workingTime); } catch (InterruptedException e) { e.printStackTrace(); } } } }); workers[i].start(); } } /** * 向订单本录入新订单 */ private void addOrder() throws InterruptedException { orderLock111.writeLock().lock(); // writeLock 写锁, 排他的, 只能一个线程拥有 long writingTime = (long) (Math.random() * 1000); Thread.sleep(writingTime); System.out.println(LocalDateTime.now() + " => " + "老板新加一个订单"); orderLock111.writeLock().unlock(); } /** * 查看订单本 */ private void viewOrder() throws InterruptedException { orderLock111.readLock().lock(); // readLock 读锁, 可以多个线程共享(同时访问) long readingTime = (long) (Math.random() * 500); Thread.sleep(readingTime); System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 查看了订单本"); orderLock111.readLock().unlock(); } }
- Lock 也可以实现同步的效果
-
Semaphore 信号量
- 由1965年Dijkstra提出的.
- 信号量: 本质上是一个计数器.
- 计数器大于0, 可以使用, 等于0不能使用.
- 可以设置多个并发量, 例如限制10个访问.
- Semaphore
- acquire 获取.
- release 释放.
- 比 Lock 更进一步, 可以控制多个同时访问关键区.
package concurrentDemo0421; import java.time.LocalDateTime; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * 控制同时访问代码块的线程数 * 现在有一个地下车库, 共有5个车位, 有10辆车需要停放, 每次停放时, 去申请信号量 */ public class SemaphoreExample { private final Semaphore placeSemaphore = new Semaphore(5); public static void main(String[] args) throws InterruptedException { SemaphoreExample example = new SemaphoreExample(); int tryToParkCount = 10; Thread[] parkers = new Thread[tryToParkCount]; for (int i = 0; i < parkers.length; i++) { parkers[i] = new Thread(new Runnable() { @Override public void run() { try { long randomTime = (long) (Math.random() * 1000); Thread.sleep(randomTime); // 过一段时间来停车 if (example.parking()) { long parkingTime = (long) (Math.random() * 1200); Thread.sleep(parkingTime); // 停一段时间离开 example.leaving(); } } catch (InterruptedException e) { e.printStackTrace(); } } }); parkers[i].start(); } for (Thread t : parkers) { t.join(); } TimeUnit.SECONDS.sleep(60); } private boolean parking() { if (placeSemaphore.tryAcquire()) { // 查看是否有剩余的信号量(剩余的车位) System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 停车成功!"); return true; } else { System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 没有空位"); return false; } } private void leaving() { placeSemaphore.release(); System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 开走了"); } }
-
Latch 等待锁
- 是一个同步辅助类.
- 用来同步执行任务的一个或多个线程
- 不是用来保护临界区或共享资源
- CountDownLatch
- countDown() 计数减一.
- await() 等待 latch 变成 0.
package concurrentDemo0421; import java.time.LocalDateTime; import java.util.concurrent.CountDownLatch; /** * 设想百米赛跑, 发令枪发出信号后选手开始跑, 全部选手跑到终点后比赛结束. */ public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(10); int runnerCount = 10; // 选手数量 for (int i = 0; i < runnerCount; i++) { // create and start threads new Thread(new Runner(startSignal, doneSignal)).start(); // 所有选手开始跑~ } System.out.println(LocalDateTime.now() + " => " + "准备就绪.."); startSignal.countDown(); // let all threads proceed System.out.println(LocalDateTime.now() + " => " + "比赛开始!"); doneSignal.await(); // wait for all threads to finish System.out.println(LocalDateTime.now() + " => " + "比赛结束!"); } } class Runner implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Runner(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } @Override public void run() { try { startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } private void doWork() throws InterruptedException { long time = (long) (Math.random() * 10 * 1000); Thread.sleep(time); // 随机在十秒内跑完 System.out.printf(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 跑完全程, 用时 %d 秒 ", time/1000); } }
-
Barrier/ˈbæriər/ n.障碍物
- 集合点, 也是一个同步辅助类
- 允许多个线程在某一个点上进行同步
- CyclicBarrier
- 构造函数是需要同步的线程数量.
- await 等待其他线程, 到达数量后就放行.
package concurrentDemo0421; import java.time.LocalDateTime; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * Barrier/ˈbæriər/ n.障碍物 * 假定有三行数字, 用三个线程分别计算每一行的和, 最后计算综合 */ public class BarrierExample { public static void main(String[] args) { int rowCount = 3; int colCount = 5; final int[][] numbers = new int[rowCount][colCount]; final int[] results = new int[rowCount]; numbers[0] = new int[]{1, 2, 3, 4, 5}; numbers[1] = new int[]{6, 7, 8, 9, 10}; numbers[2] = new int[]{11, 12, 13, 14, 15}; CalcFinalSum111 finalResult = new CalcFinalSum111(results); CyclicBarrier cyclicBarrier = new CyclicBarrier(rowCount, finalResult); // 当有3个线程在 barrier上await时, 就执行最终计算 for (int i = 0; i < rowCount; i++) { CalcRowSum111 eachRow = new CalcRowSum111(numbers, i, results, cyclicBarrier); new Thread(eachRow).start(); } } } class CalcRowSum111 implements Runnable { final int[][] numbers; final int rowNumber; final int[] result; final CyclicBarrier barrier; CalcRowSum111(int[][] numbers, int rowNumber, int[] result, CyclicBarrier barrier) { this.numbers = numbers; this.rowNumber = rowNumber; this.result = result; this.barrier = barrier; } @Override public void run() { int[] row = numbers[rowNumber]; int sum = 0; for (int data : row) { sum += data; result[rowNumber] = sum; } try { System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 计算第" + (rowNumber + 1) + "行结束, 结果为: " + sum); barrier.await(); // 等待! 只要超过(Barrier的构造参数填入的数量)的个数, 就放行 } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } class CalcFinalSum111 implements Runnable { final int[] eachRowResult; int finalResult; CalcFinalSum111(int[] eachRowResult) { this.eachRowResult = eachRowResult; } @Override public void run() { int sum = 0; for (int data : eachRowResult) { sum += data; } finalResult = sum; System.out.println(LocalDateTime.now() + " => " + "最终结果为: " + finalResult); } }
-
Phaser 阶段性控制多个线程
- 允许执行并发多阶段任务, 同步辅助类.
- 在每一个阶段结束的位置对线程进行同步, 当所有的线程都到达这一步, 再进行下一步.
- Phaser
- arrive()
- arriveAndAwaitAdvance()
package concurrentDemo0421; import java.time.LocalDateTime; import java.util.concurrent.Phaser; /** * 假设举行考试, 总共三道大题, 每次下发一道题目, 等所有学生都完成之后再进行下一道题 */ public class PhaserExample { public static void main(String[] args) { int studentCount = 5; Phaser phaser = new Phaser(studentCount); for (int i = 0; i < studentCount; i++) { new Thread(null, new Student111(phaser), "学生" + i).start(); } } } class Student111 implements Runnable { private final Phaser phaser; Student111(Phaser phaser) { this.phaser = phaser; } @Override public void run() { try { doTesting(1); phaser.arriveAndAwaitAdvance(); // 等到所有线程都到达了, 才放行 doTesting(2); phaser.arriveAndAwaitAdvance(); doTesting(3); phaser.arriveAndAwaitAdvance(); } catch (InterruptedException e) { e.printStackTrace(); } } private void doTesting(int i) throws InterruptedException { String name = Thread.currentThread().getName(); System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + "开始答第" + i + "题"); long thinkingTime = (long) (Math.random() * 1000); Thread.sleep(thinkingTime); // 模拟学生答题时间 System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + "第" + i + "道题答题结束"); } }
-
Exchanger 两个线程间交换数据
- 允许在并发线程中互相交换消息.
- 允许在2个线程中定义同步点, 当两个线程都到达同步点, 他们交换数据结构
- Exchanger
- exchange(), 线程双方互相交互数据
- 交换数据是双向的
package concurrentDemo0421; import java.time.LocalDateTime; import java.util.Scanner; import java.util.concurrent.Exchanger; /** * 通过Exchanger实现学生成绩查询, 两个线程间简单的数据交换, * 把自己线程的内容输出给另一个线程(只能简单的双向传送, 不能向MPI一样随意点对点的传输, 线程1给线程3 线程3向线程2...这样) */ public class ExchangerExample { public static void main(String[] args) throws InterruptedException { Exchanger<String> exchanger = new Exchanger<>(); BackgroundWorker111 backgroundWorker111 = new BackgroundWorker111(exchanger); new Thread(backgroundWorker111).start(); Scanner scanner = new Scanner(System.in); while (true) { System.out.println(LocalDateTime.now() + " => " + "请输入要查询的学生名字:"); String input = scanner.nextLine().trim(); exchanger.exchange(input); String exResult = exchanger.exchange(null); // 拿到线程反馈的结果 // 当两个线程都同时执行到同一个exchanger.exchange()方法, 两个线程就互相交换数据, 交换是双向的. if ("exit".equals(exResult)) { System.out.println(LocalDateTime.now() + " => " + "退出查询~"); break; } System.out.println(LocalDateTime.now() + " => " + "查询结果: " + exResult); } } } class BackgroundWorker111 implements Runnable { final Exchanger<String> exchanger; BackgroundWorker111(Exchanger<String> exchanger) { this.exchanger = exchanger; } @Override public void run() { while (true) { try { String item = exchanger.exchange(null); switch (item) { case "zhangsan": exchanger.exchange("90"); break; case "lisi": exchanger.exchange("80"); break; case "wangwu": exchanger.exchange("70"); break; case "exit": exchanger.exchange("exit"); return; // 退出run, 即结束当前线程 default: exchanger.exchange("no body!"); } } catch (InterruptedException e) { e.printStackTrace(); } } } }
-
总结
- java.util.concurrent 包提供了很多并发编程的控制协作类.
- 根据业务特点, 使用正确的线程并发控制协作.