zoukankan      html  css  js  c++  java
  • CyclicBarrier源码分析

    CyclicBarrier源码分析

    CyclicBarrier的作用是让一组线程互相等待至某个状态后并行执行(相对外部来说是并行,其实内部还是串行)

    基本的使用方法是创建一个CyclicBarrier实例,并且指定parties的个数,然后线程依次调用CyclicBarrier的await()方法让自己进入等待状态,当最后一个线程进入await()方法时,将会唤醒所有正在等待的线程,并行执行。

    CyclicBarrier虽然也是同步器,但是并非直接通过AQS来进行实现的,而是借助了ReentrantLock以及Condition来进行实现。


    CyclicBarrier的结构

    public class CyclicBarrier {
        
        /**
         * 存在一个Generation静态内部类
         */ 
        private static class Generation {
            boolean broken = false; // 标识CyclicBarrier是否被破坏
        }
    
        private final ReentrantLock lock = new ReentrantLock();
        
        private final Condition trip = lock.newCondition(); // 与ReentrantLock绑定的Condition实例
    
        private final int parties; // 用于记录一共有多少个线程需要等待
    
        private final Runnable barrierCommand; // 由最后一个进入await()方法的线程进行调用
    
        private Generation generation = new Generation();
    
        private int count; // 用于记录还需要多少个线程进行等待
    
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
        public CyclicBarrier(int parties) {
            this(parties, null);
        }
    
        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    
        public int await(long timeout, TimeUnit unit)
            throws InterruptedException,
                   BrokenBarrierException,
                   TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }
       
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            // ......
        }
        
        // 其他省略
    }
    

    可以看到CyclicBarrier存在全局的lock、trip、parties、count、barrierCommand以及generation属性,其中parties属性用于记录一共有多少个线程需要等待,而count用于记录还需要多少个线程进行等待。

    同时CyclicBarrier中定义了一个Generation静态内部类,该内部类只有一个broken全局属性,用于标识CyclicBarrier是否被破坏,默认为false。

    同时CyclicBarrier的构造方法会初始化全局的parties、count以及barrierCommand属性(CyclicBarrier初始化后,count的数量等于parties的数量)


    await()方法

    由于当创建CyclicBarrier实例之后,线程需要依次调用CyclicBarrier的await()方法让自己进入等待状态,因此从await()方法开始入手。

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
    

    await()方法存在两个重载,区别是一个支持超时,一个不支持超时,最终都会调用dowait()方法(使用timed参数表示是否有超时限制,如果timed参数为true则需要传递具体的超时时间)


    dowait()方法

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // 获取全局的ReentrantLock实例,并进行加锁           
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 获取全局的Generation实例,如果Generation中的broken属性为true则表示CyclicBarrier已经被破坏,则直接抛出异常(默认是false)
            final Generation g = generation;
    
            if (g.broken)
                throw new BrokenBarrierException();
    
            // 如果线程已经被设置了中断标识,则调用breakBarrier()方法,破坏CyclicBarrier
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException(); // 抛出异常
            }
    
            // index属性用于记录还需要多少个线程进行等待
            int index = --count;
            // 如果index等于0,表示当前线程是最后一个进入await()方法的线程,如果barrierCommand不为空,那么执行barrierCommand的run()方法,然后调用nextGeneration()方法,唤醒在指定Condition实例中等待的所有线程,并重置CyclicBarrier,然后线程直接返回,做自己的事情
            if (index == 0) {  // 最后一个线程走这个逻辑
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    // 如果在执行barrierCommand的run()方法时抛出异常,那么ranAction标识为false,那么需要调用breakBarrier()方法,破坏CyclicBarrier
                    if (!ranAction)
                        breakBarrier();
                }
            }
    
            // 如果非最后一个线程那么将会往下执行
            
            // 循环
            for (;;) {
                try {
                    // 如果没有超时限制,那么直接调用Condition实例的await()方法,让线程在指定的Condition实例中进行等待,并释放掉它拥有的锁
                    // 如果有超时限制,那么调用Condition实例的awaitNanos()方法,至多让线程在指定的Condition实例中等待指定的时间,该方法返回线程被唤醒后剩余的毫秒数(超时返回小于等于0),并释放掉它拥有的锁
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L) 
                        nanos = trip.awaitNanos(nanos); 
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
    
                // 当线程被唤醒后将会串行执行以下的逻辑
                
                // 如果发现CyclicBarrier被破坏了,那么就抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
    
                // 正常情况下,当调用了nextGeneration()方法之后,generation引用就指向一个新的Generation实例,因此g!=generation,那么线程直接返回,做自己的事情
                if (g != generation)
                    return index;
    
                // 如果线程在Condition实例等待的过程中由于达到了超时时间而被唤醒了,那么将会调用breakBarrier()方法,破坏CyclicBarrier
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException(); // 抛出异常
                }
            }
        } finally {
            lock.unlock(); // 解锁
        }
    }
    

    当线程进入dowait()方法后,需要获取锁,如果当前线程并非最后一个进入await()方法的线程,那么将会在指定的Condition实例中进行等待,然后释放掉它拥有的锁,如果当前线程是最后一个进入await()方法的线程(index==0,表示还需要0个线程进行等待),如果barrierCommand不为空,那么将会执行barrierCommand的run()方法,最后调用nextGeneration()方法。

    如果在执行dowait()方法的过程中,线程已经被设置了中断标识,或者最后一个线程在执行barrierCommand的run()方法时抛出异常,或者在指定Condition实例等待的线程由于达到了超时时间而被唤醒,那么都会调用breakBarrier()方法。


    nextGeneration()方法

    private void nextGeneration() {
        // 唤醒在指定Condition实例中等待的所有线程
        trip.signalAll();
        // 将count的数量设置成parties
        count = parties;
        // 将generation引用指向一个新的Generation实例
        generation = new Generation();
    }
    

    nextGeneration()方法用于指向下一个Generation,该方法将会唤醒在指定Condition实例中等待的所有线程,然后将count的数量设置成parties,恢复成CyclicBarrier初始化后的状态,最后将generation引用指向一个新的Generation实例。


    breakBarrier()方法

    private void breakBarrier() {
        // 将Generation实例的broken属性设置为true,表示CyclicBarrier已经被破坏
        generation.broken = true;
        // 将count的数量设置回parties
        count = parties;
        // 唤醒在指定Condition实例中等待的所有线程
        trip.signalAll();
    }
    

    breakBarrier()方法用于破坏CyclicBarrier,将Generation实例的broken属性设置为true,表示CyclicBarrier已经被破坏,然后将count的数量设置成parties,最后唤醒在指定Condition实例中等待的所有线程。


    reset()方法

    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier(); // 感觉是多余的
            nextGeneration(); // 指向下一个Generation
        } finally {
            lock.unlock();
        }
    }
    

    reset()方法用于重置CyclicBarrier,其根本是将generation引用指向一个新的Generation实例。


    流程总结

    1.当创建了一个CyclicBarrier实例之后,线程需要依次调用CyclicBarrier的await()方法,让自己进入等待状态。

    2.await()方法又会调用dowait()方法,当线程进入dowait()方法后,需要获取锁,如果当前线程并非最后一个进入await()方法的线程,那么将会在指定的Condition实例中进行等待,然后释放掉它拥有的锁,如果当前线程是最后一个进入await()方法的线程(index==0,表示还需要0个线程进行等待),如果barrierCommand不为空,那么将会执行barrierCommand的run()方法,最后调用nextGeneration()方法。

    3.nextGeneration()方法用于指向下一个Generation,该方法将会唤醒在指定Condition实例中等待的所有线程,然后将count的数量设置成parties,恢复成CyclicBarrier初始化后的状态,最后将generation引用指向一个新的Generation实例,当最后一个线程执行完nextGeneration()方法后,将会直接返回,做自己的事情,最后释放掉它拥有的锁。

    4.当被唤醒的线程依次获取到锁后,将会继续往下执行,如果判断到generation引用已经指向一个新的Generation实例,那么直接返回,做自己的事情,最后释放掉它拥有锁。

    5.如果在执行dowait()方法的过程中,线程已经被设置了中断标识,或者最后一个线程在执行barrierCommand的run()方法时抛出异常,或者在指定Condition实例等待的线程由于达到了超时时间而被唤醒,那么都会调用breakBarrier()方法,破坏CyclicBarrier,将Generation实例的broken属性设置为true,表示CyclicBarrier已经被破坏,然后将count的数量设置成parties,最后唤醒在指定Condition实例中等待的所有线程。

    6.当被唤醒的线程依次获取到锁后,将会继续往下执行,如果判断到Generation实例的broken属性被设置了true,也就是CyclicBarrier已经被破坏,那么将会直接抛出异常,最后释放掉它拥有的锁。

    7.当CyclicBarrier被破坏后是不能够进行复用的,因为Generation的broken属性已经被设置成true,因此需要先调用一次reset()方法进行重置。


    FAQ

    为什么说CyclicBarrier是可以复用的?

    因为当最后一个线程进入await()方法,将会调用nextGeneration()方法,该方法除了唤醒在指定Condition中等待的线程之外,还会将count的数量设置成parties,恢复成CyclicBarrier初始化后的状态,同时将generation引用指向一个新的Generation实例,因此CyclicBarrier是可以复用的,同时需要注意的是,如果CyclicBarrier已经被破坏,那么需要先调用一次reset()方法之后才能够进行复用。

  • 相关阅读:
    linux 程序安装目录/opt目录和/usr/local目录的区别
    Linux文件目录结构详解
    Jenkins卸载方法(Windows/Linux/MacOS)
    Jmeter案例demo
    idea打包java可执行jar包
    查看端口状态
    轻松掌握mongodb
    sphinx和coreseek
    redis
    redis默认端口6379以其名命名,是我孤陋寡闻了,是名性感美女(梅尔兹)
  • 原文地址:https://www.cnblogs.com/funyoung/p/13633450.html
Copyright © 2011-2022 走看看