CyclicBarrier类似一个栅栏,只有规定的线程数量到了这个栅栏之后才会放行
首先来看CyclicBarrier的构造方法
public CyclicBarrier(int parties) { this(parties, null); }
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
CyclicBarrier有两个构造函数,第二个构造函数会传入一个Runnable对象,功能就是CyclicBarrier会在规定的线程到达之后执行这个Runnbale方法
CyclicBarrier的await()方法
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; //每个线程调用一次await()方法,count的就减1 if (index == 0) { // tripped //如果是最后一个线程到达了的话,index为0 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); //如果构造函数传入了Runnable的话,就会执行run方法 ranAction = true; nextGeneration(); //会把在Condition里的所有同步队列放到aqs中的同步队列中去,并把waitStatus从Condition改成SINGAL,并把Count的值 return 0; 重新改为原来的值,实现重用 } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { //进入到这里说明线程不是最后一个到达的线程 try { if (!timed) trip.await(); //线程会被阻塞,并放入到Condition的等待队列里去 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }