一、CyclicBarrier使用场景分析
1)CyclicBarrier :翻译过来为循环屏障,即这个东西可以循环用
2)就如赛跑时候的起跑线,发令枪打响则大家一起跑,而且这个是可以循环使用的发令枪
3)如果有N个线程想要同时一起并发执行,这个时候可以使用CyclicBarrier来处理
二、CyclicBarrier使用案例
package com.test.lock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Random; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class CyclicBarrierTest4 { static Logger logger = LoggerFactory.getLogger(CyclicBarrierTest4.class ) ; public static int getRandom(){ Random random = new Random() ; int rid = random.nextInt(6); int val = rid+2; logger.info(String.format("生成随机数:%d", val )); return val; } public static void main(String[] args) throws InterruptedException { AtomicInteger huizongNum = new AtomicInteger(); CyclicBarrier cyclicBarrier = new CyclicBarrier(10); for (int i = 0; i <= 10 ; i++) { int finalI = i; Thread t1 = new Thread(()->{ try { //每个线程睡眠时间随机 TimeUnit.SECONDS.sleep(getRandom()); huizongNum.incrementAndGet(); logger.info(String.format("t%d准备完成", finalI)); //所有线程都在这个地方等待其他线程准备完成,一旦全都准备完成则会所有线程一起唤醒,继续执行 cyclicBarrier.await(); logger.info(String.format("t%d执行完成", finalI)); } catch ( Exception e) { e.printStackTrace(); } },"t1") ; t1.start(); } } }
三、源码整体思路:
1)构造函数中设置初始标识位
2)每次调用await,则使用lock中的condition的队列去等待,标识位-1
3 )当标识位变为0的时候唤醒condition中所有线程
1) 初始化 CyclicBarrier对象
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); // this.parties = parties; //初始化等待线程的数量 this.count = parties; this.barrierCommand = barrierAction; }
2)线程 await方法
public int await() throws InterruptedException, BrokenBarrierException { try { //调用等待方法 return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } // timed 是否需要超时时间 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; // 如果标识位 count变为0了,则进入到这个方法,执行一个统一的方法(如果设置了的话)来做汇总 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //index==0,标识此次并发执行结束,则需要重置所有标识,下次可以继续使用 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed)// 如果没有设置超时时间 //进入到 Condition队列等待 trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { //响应中断,唤醒所有等待的线程, breakBarrier(); //抛出中断异常 throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; //如果需要超时时间,但是数值为 《0 则超时异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
3)响应中断方法
// 被中断以后则会调用这个方法设置中断标识 private void breakBarrier() { //设置标识位为 true (默认为false) generation.broken = true;//等待的时候会抛出异常,即如果变为true,则下次使用的时候会抛异常,后面不能在用了 //还原等待标识为最初值 count = parties; //唤醒等待的所有线程 trip.signalAll(); }
4 唤醒所有等待线程,并且重置标识位
private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }