Main fields:
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
// Task to run just before the barrier is opened
private final Runnable barrierCommand;
// The current generation
private Generation generation = new Generation();
// Number of parties that have not yet arrived in the current generation
private int count;
Constructors:
public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
Main methods:
private void nextGeneration() {
    // Wake up all blocked threads
    trip.signalAll();
    // Start the next generation
    count = parties;
    generation = new Generation();
}

private static class Generation {
    // Marks whether the barrier has been broken
    boolean broken = false;
}
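Resetting count and swapping in a new Generation in nextGeneration() is what lets one barrier object serve several rounds. The following is a minimal sketch of that behaviour, not part of the JDK source: the class name ReuseDemo and the method runRounds are made up for illustration, and the barrier action is used only to count how many times the barrier trips.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class ReuseDemo {
    // Runs `rounds` rounds on a single barrier and returns how often it tripped.
    static int runRounds(int parties, int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per generation, so it counts the trips.
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> trips.incrementAndGet());
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier object is used every round
                    }
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) w.join();
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runRounds(3, 4)); // prints 4
    }
}
```

Each round only completes when all parties have called await(), so the workers advance in lockstep and the action runs exactly once per round.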
private void breakBarrier() {
    // Mark the current generation as broken
    generation.broken = true;
    count = parties;
    // Wake up all blocked threads
    trip.signalAll();
}
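The broken flag set by breakBarrier() can be observed from outside through the public isBroken() and reset() methods. The sketch below is one way to see it, assuming a timed await() that expires because the second party never arrives; BrokenBarrierDemo and observe are hypothetical names used only for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the JDK.
class BrokenBarrierDemo {
    static String observe() throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(2);
        StringBuilder log = new StringBuilder();
        try {
            // Only one of the two parties arrives, so the timed wait expires
            // and the barrier is broken before TimeoutException is thrown.
            barrier.await(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            log.append("timeout,");
        } catch (BrokenBarrierException e) {
            log.append("unexpected,");
        }
        log.append("broken=").append(barrier.isBroken()).append(',');
        try {
            // A broken barrier rejects new arrivals immediately.
            barrier.await();
        } catch (BrokenBarrierException e) {
            log.append("rejected,");
        }
        // reset() breaks the current generation and then starts a fresh one.
        barrier.reset();
        log.append("broken=").append(barrier.isBroken());
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(observe()); // prints timeout,broken=true,rejected,broken=false
    }
}
```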
// There are two await methods; they differ only in whether a timeout applies
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

public int await(long timeout, TimeUnit unit)
    throws InterruptedException, BrokenBarrierException, TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // Every call decrements count
        int index = --count;
        // index == 0 means all expected threads have arrived
        if (index == 0) {
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                // Run the supplied task synchronously before opening the barrier
                if (command != null)
                    command.run();
                ranAction = true;
                // Start the next generation, wake the waiting threads, and return
                nextGeneration();
                return 0;
            } finally {
                // If the supplied task threw, break the barrier
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        // Not all parties have arrived yet
        for (;;) {
            try {
                // No timeout requested
                if (!timed)
                    trip.await();
                // Timeout requested
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            // Timeout handling
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
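Because dowait returns index, the value of count right after the decrement, await() reports each thread's arrival position: the first thread to arrive gets parties - 1 and the last gets 0. A small sketch of that, with the hypothetical names ArrivalIndexDemo and collectIndices introduced only for this example:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class, not part of the JDK.
class ArrivalIndexDemo {
    // Starts `parties` threads that each call await() once,
    // then returns the indices they received, sorted ascending.
    static List<Integer> collectIndices(int parties) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        List<Integer> indices = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < parties; i++) {
            Thread t = new Thread(() -> {
                try {
                    indices.add(barrier.await());
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) t.join();
        List<Integer> sorted = new ArrayList<>(indices);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(collectIndices(3)); // prints [0, 1, 2]
    }
}
```

Which thread receives which index depends on scheduling, but each value from 0 to parties - 1 is handed out exactly once per generation.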
Usage:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class Person implements Runnable {
    String name;
    CyclicBarrier cyclicBarrier;

    public Person(String name, CyclicBarrier cyclicBarrier) {
        this.name = name;
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        System.out.println(name + "ready");
        try {
            // Simulate the time spent getting ready
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(name + "end ready");
        try {
            // Block until enough parties have arrived
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(name + "do something together");
    }
}

// Wrapper class added so that main compiles; its name is illustrative
class CyclicBarrierDemo {
    public static void main(String[] args) {
        // Four parties; the action runs once all four have arrived
        CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("all person ready");
            }
        });
        // Five threads are started, one more than the barrier expects
        for (int i = 0; i < 5; i++) {
            new Thread(new Person("person" + i, cyclicBarrier)).start();
        }
    }
}
Output:
person0ready
person1ready
person2ready
person3ready
person4ready
person4end ready
person3end ready
person2end ready
person1end ready
person0end ready
all person ready
person1do something together
person4do something together
person2do something together
person3do something together
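The example above simulates four people who each start doing something only after all four have arrived. Five people get ready, however. When four of them reach the CyclicBarrier, barrierAction runs first; once it finishes, the barrier opens and releases those four threads. person0, the last thread to arrive, stays blocked until three more threads reach the CyclicBarrier. Since no other thread ever does in this program, it waits indefinitely.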
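The leftover thread can be observed and released with the public getNumberWaiting() and reset() methods. The following is only a sketch of that idea under simplified assumptions (no barrier action, no sleeps); StragglerDemo and run are hypothetical names, and calling reset() is just one possible way to unblock the straggler.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class StragglerDemo {
    // Returns { threads that passed the barrier, threads released by reset() }.
    static int[] run() throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(4);
        AtomicInteger passed = new AtomicInteger();
        AtomicInteger released = new AtomicInteger();
        Thread[] threads = new Thread[5];
        for (int i = 0; i < 5; i++) {
            threads[i] = new Thread(() -> {
                try {
                    barrier.await();
                    passed.incrementAndGet();
                } catch (BrokenBarrierException e) {
                    // Thrown in the waiting thread when reset() is called
                    released.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        // Wait until four threads have passed and the fifth is
        // waiting in the next generation.
        while (passed.get() < 4 || barrier.getNumberWaiting() != 1) {
            Thread.sleep(10);
        }
        barrier.reset(); // breaks the waiting generation, releasing the fifth thread
        for (Thread t : threads) t.join();
        return new int[] { passed.get(), released.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run();
        System.out.println(r[0] + " passed, " + r[1] + " released"); // prints 4 passed, 1 released
    }
}
```

In a real program it is usually simpler to make the number of threads a multiple of parties, or to use the timed await(long, TimeUnit) so that a straggler cannot wait forever.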
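Key points:
- CyclicBarrier is reusable: once count has been decremented to 0, a new generation is created and count is reset to parties.
- The barrierAction passed in runs synchronously in the last thread to arrive; if it throws an exception, the barrier is broken as well.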
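The second point can be checked with a barrier whose action always throws. In this sketch the main thread is deliberately made the last arrival, so it receives the action's exception directly while the waiting thread sees BrokenBarrierException; FailingActionDemo and run are hypothetical names chosen for the example.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the JDK.
class FailingActionDemo {
    static String run() throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(2, () -> {
            throw new IllegalStateException("action failed");
        });
        AtomicReference<String> waiterOutcome = new AtomicReference<>("none");
        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
                waiterOutcome.set("passed");
            } catch (BrokenBarrierException e) {
                waiterOutcome.set("BrokenBarrierException");
            } catch (InterruptedException e) {
                waiterOutcome.set("InterruptedException");
            }
        });
        waiter.start();
        // Make sure the helper thread is already waiting, so this thread arrives last.
        while (barrier.getNumberWaiting() != 1) {
            Thread.sleep(10);
        }
        String lastOutcome;
        try {
            barrier.await(); // runs the action in this thread
            lastOutcome = "passed";
        } catch (IllegalStateException e) {
            lastOutcome = "IllegalStateException";
        } catch (BrokenBarrierException e) {
            lastOutcome = "BrokenBarrierException";
        }
        waiter.join();
        return "last=" + lastOutcome
            + ",waiter=" + waiterOutcome.get()
            + ",broken=" + barrier.isBroken();
    }

    public static void main(String[] args) throws InterruptedException {
        // prints last=IllegalStateException,waiter=BrokenBarrierException,broken=true
        System.out.println(run());
    }
}
```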