zoukankan      html  css  js  c++  java
  • CountDownLatch 瞬间炸裂!同基于 AQS,凭什么 CyclicBarrier 可以这么秀?

    前言

    看完 CountDownLatch 正准备表示一番,突然看到了一个 CyclicBarrier —— 回环屏障。沃特?回环还屏障?说比 CountDownLatch 要多一个回环,那咱可得瞧一瞧,看一看了!

    介绍

    一个同步辅助,它允许一组线程的所有等待彼此达成共同屏障点。

    CyclicBarrier 在涉及固定线程数且必须等待彼此的程序非常有用。

    该屏障被称为回环屏障 ,因为它在等待的线程被释放后可以被重新利用。

    CyclicBarrier 支持一个可选的 Runnable 命令,该命令在障碍中的最后一个线程到达之后,但在释放任何线程之前,每个屏障点运行一次。

    此屏障操作对于在任何一方继续之前更新共享状态很有用。

    通过上面的源码注释基本可以得出以下结论:

    1. CyclicBarrier 和 CountDownLatch 类似,但它是一组线程等待,直到在其他线程中执行的一组操作完成为止。
    2. CountDownLatch 是计数递减,结束后再调用 await 或者 countdown 都会立即返回,但是 CyclicBarrier 可以重置屏障。
    3. CyclicBarrier 还可以传入参数 Runnable ,Runnable 会在释放线程之前执行。

    基本使用

    既然上面总结了三个结论,下面当然从三个方面演示如何使用的:

    - 屏障功能

    public class CyclicBarrierTest {
    
        private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(11);
    
        public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
    
            ExecutorService pool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(1024),
                    new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
                    new ThreadPoolExecutor.AbortPolicy());
    
            for (int i = 0; i < 10; i++) {
    
                pool.submit(() -> {
    
                    try {
                        System.out.println(Thread.currentThread().getName() + " 开始执行");
                        Thread.sleep(5000);
                        System.out.println(Thread.currentThread().getName() + " 执行结束,准备调用 await");
                        CYCLIC_BARRIER.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
    
                });
    
            }
    
            System.out.println("主线程执行 —————————————— >>>");
    
            CYCLIC_BARRIER.await();
    
            System.out.println("主线程继续执行 —————————————— >>>");
    
            pool.shutdown();
    
        }
    }
    

    通过上面代码其实模拟了个类似 CountDownLatch 的功能,让所有线程等待,直到都调用 await 之后,各个线程继续执行,同时主线程也继续往下执行。

    不过相对 CountDownLatch 的指定一个线程或多个等待,直到其他线程执行结束,等待的线程才继续执行来说,CyclicBarrier 相对来说还是逊色。

    差别总结如下:

    1. CountDownLatch 是指定等待的线程,其他线程进行 countDown,等计数为 0 时,等待的线程继续执行。
    2. CyclicBarrier 是一组线程调用 await 进行等待,当所有的都进入等待的时候,这一组就会一起冲破屏障继续执行。

    - 回环功能

    public class CyclicBarrierTest2 {
    
        private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(5);
    
        public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
            
            ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(1024),
                    new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
                    new ThreadPoolExecutor.AbortPolicy());
    
            for (int i = 0; i < 5; i++) {
    
                pool.submit(() -> {
    
                    try {
                        System.out.println(Thread.currentThread().getName() + " 开始执行");
                        CYCLIC_BARRIER.await();
    
                        System.out.println(Thread.currentThread().getName() + " 冲破屏障 >>> 1");
                        CYCLIC_BARRIER.await();
    
                        System.out.println(Thread.currentThread().getName() + " 冲破屏障 >>>>> 2");
                        CYCLIC_BARRIER.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
    
                });
    
            }
    
            pool.shutdown();
        }
    }
    

    carbon-gzpBD4

    上面演示的回环的用法。

    - 回环 Runnable

    这块只需要在声明的 CyclicBarrier 修改为以下即可:

    private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(5, new Runnable() {
        @Override
        public void run() {
            System.out.println("执行一次 Runnable ");
        }
    });
    

    打印结果如下:

    carbon1-lHnKnA

    可以看出只是在下一个计数开始之前,先执行 Runnable 。至于是不是在释放屏障之前,那很容易,直接 Debug 走一遭就知道了!专门录制了个视频:

    cyclicBarrier-vl-5Bz3Xa

    通过 debug 可以看出Runnable 会在释放线程之前执行

    问题疑问?

    1. CyclicBarrier 和 AQS 有什么关系?
    2. CyclicBarrier 的实现原理是什么?
    3. CyclicBarrier 是如何实现回环的?

    下面就带着疑问去源码阅读,一探究竟!

    源码分析

    基本结构

    CleanShot-2020-09-12-KFzaCR0G@2x-seVhre

    通过 UML 乍一看,CyclicBarrier 和 AQS 并无什么关系,那下面开始从参数构造器await()方法分别看源码。

    参数

    public class CyclicBarrier {
    
        /**
         * 屏障的每次使用都表示为一个生成实例。
         * broken 表示屏障是否被打破。
         */
        private static class Generation {
            boolean broken = false;
        }
    
        /** 锁 */
        private final ReentrantLock lock = new ReentrantLock();
        /** 条件等待,直到屏障 */
        private final Condition trip = lock.newCondition();
        /** 等待计数 */
        private final int parties;
        /* The command to run when tripped */
        private final Runnable barrierCommand;
        /** 当前 generation 新创建的*/
        private Generation generation = new Generation();
        /** 仍在等待的 parties 数量,递减 为 0 会重置 */
        private int count; 
    }
    
    

    通过上面可以看出:

    内部使用了一个静态类 Generation ,它有什么功能呢?通过注释了解到,每次使用屏障的时候都会生成,具体有什么用,其实就是用来标示屏障是否被打破。

    内部还有一个 parties 表示等待计数,count 表示仍在等待的计数。

    那就继续往下看吧!

    构造器

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    

    这里的入参有两个:

    • parties(等待计数):记录多少个线程调用 await 之后,才会一起打破屏障。
    • barrierAction:冲破屏障前执行的行为。
    • 但是会同时对 parties 和 count 赋值为传入的 parties。

    单参数构造,其实就是将 barrierAction 赋值为 null。

    await() 方法

    在示例中用的 await() 方法, 那就从 await() 方法入手:

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    

    await() 才是重头戏, 先来根据源码注释,了解是干嘛的,看看作者怎么讲:

    1. 等到所有各方都在此障碍上调用await。
    2. 如果当前线程不是最后到达的线程,则出于线程调度目的将其禁用,并使其处于休眠状态,直到发生以下情况之一:
      1. 最后一个线程到达;
      2. 其他一些线程中断当前线程;
      3. 其他一些线程中断其他正在等待的线程之一;
      4. 等待屏障的时候其他线程超时;
      5. 其他一些线程在此屏障上调用 reset。

    看到这些,咱们最想看的当然是 2.1 ,等待最后一个线程到达屏障,之后所有的线程一起继续执行。

    
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
                TimeoutException {
        
        // 加锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
    
            // 在这里用到了这个代
            final Generation g = generation;
    
            if (g.broken)
                throw new BrokenBarrierException();
            // 线程终中断标示
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
    
            // 对计数进行递减
            int index = --count;
            // 如果是 0 则
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    // 不是 null 先执行行为
                    if (command != null)
                        // 这里不是新开线程
                        command.run();
                    ranAction = true;
                    // 下一代
                    nextGeneration();
                    return 0;
                } finally {
                    // 任务未成功时,即 ranAction 还是 false 打破屏障
                    if (!ranAction)
                        breakBarrier();
                }
            }
    
            // loop until tripped, broken, interrupted, or timed out
            // 自旋
            for (;;) {
                try {
                    // 没有设置超时时间
                    if (!timed)
                    // 进入等待
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
    
                if (g.broken)
                    throw new BrokenBarrierException();
                // 已经下一代了
                if (g != generation)
                    return index;
    
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    

    这一大坨代码,完全没有看的欲望,直接划过去吧!

    所以…… 直接看到了这里吧。

    代码还是要阅读的,分开来看(异常流程省略):

    1. 使用了 ReentrantLock 互斥锁,因此对 count、broken 的修改是原子性的。
    2. 对 count 进行 --count 操作,这样就理解为什么说 count 是仍在等待的计数,或者说还有多少才能到达屏障点。
    3. 当 count 为 0 ,表示到达屏障点了
      1. cyclicbarrier-amQMu4
      2. command 不为 null,会先执行 command.run(), 值得注意的是这里并不是新开了个线程。
      3. nextGeneration()开始新的下一代,即重置 count 为 parties。
      4. 在 finally 里面使用 breakBarrier() 打破屏障。
    4. 当 count 不是 0
      1. 自旋,直到是 0.

    这后面还有两个方法不能少:

    private void nextGeneration() {
        // 唤醒线程
        trip.signalAll();
        // 更新 count 为 parties
        count = parties;
        // 更新 Generation
        generation = new Generation();
    }
    
    // 打破屏障,并唤醒全部
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
    

    reset()

    
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
    

    将屏障重置为其初始状态,reset() 方法其实还是调用的 breakBarrier() 和 nextGeneration(),前者时打破当前代,后者是开始新的一轮。

    总结

    Q: CyclicBarrier 和 AQS 有什么关系?
    A: 通过阅读源码,其实发现是使用了 ReentrantLock 互斥锁 以及 Condition 的等待唤醒功能。

    Q: CyclicBarrier 的实现原理是什么?
    A: 内部含有两个计数,分别是 parties 和 count ,初始是二者相等,当有线程调用 await() 时,count 递减,只要 count 不为 0 , 就会阻塞线程,直到 count 递减为 0 时,此时会所有线程一起释放,同时将 count 重置为 parties。

    Q: CyclicBarrier 是如何实现回环的?
    A: 使用两个计数,count 递减,当 count 为 0 时,会重置为 parties,从而达到回环效果。

    Q: 为什么 count 的 --count 操作没有使用 CAS?
    A: 因为已经 lock.lock() 了,使用了 ReentrantLock 锁能够保证 count 的原子性。

    CyclicBarrier 和 CountDownLatch 的区别

    1. 回环:CyclicBarrier 可以回环,重新计数。CountDownLatch 只能一轮。
    2. 计数器:CyclicBarrier 的计数器自己维护递减, CountDownLatch 的计数器维护则是交给使用者。
    3. 阻塞线程:CyclicBarrier 阻塞的是自身,当到达屏障后,所有被阻塞的线程一起释放。CountDownLatch 可以指定阻塞线程。

    结束语

    本文主要介绍了 CyclicBarrier 的常用方式,通过源码方式,分析如何达到屏障以及回环的效果。不对之处,请多指正。

  • 相关阅读:
    [CF1475F] Unusual Matrix
    [JXOI2018] 游戏
    [ZJOI2010] 排列计数
    [CF1474E] What Is It?
    [CF375D] Tree and Queries
    [CF519E] A and B and Lecture Rooms
    [CF321C] Ciel the Commander
    [CF1C] Ancient Berland Circus
    [CF321A] Ciel and Robot
    [CF1450C1] Errich-Tac-Toe (Easy Version)
  • 原文地址:https://www.cnblogs.com/liuzhihang/p/CyclicBarrier.html
Copyright © 2011-2022 走看看