1.简述
CyclicBarrier字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时候,屏障才会开门。所有被屏障拦截的线程才会运行。
和CountdownLatch比较类似,但CyclicBarrier更加注重的是集合内的线程同步,线程组内的所有线程都必须等待组内其他线程运行到一个barrier point,才能继续执行。能够处理更加复杂的场景。并且CyclicBarrier内有一个Generation对象,可以重用下去。
CyclicBarrier的使用场景:
- 可以用于多线程计算数据,最后合并计算结果的应用场景。
2.Cyclicbarrier的常用方法
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
/**构造方法 */ //创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier时执行预定义的操作。 CyclicBarrier(int parties) //创建一个新的CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier时执行给定的屏障操作,该操作由最后一个进入 barrier的线程执行。 CyclicBarrier(int parties, Runnable barrierAction) /**常用方法 */ //在所有参与者都已经在此barrier上调用await方法之前,将一直等待。 int await() //在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。 int await(long timeout, TimeUnit unit) //返回当前在屏障处等待的参与者数目。 int getNumberWaiting() //返回要求启动此 barrier的参与者数目。 int getParties() //查询此屏障是否处于损坏状态。 boolean isBroken() //将屏障重置为其初始状态。 void reset()
3.Cyclicbarrier的源码分析
CyclicBarrier是通过ReentrantLock(独占锁)、Condition来实现的。下面,我们分析CyclicBarrier中的源码。
CyclicBarrier的主要属性:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
//并没有自定义同步器,而是定义了一个Generation类,里面包含一个broker属性。 private static class Generation { boolean broken = false;//只有一个标记值 } //属性 private final ReentrantLock lock = new ReentrantLock(); //可重入锁 private final Condition trip = lock.newCondition(); //Condition后面的await()和singalAll()的调用 private final int parties; //参与人数量 private final Runnable barrierCommand; //触发时要运行的命令 private Generation generation = new Generation(); private int count;// 记录还有多少在等待数
CyclicBarrier的构造函数:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
/**需要传入一个parties变量,也就是需要等待的线程数。 */ public CyclicBarrier(int parties) { this(parties, null);//parties表示必须同时到达barrier的线程个数。 } /**也可以传入执行的命令,唤醒时调用 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties;//parties表示必须同时到达barrier的线程个数。 this.count = parties;//count表示处在等待状态的线程个数。 this.barrierCommand = barrierAction;//barrierCommand表示parties个线程到达barrier时,会执行的动作。 }
CyclicBarrier的await方法:
每个需要在栅栏处等待的线程都需要显式地调用await()
方法等待其它线程的到来。
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
/**该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态。直到所有线程都到达屏障点,当前线程才会被唤醒 */ public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } /**该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态。在timeout指定的超时时间内,等待其他参与线程到达屏障点。 * 如果超出指定的等待时间,则抛出TimeoutException异常,如果该时间小于等于零,则此方法根本不会等待. */ public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
CyclicBarrier的核心方法dowait:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
/** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //获取独占锁(lock) lock.lock(); try { //保存当前的generation final Generation g = generation; //若当前generation已损坏,则抛出异常。 if (g.broken) throw new BrokenBarrierException(); //如果当前线程被中断,则通过breakBarrier()方法终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //将count计数器-1 int index = --count; //如果index=0,则意味着有parties个线程到达barrier。 if (index == 0) { // tripped boolean ranAction = false; try { //如果barrierCommand不为null,则执行该动作。 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //唤醒所有等待线程,并更新generation。 nextGeneration(); //这里等价于return index; return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out //当前线程一直阻塞,直到有parties个线程到达barrier或当前线程被中断或超时这3者之一发生,当前线程才继续执行。 for (;;) { try { //如果不是超时等待,则调用awati()方法进行等待。否则,调用awaitNanos()方法进行等待。 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //如果等待过程中,线程被中断,则执行下面的函数。 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } //如果当前generation已经损坏,则抛出异常。 if (g.broken) throw new BrokenBarrierException(); //如果generation已经换代,则返回index。 if (g != generation) return index; //如果是超时等待,并且时间已到,则通过breakBarrier()方法终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //释放独占锁(lock) lock.unlock(); } }
dowait方法的整个逻辑分成两个部分:
- 最后一个线程走上面的逻辑,当count减为0的时候,打破栅栏,它调用nextGeneration方法通知条件队列中的等待线程转移到AQS的队列中等待被唤醒,并进入下一代。
- 非最后一个线程走下面的for循环逻辑,这些线程会阻塞在condition的await方法处,它们会加入到条件队列中,等待被通知,当它们唤醒的时候已经更新换代了,这时候返回。
4.Cyclicbarrier的使用示例
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
public class Test { public static void main(String[] args) throws Exception { final CyclicBarrier cb = new CyclicBarrier(5); for (int i = 0; i < 5; i++) { new Thread(new Runnable() { public void run() { try { System.out.println("线程" + Thread.currentThread().getName() + "正在执行同一个任务"); // 以睡眠来模拟几个线程执行一个任务的时间 Thread.sleep(2000); System.out.println("线程" + Thread.currentThread().getName() + "执行任务完成,等待其他线程执行完毕"); // 用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务; cb.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println("所有线程写入完毕"); } }).start(); } } }
5.总结
CyclicBarrier总结:
- CyclicBarrier会使一组线程阻塞在await()处,当最后一个线程到达时唤醒(只是从条件队列转移到AQS队列中)前面的线程大家再继续往下走。
- CyclicBarrier不是直接使用AQS实现的一个同步器。
- CyclicBarrier基于ReentrantLock及其Condition实现整个同步逻辑。
CyclicBarrier与CountDownLatch的异同:
- 两者都能实现阻塞一组线程等待被唤醒。
- CyclicBarrier是最后一个线程到达时自动唤醒。
- CountDownLatch是通过显式地调用countDown()实现的。
- CyclicBarrier是通过重入锁及其条件锁实现的,CountDownLatch是直接基于AQS实现的。
- CyclicBarrier具有“代”的概念,可以重复使用,CountDownLatch只能使用一次。
- CyclicBarrier只能实现多个线程到达栅栏处一起运行。
- CountDownLatch不仅可以实现多个线程等待一个线程条件成立,还能实现一个线程等待多个线程条件成立。