同步并发工具类
同步屏障CyclicBarrier
简介:CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
1 package com.cattsoft.sync; 2 3 import java.util.concurrent.CyclicBarrier; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 7 /** 8 * 表示大家彼此等待,集合后才开始出发(张家界旅游,早上等车,大峡谷进去集合(范乾),大峡谷出来集合) 9 * 工作中没准就有这种应用,当以后的工作中做什么项目,可能用到这个技术,就要联想起来 10 * 理论上是一定有这种需求,jdk开发者才会搞这种技术,只是目前还没碰到。 11 */ 12 public class CyclicBarrierTest { 13 14 public static void main(String[] args) { 15 ExecutorService executor = Executors.newCachedThreadPool(); 16 final CyclicBarrier cb = new CyclicBarrier(15); 17 for (int i = 0; i <15; i++) { 18 Runnable runnable = new Runnable() { 19 @Override 20 public void run() { 21 try { 22 // 获取一个信号灯 23 Thread.sleep((long) (Math.random() * 10000)); 24 System.out.println("线程: " + Thread.currentThread().getName() + "即将到达集合地址1,当前已有" 25 + (cb.getNumberWaiting() + 1) + "个已经到达。" 26 + (cb.getNumberWaiting() == 14 ? "都到齐了,继续往下走" : "正在等候")); 27 cb.await(); // 在这里集合 28 29 Thread.sleep((long) (Math.random() * 10000)); 30 System.out.println("线程: " + Thread.currentThread().getName() + "即将到达集合地址2,当前已有" 31 + (cb.getNumberWaiting() + 1) + "个已经到达。" 32 + (cb.getNumberWaiting() == 14 ? "都到齐了,继续往下走" : "正在等候")); 33 cb.await(); // 在这里集合 34 35 Thread.sleep((long) (Math.random() * 10000)); 36 System.out.println("线程: " + Thread.currentThread().getName() + "即将到达集合地址3,当前已有" 37 + (cb.getNumberWaiting() + 1) + "个已经到达。" 38 + (cb.getNumberWaiting() == 14 ? "都到齐了,继续往下走" : "正在等候")); 39 cb.await(); // 在这里集合 40 } catch (Exception e) { 41 e.printStackTrace(); 42 } 43 } 44 }; 45 executor.execute(runnable); 46 } 47 executor.shutdown(); 48 } 49 }
如果把new CyclicBarrier(15)修改成new CyclicBarrier(16)则主线程和子线程会永远等待,因为没有第16个线程执行await方法,即没有第16个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。代码如下:
1 public class CyclicBarrierTest2 { 2 3 static CyclicBarrier c = new CyclicBarrier(2, new A()); 4 5 public static void main(String[] args) { 6 new Thread(new Runnable() { 7 8 @Override 9 public void run() { 10 try { 11 c.await(); 12 } catch (Exception e) { 13 14 } 15 System.out.println(1); 16 } 17 }).start(); 18 19 try { 20 c.await(); 21 } catch (Exception e) { 22 23 } 24 System.out.println(2); 25 } 26 27 static class A implements Runnable { 28 29 @Override 30 public void run() { 31 System.out.println(3); 32 } 33 34 } 35 36 }
输出:
1 3 2 1 3 2
CyclicBarrier的应用场景
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。
同步信号量Semaphore
控制并发线程数
Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。
1 package com.cattsoft.sync; 2 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.Semaphore; 6 7 /** 8 * 信号灯同步工具 9 * 与互斥锁对比,信号灯可以由其它线程释放,互斥锁只能由自己释放 10 * 模拟场景:现在有3个厕所坑,有5个人要进去,先进去3个人,其它2个等待,有人出来,等待的人再进去 11 * <p> 12 * TTP: 13 * 知识很容易就学会,最大的困难是在以后的工作中遇到困难了,正好能想到用这个工具 14 * 每个药能治什么病,很容易学会,难就难在当看到一个病人以后,能想起来就这个药,能想到这一步你就是华佗。 15 */ 16 public class SemaphoreTest { 17 public static void main(String[] args) { 18 ExecutorService executor = Executors.newFixedThreadPool(5); 19 20 //可使用的公共资源数量 21 final Semaphore sp = new Semaphore(3); 22 23 for (int i = 0; i < 5; i++) { 24 Runnable runnable = new Runnable() { 25 @Override 26 public void run() { 27 try { 28 // 获取一个信号灯 29 sp.acquire(); 30 } catch (InterruptedException e) { 31 e.printStackTrace(); 32 } 33 System.out.println("线程: " + Thread.currentThread().getName() + "进入,当前已有" 34 + (3 - sp.availablePermits()) + "个并发。"); 35 try { 36 Thread.sleep((long) (Math.random() * 10000)); 37 } catch (InterruptedException e) { 38 e.printStackTrace(); 39 } 40 System.out.println("线程" + Thread.currentThread().getName() + "即将离开。"); 41 sp.release(); 42 System.out.println("线程" + Thread.currentThread().getName() + "已离开,当前已有" 43 + (3 - sp.availablePermits()) + "个并发"); 44 } 45 }; 46 executor.execute(runnable); 47 } 48 executor.shutdown(); 49 } 50 }
CountDownLatch
等待多线程完成,CountDownLatch 允许一个或多个线程等待其他线程完成操作。
应用场景
假如有这样一个需求,当我们需要解析一个Excel里多个sheet的数据时,可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的做法是使用join。代码如下:
1 public class JoinCountDownLatchTest { 2 3 public static void main(String[] args) throws InterruptedException { 4 Thread parser1 = new Thread(new Runnable() { 5 @Override 6 public void run() { 7 } 8 }); 9 10 Thread parser2 = new Thread(new Runnable() { 11 @Override 12 public void run() { 13 System.out.println("parser2 finish"); 14 } 15 }); 16 17 parser1.start(); 18 parser2.start(); 19 parser1.join(); 20 parser2.join(); 21 System.out.println("all parser finish"); 22 } 23 24 }
1 package com.cattsoft.sync; 2 3 import java.util.concurrent.CountDownLatch; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 7 /** 8 * 倒计时计数器 9 * 模拟场景: 10 * 三个运动员跑步比赛,听到一听哨响,开始跑,全部跑到了,一起公布成绩。 11 */ 12 public class CoutDownLatchTest { 13 public static void main(String[] args) { 14 ExecutorService executor = Executors.newCachedThreadPool(); 15 final CountDownLatch cdOrder = new CountDownLatch(1); 16 final CountDownLatch cdAnswer = new CountDownLatch(3); 17 18 for (int i = 0; i < 3; i++) { 19 Runnable runnable = new Runnable() { 20 @Override 21 public void run() { 22 try { 23 System.out.println("线程"+ Thread.currentThread().getName() + "正在准备接受命令"); 24 cdOrder.await(); 25 System.out.println("线程" + Thread.currentThread().getName() + "已经接受命令"); 26 27 Thread.sleep((long) (Math.random() * 10000)); 28 System.out.println("线程"+ Thread.currentThread().getName() + "回应此命令的处理结果"); 29 cdAnswer.countDown(); 30 31 } catch (Exception e) { 32 e.printStackTrace(); 33 } 34 } 35 }; 36 executor.execute(runnable); 37 } 38 39 try { 40 Thread.sleep((long) (Math.random() * 10000)); 41 System.out.println("线程"+ Thread.currentThread().getName() + "即将发布命令"); 42 cdOrder.countDown(); 43 System.out.println("线程"+ Thread.currentThread().getName() + "已经发送命令,正在等待结果"); 44 cdAnswer.await(); 45 System.out.println("线程"+ Thread.currentThread().getName() + "已收到所有响应结果"); 46 } catch (Exception e) { 47 e.printStackTrace(); 48 } 49 executor.shutdown(); 50 } 51 } 52 53 54 Map<String,Integer> linkMap = new LinkedHashMap<>(); 55 for(Entry<String,Integer> newEntry :result){ 56 linkMap.put(newEntry.getKey(), newEntry.getValue()); 57 } 58 for(Map.Entry<String, Integer> mapEntry : linkMap.entrySet()){ 59 System.out.println("key:"+mapEntry.getKey()+" value:"+mapEntry.getValue()); 60 } 61 }
CyclicBarrier和CountDownLatch的区别
- CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
- CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。比如以下代码执行完之后会返回true。
isBroken的使用代码如下:
1 import java.util.concurrent.BrokenBarrierException; 2 import java.util.concurrent.CyclicBarrier; 3 4 public class CyclicBarrierTest3 { 5 6 static CyclicBarrier c = new CyclicBarrier(2); 7 8 public static void main(String[] args) throws InterruptedException, BrokenBarrierException { 9 Thread thread = new Thread(new Runnable() { 10 11 @Override 12 public void run() { 13 try { 14 c.await(); 15 } catch (Exception e) { 16 } 17 } 18 }); 19 thread.start(); 20 thread.interrupt(); 21 try { 22 c.await(); 23 } catch (Exception e) { 24 System.out.println(c.isBroken()); 25 } 26 } 27 }