zoukankan      html  css  js  c++  java
  • CyclicBarrier源码分析

    public class CyclicBarrierExample3 {
        private static CyclicBarrier1 barrier = new CyclicBarrier1(3, new Runnable() {
            @Override
            public void run() {
                System.out.println("callbakck ISIrunning");
            }
        });
        public static void main(String[] args) throws Exception {
            for (int i = 0; i < 10; i++) {
                /* Thread t = */new Thread(new Runnable() {
                    @Override
                    public void run() {
                         try {
                            barrier.await();
                            System.out.println(Thread.currentThread().getName());
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                },"线程"+(i+1)).start();
            }
        }
    }
    public class CyclicBarrier1 {
        //一次等待中,有一个generation和count。不同的组等待generation不同。
        private volatile static int i; 
        private static class Generation {//表示一组,
            boolean broken = false;//一组等待是否坏了
            String name = "g"+(++i);
        }
        private final int parties;//线程等待数量,不变
        private int count;//等待线程数,减一,会变,剩余等待数量
        private Generation generation = new Generation();//每一组一个generation
        
        private final ReentrantLock1 lock = new ReentrantLock1();//非公平锁
        private final Condition trip = lock.newCondition();
        private final Runnable barrierCommand;//执行函数
    
        private void nextGeneration() {//线程等待数量够了,唤醒所有线程,并在重新初始化一个generation
            //第3个线程来signalAll,第1第2个线程会从condition队列移到AQS队列去,
            //第3个线程来unlock,第1第2线程不一定会进来执行(因为AQS队列前面可能还有别的线程)。
            //第3个线程unlock,只是允许外部线程或者AQS中的线程或者condition加到AQS中的线程,唤醒一个进来。
            trip.signalAll();
            
            count = parties;//parties不会变化,count会减减,这里重置count等着下一次的等待用。
            generation = new Generation();//重新初始化generation
        }
    
        private void breakBarrier() {// 
            generation.broken = true;// 这一组异常,会影响后面不是一组的线程。
            count = parties;//重置count
            trip.signalAll();//唤醒所有,condition加到AQS去。
        }
    
        //多线程访问
        private int dowait(boolean timed, long nanos) throws Exception {
            final ReentrantLock1 lock = this.lock;
            lock.lock();
            //进来的是AQS中的(包括condition移过去的)
            try {
                //每一组一个g,nextGeneration()会新建generation。如果改变了,这个线程使用的是上组中最后线程改变的generation。
                final Generation g = generation;
                
    //            if(Thread.currentThread().getName().equals("线程4") 
    //                    || Thread.currentThread().getName().equals("线程9")) {
    //                Thread.currentThread().interrupt();
    //            }
                
                if (g.broken)//generation不会被改变
                    throw new BrokenBarrierException();
    
                if (Thread.interrupted()) {// 线程中断, 
                    breakBarrier();//broken = true; 不会改变generation,后面进来的线程都使用这个generation,直接跑异常。
                    throw new InterruptedException();
                }
    
                int index = --count;//数量减一,index保存在线程栈中,
                if (index == 0) {  //释放所有线程  
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        //只有一组中,最后进来的线程 改变generation。后面进来的线程属于另一组。generation由上一组中最后进来的线程修改。
                        nextGeneration();//唤醒所有等待线程,重置count和generation。注意这里没有释放锁, 新建一个generation。
                        return 0;
                    } finally {
                        if (!ranAction)//ranAction=false进去
                            breakBarrier();//broken = true;
                    }
                }
    
                // 死循环直到  唤醒, broken, 中断, 超时
                for (;;) {
                    try {
                        
    //                    if(Thread.currentThread().getName().equals("线程4") 
    //                            || Thread.currentThread().getName().equals("线程9")) {
    //                        throw new Exception();
    //                    }
                        
                        if (!timed)//是否有超时
                            trip.await();//线程转移到Condition上等待,并且head唤醒AQS下一个节点。
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);//等待时间
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();//设置broken=true,重置count,condition加到AQS去。 
                            throw ie;
                        } else {
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)//true
                        throw new BrokenBarrierException();
    
                    // g != generation表示正常换代了,返回当前线程所在栅栏的下标
                    // 如果 g == generation,说明还没有换代,那为什么会醒了?
                    // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
                    // 正是因为这个原因,才需要generation来保证正确。
                    if (g != generation)
                        return index;//返回
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();// 
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();//外部线程或者AQS中的线程或者condition加到AQS中的线程,唤醒一个进来。
            }
        }
    
        public CyclicBarrier1(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;//等待线程数
            this.count = parties;//等待线程数
            this.barrierCommand = barrierAction;
        }
    
        public CyclicBarrier1(int parties) {
            this(parties, null);
        }
    
        public int getParties() {
            return parties;//parties不变
        }
    
        public int await() throws Exception, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    
        public int await(long timeout, TimeUnit unit) throws Exception, BrokenBarrierException, TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }
    
        public boolean isBroken() {
            final ReentrantLock1 lock = this.lock;
            lock.lock();
            try {
                return generation.broken;
            } finally {
                lock.unlock();
            }
        }
    
        public void reset() {
            final ReentrantLock1 lock = this.lock;
            lock.lock();
            try {
                breakBarrier();   //  
                nextGeneration(); //  
            } finally {
                lock.unlock();
            }
        }
    
        public int getNumberWaiting() {
            final ReentrantLock1 lock = this.lock;
            lock.lock();
            try {
                return parties - count;
            } finally {
                lock.unlock();
            }
        }
    }
  • 相关阅读:
    window.open() 使用详解
    20151117
    20151116
    打开一个网页并弹窗提示,点击确定后2秒后关闭
    网页制作中的一点问题及解决方案
    Android WebView 开发详解(二)
    Android WebView 开发详解(一)
    Android:控件WebView显示网页
    Dagger 2: Step To Step
    Introducing RecyclerView(二)
  • 原文地址:https://www.cnblogs.com/yaowen/p/11355073.html
Copyright © 2011-2022 走看看