zoukankan      html  css  js  c++  java
  • CyclicBarrier是如何成为一个"栅栏"的

    CyclicBarrier是一种类似于栅栏的存在,意思就是在栅栏开放之前你都只能被挡在栅栏的一侧,当栅栏移除之后,之前被挡在一侧的多个对象则同时开始动起来。

    1. 如何使用CyclicBarrier

      在介绍其原理之前,先了解一下CyclicBarrier应该如何使用。

      假设现在有这样的场景,我们需要开一个会议,需要张1、张2、张3三个人参加,
    会议需要三个人都到齐之后才能开始,否则只能干等着;这个场景用CyclicBarrier可以很契合的模拟出来。代码如下:

    public static void main(String[] args) {
        // 线程池,每个线程代表一个人
        ThreadPoolExecutor executor = ThreadPoolProvider.getInstance();
        // 会议所需的人数为3
        CyclicBarrier barrier = new CyclicBarrier(3);
    
        executor.execute(() -> {
            try {
                System.err.println("张1到达会议室");
                barrier.await();
                System.err.println("会议开始,张1开始发言");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
    
        });
    
        executor.execute(() -> {
            try {
                System.err.println("张2到达会议室");
                barrier.await();
                System.err.println("会议开始,张2开始发言");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
    
        });
    
        executor.execute(() -> {
            try {
                System.err.println("张3先去个厕所,内急解决再去开会");
                TimeUnit.SECONDS.sleep(1);
                System.err.println("张3到达会议室");
                barrier.await();
                System.err.println("会议开始,张3开始发言");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
    
        });
    
    
        executor.shutdown();
    }
    

    结果图:
    例子图
      通过上方代码可以知道CyclicBarrier的几点:

    1. 使用await()来表示完成了某些事情。(上方例子的表现为到达了会议室)
    2. 使用await()之后当前线程就进入阻塞状态,需要等待完全满足CyclicBarrier的条件后唤醒才能继续接下来的操作。(上方例子中 为3个人都到达会议室)
    3. 在最后一个线程达到条件之后,之前阻塞的线程全部放开,继续接下来的操作。(上方例子为张3到达会议室)

      这个简单的例子也让我们了解CyclicBarrier的使用方法,那来看看其内部究竟是如何实现栅栏的效果的。

    2. CyclicBarrier是如何成为"栅栏"的

      从第一节的代码中我们也能看到,需要关注的就两个地方

    1. 构造函数
    2. await()方法

    只要了解这两个方法的内部,相当于了解了CyclicBarrier的内部。
    那在深入了解之前,先来看下CyclicBarrier的几个变量,不用刻意去记,看代码的时候知道这个东西做什么用的就行了:

    lock:CyclicBarrier类创建的ReentrantLock实例,关于ReentrantLock不清楚的可以->传送。

    trip:lock中的conditionCyclicBarrier使用该变量来实现各线程之间的阻塞和同时唤醒。同样,不明白condition作用的=>传送门

    parties:需要满足条件(调用await方法)的总数,就是说当有parties个线程await()之后就会唤醒全部线程。

    barrierCommand:一个Runnable变量,在await方法的调用次数到达总数parties之后,在唤醒全部线程之前执行其run()方法

    generation:其内部类,可以理解为周期,周期内需要完成n个任务,只要一个任务失败,当前周期的所有任务就算失败,结束当前周期,再开启下个周期。

    count:当前周期剩余需要完成的任务数(剩余调用await方法的次数)

    以下为源码:

    public class CyclicBarrier {
        // 内部类,可理解为周期
        private static class Generation {
            // 当前周期是否失败
            boolean broken = false;
        }
    
        // 锁的实例
        private final ReentrantLock lock = new ReentrantLock();
        // ReentrantLock的condition变量,用来控制线程唤醒和阻塞
        private final Condition trip = lock.newCondition();
        // 需要满足条件的次数,即需要调用await方法的次数
        private final int parties;
        // 满足条件次数达到parties之后,唤醒所有线程之前执行其 run()方法
        private final Runnable barrierCommand;
        // 当前周期
        private Generation generation = new Generation();
        // 剩余满足条件次数
        private int count;
        
        // ...
    }
    

      看完CyclicBarrier的几个变量后,来看其具体的内部实现。

      首先来看构造函数,其构造函数有两个,一个在达到条件总数(parties)后直接叫醒所有线程;另一个指定一个Runnable在达到条件总数后先执行其run()方法再叫醒。

    • 不指定Runnable,参数只有一个:需要达成的任务数
    public CyclicBarrier(int parties) {
        // 直接调用另一个构造方法,Runnable传null,表示不执行
        this(parties, null);
    }
    
    • 指定Runnable的构造方法,赋值任务总数、剩余任务数、唤醒操作之前的Runnable
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        // 任务总数
        this.parties = parties;
        // 剩余需要完成的任务数
        this.count = parties;
        // 唤醒之前执行的Runnable
        this.barrierCommand = barrierAction;
    }
    

      在第一节我们使用的是第一个构造方法,来试试第二个

    public static void main(String[] args) throws InterruptedException {
    
        ThreadPoolExecutor executor = ThreadPoolProvider.getInstance();
        /** =======增加Runnable,其他地方保持一致=============*/
        CyclicBarrier barrier = new CyclicBarrier(3, ()-> System.err.println("在会议开始之前,先给大家发下开会资料"));
    
        executor.execute(() -> {
            try {
                System.err.println("张1到达会议室");
                barrier.await();
                System.err.println("会议开始,张1开始发言");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
    
        });
    
        executor.execute(() -> {
            try {
                System.err.println("张2到达会议室");
                barrier.await();
                System.err.println("会议开始,张2开始发言");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
    
        });
    
        executor.execute(() -> {
            try {
                System.err.println("张3先去个厕所,内急解决再去开会");
                TimeUnit.SECONDS.sleep(1);
                System.err.println("张3到达会议室");
                barrier.await();
                System.err.println("会议开始,张3开始发言");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
    
        });
    
    
        executor.shutdown();
    }
    

    结果图:

    pic2

     看完构造函数,就算理解了一半CyclicBarrier了,接下来来看另一半——await();跟踪代码,看到是这样的

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    

    直接调用dowait方法,传参为false0,意思就是不限时等待,除非线程被打断或者唤醒。再进入dowait方法,这个方法就是CyclicBarrier的另一半,在下方的代码中很清楚的写了整个执行流程

    /** 参数说明, timed:是否限时, nanos:限时时间*/
    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException, TimeoutException {
        // 锁
        final ReentrantLock lock = this.lock;
        // 获取锁,如果失败的话线程睡眠,进入同步队列(AQS中的知识)
        lock.lock();
        try {
            /* 拿到锁之后进入代码处理逻辑*/
            
            // 当前周期
            final Generation g = generation;
    
            // 如果当前周期是失败的,那么直接抛错
            if (g.broken)
                throw new BrokenBarrierException();
    
            // 如果当前线程被打断了,那么此次周期失败,设置相关参数,然后抛错
            if (Thread.interrupted()) {
                // 实现代码在下行的注释中,设置相关参数来提醒其他线程周期失败了
                breakBarrier();
                /*
                 * private void breakBarrier() {
                 *     generation.broken = true;
                 *     count = parties;
                 *     // 唤醒condition中的所有线程
                 *     trip.signalAll();
                 * }
                 */
                throw new InterruptedException();
            }
    
            // 如果成功了,那么剩余任务数(count)减1
            int index = --count;
            // 如果为0则表示达到剩余的任务数没有了,达到CyclicBarrier的条件总数了,需要唤醒其他线程
            if (index == 0) {  
                boolean ranAction = false;
                try {
                    // 唤醒之前的Runnable
                    final Runnable command = barrierCommand;
                    // 如果不为空的话执行其run方法
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 开启下个周期,这个方法是CyclicBarrier可以复用的原因,具体实现在下行注释
                    nextGeneration();
                    /* private void nextGeneration() {
                     *     // 首先叫醒当前周期的其他线程,告诉其周期结束了,可以执行接下来的操作了
                     *     trip.signalAll();
                     *     // 然后开启下个周期,剩余任务数重置
                     *     count = parties;
                     *     // 下个周期
                     *     generation = new Generation();
                     * }
                     */
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
    
            // 如果还不能结束本周期,就一直等待直到结束或者周期失败
            for (;;) {
                try {
                    // await的过程中是释放锁的
                    // 不限时的话就一直等待直到被唤醒或者打断
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        // 否则的话等待一段时间后醒来
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (g != generation)
                    return index;
    
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
    

      到这里就基本理解CyclicBarrier的内部实现了,其他像带参数的await也是一样逻辑,只不过是多了限时的条件而已。

      其实如果你了解ReentrantLock的话,就知道CyclicBarrier整个就是对ReentrantLockcondition的活用而已。

    3.总结

      整体来说CyclicBarrier的实现相对较简单,说是ReentrantLockcondition的升级版也不为过。其关键点为两个,一个为其构造函数,决定任务个数和唤醒前操作;另外一个点为await方法,在正常情况下每次await都会减少一个任务数(总数由构造方法决定),在任务数变为0的时候表示周期结束,需要唤醒condition的其他线程,而途中遇到失败的话当前周期失败,唤醒其他线程一起抛错。



    失败不会让你变得弱小,害怕失败会。

  • 相关阅读:
    win 10打开administrator
    Navicat
    mkpasswd
    恢复不小心删除的文件
    mysql双主出现1602错误
    scp远程拷贝文件免密办法
    iredmail邮箱服务器部署
    keepalived+nginx后端服务器access_log出现127.0.0.1的访问记录
    lsof 简介
    Codeforces #536 div2 E (1106E)Lunar New Year and Red Envelopes (DP)
  • 原文地址:https://www.cnblogs.com/zhangweicheng/p/12668921.html
Copyright © 2011-2022 走看看