zoukankan      html  css  js  c++  java
  • CyclicBarrier 源码分析

    CyclicBarrier

    CyclicBarrier 是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)
    之后同时释放执行。CyclicBarrier 可以重置同步器的状态(循环使用栅栏),而 CountDownLatch 则不能。
    CyclicBarrier 能指定一个 barrierCommand 在栅栏释放后执行归并。
    

    创建实例

        /**
         * 栅栏的分代,循环递增
         */
        private static class Generation {
            Generation() {}                
            // 此带的栅栏是否已经被破坏
            boolean broken;                 
        }
    
        /** 保护进入屏障的锁 */
        private final ReentrantLock lock = new ReentrantLock();
        /** 屏障释放条件 */
        private final Condition trip = lock.newCondition();
        /** 参与者数目 */
        private final int parties;
        /** 屏障释放后执行的任务 */
        private final Runnable barrierCommand;
        /** 当前的代 */
        private Generation generation = new Generation();
    
        /**
         *  待进入屏障的线程数
         */
        private int count;
    
        /**
         *  创建具有 parties 个参与者,无后置任务的屏障
         */
        public CyclicBarrier(int parties) {
            this(parties, null);
        }
    
        /**
         * 创建具有 parties 个参与者,后置任务为 barrierAction 的屏障
         */
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) {
                throw new IllegalArgumentException();
            }
            this.parties = parties;
            count = parties;
            barrierCommand = barrierAction;
        }
    

    同步阻塞

        /**
         *  1)尝试阻塞等待所有的参与者都到达屏障,如果当前线程是最后一个参与者,则唤醒所有其他阻塞的参与者线程
         *  2)当前线程被中断
         *  3)任何一个参与者线程被中断
         *  4)任何一个参与者线程超时
         *  5)屏障的 reset 方法被调用
         */
        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (final TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    
        public int await(long timeout, TimeUnit unit)
                throws InterruptedException,
                BrokenBarrierException,
                TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }
    
        private int dowait(boolean timed, long nanos)
                throws InterruptedException, BrokenBarrierException,
                TimeoutException {
            // 读取锁
            final ReentrantLock lock = this.lock;
            // 锁定
            lock.lock();
            try {
                // 读取当前代
                final Generation g = generation;
                
                // 如果屏障已经破裂
                if (g.broken) {
                    throw new BrokenBarrierException();
                }
                
                // 当前线程被其他线程中断
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
                
                // 剩余参与者数
                final int index = --count;
                // 如果当前线程是最后一个参与者
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        // 如果后置任务不为 null,则运行它
                        final Runnable command = barrierCommand;
                        if (command != null) {
                            command.run();
                        }
                        ranAction = true;
                        // 递增分代
                        nextGeneration();
                        return 0;
                    } finally {
                        // 任务运行时出现异常,也要打破屏障
                        if (!ranAction) {
                            breakBarrier();
                        }
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                for (;;) {
                    try {
                        // 1)如果不是超时模式,则一直阻塞 
                        if (!timed) {
                            trip.await();
                        // 2)超时阻塞   
                        } else if (nanos > 0L) {
                            nanos = trip.awaitNanos(nanos);
                        }
                    // 如果线程被中断  
                    } catch (final InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
                    
                    // 线程被唤醒时,屏障已经破裂
                    if (g.broken) {
                        throw new BrokenBarrierException();
                    }
                    
                    // 已经生成了新的分代
                    if (g != generation) {
                        return index;
                    }
                    
                    // 当前线程等待超时
                    if (timed && nanos <= 0L) {
                        // 打破屏障
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    
  • 相关阅读:
    搞明白这八个问题,Linux系统就好学多了
    Fedora 25 Alpha版本今天发布啦
    Linux新手应掌握的10个基本命令
    PC-BSD 换名 TrueOS
    JPA+Springboot实现分页效果
    陈亮
    押尾光太郎
    岸部真明
    面试必备-网络的七层协议
    JavaScript中的快速排序
  • 原文地址:https://www.cnblogs.com/zhuxudong/p/10122976.html
Copyright © 2011-2022 走看看