概述
CyclicBarrier,一个同步辅助类,其作用是让一组线程到达公共屏障点时阻塞,直到最后一个线程也到达公共屏障点,屏障才会打开,而且该屏障(barrier)在这一组线程释放后可以重用,所以称为循环(Cyclic)的屏障(Barrier)。看功能和CountDownLatch一样,但是两者的区别如下(jdk1.8):
CountDownLatch | CyclicBarrier |
减计数方式 | 减计数方式 |
计数为0时释放所有等待的线程 | 计数为0时,优先执行一个barrierAction,然后释放所有等待线程 |
计数为0时,无法重置 | 计数为0时,可以重置 |
调用countDown()方法计数减一,调用await()方法只进行阻塞(主线程),对计数没任何影响 | 调用await()方法计数减1,若减1后的值不等于0,则线程阻塞(子线程) |
不可重复利用 | 可重复利用 |
上面列举的区别点不清楚的,先不用管,看完后面这个类的介绍之后,就可以明白了,为了更好的理解,先看一个例子。
使用示例
铁蛋最近手痒,想找人打架,于是他就到了钢蛋家,钢蛋家兄弟5个,他要去揍他兄弟五个,直到他们兄弟五个都被打趴了,他才会肯收手,这时候他也累了,先歇一会,然后再打,再把钢蛋兄弟五个打趴下,下面就模拟一下这个过程。
public class CyclicBarrierTest { //钢蛋 static class Person extends Thread{ CyclicBarrier cyclicBarrier; String name; public Person(CyclicBarrier cyclicBarrier,String name){ this.cyclicBarrier = cyclicBarrier; this.name = name; } @Override public void run() { System.out.println("钢蛋"+name + ":别打了,大哥,我这个月的零发钱都给你"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } static class Sleep extends Thread{ @Override public void run() { System.out.println("打累了,歇会,抽根烟!!!"); } } public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Sleep()); for (int i = 0; i < 5; i++) { new Person(cyclicBarrier,String.valueOf(i)).start(); } try {
//休息1s,再打 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < 5; i++) { new Person(cyclicBarrier,String.valueOf(i)).start(); } } }
执行结果
钢蛋0:别打了,大哥,我这个月的零发钱都给你
钢蛋4:别打了,大哥,我这个月的零发钱都给你
钢蛋1:别打了,大哥,我这个月的零发钱都给你
钢蛋2:别打了,大哥,我这个月的零发钱都给你
钢蛋3:别打了,大哥,我这个月的零发钱都给你
打累了,歇会,抽根烟!!!
钢蛋0:别打了,大哥,我这个月的零发钱都给你
钢蛋1:别打了,大哥,我这个月的零发钱都给你
钢蛋2:别打了,大哥,我这个月的零发钱都给你
钢蛋4:别打了,大哥,我这个月的零发钱都给你
钢蛋3:别打了,大哥,我这个月的零发钱都给你
打累了,歇会,抽根烟!!!
从上面的例子可以看出,铁蛋很能打,哈哈哈。大家看这个例子应该就明白CyclicBarrier怎么使用了。
CyclicBarrier工作原理
CyclicBarrier是基于ReentrantLock和Condition实现的,基本过程是这样的,先定义CyclicBarrier拦截的线程数量(parties)和剩余线程数量(count),初始状态parties = count,然后启动多个线程,当某个线程执行完之后,就加锁,然后count减1,如果发现count不等于0,把当前线程await挂起,直到最后一个线程执行完,发现count等于0,这个时候他不着急唤醒其他等待的线程,他先执行一个任务,这个任务是提前就定义好的,当这个任务完成了,他再把其他的线程唤醒,这时这一组线程就都执行完了,之后他会重置拦截的线程数量,这也是为什么CyclicBarrier可以重复使用的原因,以上就是CyclicBarrier处理的基本流程。
CyclicBarrier代码分析
构造方法
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
-
parties
变量,表示拦截线程的总数量。 -
count
变量,表示拦截线程的剩余需要数量。 -
barrierAction
变量,为 CyclicBarrier 接收的 Runnable 命令,用于在线程到达屏障时,优先执行barrierAction
,用于处理更加复杂的业务场景。 -
generation
变量,表示 CyclicBarrier 的更新换代。
await方法
//等待,不设置超时时间
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } //等待,设置超时间 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
设置超时时间的意思是说,当超时时间到了,就唤醒该线程,不管这时所有的线程是不是执行完了。
进入#dowait(boolean timed, long nanos)方法
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //加锁 lock.lock(); try { //获取当前分代 final Generation g = generation; //如果分代被“损坏”,抛出异常,分代怎么能被损坏呢?看下面的代码 if (g.broken) throw new BrokenBarrierException(); //如果线程被中断,那分代就被损坏了,这时会重置拦截线程数和唤醒等待线程,防止无限等待下去 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //剩余线程数量减1 int index = --count; //index = 0,说明当前线程是最后一个线程 if (index == 0) { // tripped boolean ranAction = false; try { //拿到构造方法中传进来的命令 final Runnable command = barrierCommand; if (command != null) //执行 command.run(); ranAction = true; //这里会唤醒别的等待的线程,同时会重置拦截线程数量和新建分代,表示这一组线程完成 //开始下一组 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { //如果没有设置了超时时间 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); //抛异常,说明等待的时候该线程被中断了 } catch (InterruptedException ie) { if (g == generation && ! g.broken) { //分代被损坏 breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } //如果分代broken,抛出异常,唤醒之后再次判断,防止上面执行catch if (g.broken) throw new BrokenBarrierException(); //由于上面在执行到最后一个线程的时候才会唤醒该线程,而且会重置分代,所以这个 //if条件会成立,也就是说await方法会返回还剩余多少个等待线程 if (g != generation) return index; //这个nanos是trip.awaitNanos返回的,如果超时了,这个条件就成立 if (timed && nanos <= 0L) {
//唤醒所有线程,屏障被损坏 breakBarrier(); throw new TimeoutException(); } } } finally { //最后解锁,上面的所有操作都是原子操作 lock.unlock(); } }
线程wait之后,如下条件可以结束wait状态:
- 最后一个线程执行完,index = 0
- 当前线程等待超时
- 别的线程等待超时
- 等待的过程中当前线程发生中断
- 等待过程别的线程发生中断
- 某个线程执行了reset方法
Generation
private static class Generation { boolean broken = false; }
private Generation generation = new Generation();
Generation是CyclicBarrier中的一个静态类,首次初始化CyclicBarrier的时候,会自动初始化,该类是处理分代的,同一组的线程属于一个分代,当这一组的所有线程都到达barrier的时候,就重新new一个新的,可以看一下如下方法:
nextGeneration();
该方法在index=0的时候会执行,代码如下:
private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties;
//当index=0的时候,说明这一组线程全部执行完了,重新新建分代对象 generation = new Generation(); }
也就是说每次分代都会重新新建一个Generation类的对象。
reset()方法
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
该方法会重置分代,而且会通知上一个分代的所有等待的线程。这里有一个问题,如果在执行reset的时候,上一个分代中还有的线程没有执行到await方法,在reset之后,上一个分代中这些方法又进入await,这个时候在breakBarrier方法中把分代给损坏了是不管用的,先看一下这个方法
private void breakBarrier() {
//“损坏分代方法” generation.broken = true; count = parties; trip.signalAll(); }
在这个方法中设置为true之后,接着又执行了nextGeneration,在这个方法中会新建一个分代,此时generation.broken = false,所以上一个分代的线程会继续执行,而且会进入await,会使用reset之后的分代。
参考: