public class CyclicBarrierExample3 { private static CyclicBarrier1 barrier = new CyclicBarrier1(3, new Runnable() { @Override public void run() { System.out.println("callbakck ISIrunning"); } }); public static void main(String[] args) throws Exception { for (int i = 0; i < 10; i++) { /* Thread t = */new Thread(new Runnable() { @Override public void run() { try { barrier.await(); System.out.println(Thread.currentThread().getName()); } catch (Exception e) { e.printStackTrace(); } } },"线程"+(i+1)).start(); } } }
public class CyclicBarrier1 { //一次等待中,有一个generation和count。不同的组等待generation不同。 private volatile static int i; private static class Generation {//表示一组, boolean broken = false;//一组等待是否坏了 String name = "g"+(++i); } private final int parties;//线程等待数量,不变 private int count;//等待线程数,减一,会变,剩余等待数量 private Generation generation = new Generation();//每一组一个generation private final ReentrantLock1 lock = new ReentrantLock1();//非公平锁 private final Condition trip = lock.newCondition(); private final Runnable barrierCommand;//执行函数 private void nextGeneration() {//线程等待数量够了,唤醒所有线程,并在重新初始化一个generation //第3个线程来signalAll,第1第2个线程会从condition队列移到AQS队列去, //第3个线程来unlock,第1第2线程不一定会进来执行(因为AQS队列前面可能还有别的线程)。 //第3个线程unlock,只是允许外部线程或者AQS中的线程或者condition加到AQS中的线程,唤醒一个进来。 trip.signalAll(); count = parties;//parties不会变化,count会减减,这里重置count等着下一次的等待用。 generation = new Generation();//重新初始化generation } private void breakBarrier() {// generation.broken = true;// 这一组异常,会影响后面不是一组的线程。 count = parties;//重置count trip.signalAll();//唤醒所有,condition加到AQS去。 } //多线程访问 private int dowait(boolean timed, long nanos) throws Exception { final ReentrantLock1 lock = this.lock; lock.lock(); //进来的是AQS中的(包括condition移过去的) try { //每一组一个g,nextGeneration()会新建generation。如果改变了,这个线程使用的是上组中最后线程改变的generation。 final Generation g = generation; // if(Thread.currentThread().getName().equals("线程4") // || Thread.currentThread().getName().equals("线程9")) { // Thread.currentThread().interrupt(); // } if (g.broken)//generation不会被改变 throw new BrokenBarrierException(); if (Thread.interrupted()) {// 线程中断, breakBarrier();//broken = true; 不会改变generation,后面进来的线程都使用这个generation,直接跑异常。 throw new InterruptedException(); } int index = --count;//数量减一,index保存在线程栈中, if (index == 0) { //释放所有线程 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //只有一组中,最后进来的线程 改变generation。后面进来的线程属于另一组。generation由上一组中最后进来的线程修改。 nextGeneration();//唤醒所有等待线程,重置count和generation。注意这里没有释放锁, 新建一个generation。 return 0; } finally { if (!ranAction)//ranAction=false进去 breakBarrier();//broken = true; } } // 死循环直到 唤醒, broken, 中断, 超时 for (;;) { try { // if(Thread.currentThread().getName().equals("线程4") // || Thread.currentThread().getName().equals("线程9")) { // throw new Exception(); // } if (!timed)//是否有超时 trip.await();//线程转移到Condition上等待,并且head唤醒AQS下一个节点。 else if (nanos > 0L) nanos = trip.awaitNanos(nanos);//等待时间 } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier();//设置broken=true,重置count,condition加到AQS去。 throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken)//true throw new BrokenBarrierException(); // g != generation表示正常换代了,返回当前线程所在栅栏的下标 // 如果 g == generation,说明还没有换代,那为什么会醒了? // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。 // 正是因为这个原因,才需要generation来保证正确。 if (g != generation) return index;//返回 if (timed && nanos <= 0L) { breakBarrier();// throw new TimeoutException(); } } } finally { lock.unlock();//外部线程或者AQS中的线程或者condition加到AQS中的线程,唤醒一个进来。 } } public CyclicBarrier1(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties;//等待线程数 this.count = parties;//等待线程数 this.barrierCommand = barrierAction; } public CyclicBarrier1(int parties) { this(parties, null); } public int getParties() { return parties;//parties不变 } public int await() throws Exception, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } public int await(long timeout, TimeUnit unit) throws Exception, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } public boolean isBroken() { final ReentrantLock1 lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } public void reset() { final ReentrantLock1 lock = this.lock; lock.lock(); try { breakBarrier(); // nextGeneration(); // } finally { lock.unlock(); } } public int getNumberWaiting() { final ReentrantLock1 lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } } }