zoukankan      html  css  js  c++  java
  • 【Java并发】- 6.对并发工具类CyclicBarrier的源码解析

    1.CyclicBarrier类的简介

    CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

    示例:

    public class CyclicBarrierDemo {
    
        public static void main(String[] args) {
            CyclicBarrier barrier = new CyclicBarrier(3);
    
            for (int i = 0; i < 3; i++) {
                int finalI = i;
                new Thread(() -> {
                    try {
                        Thread.sleep((long)( 2000*Math.random()));
                        int random = new Random().nextInt(200);
                        System.out.println("hello:" + random);
                        barrier.await();
                        System.out.println("word:" + random);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }
    

    执行输出

    hello:74
    hello:4
    hello:95
    word:95
    word:74
    word:4
    

    上面代码输出是可以观察到前两个hello先输出,等待一段时间后最后一个hello才会输出,并且把3个word一起输出。

    故从这就可以看出,当我们在CyclicBarrier是一个线程屏障,在构造方法中设置了等待线程个数parties后,我们在线程中使用await();方法那么线程就会进入等待状态直到有parties个线程都执行到了await()才会把所有线程唤醒,继续执行

    如果如果把new CyclicBarrier(3)修改成new CyclicBarrier(4),则主线程和子线程会永远等待,因为没有第四个线程执行await方法,即没有第四个线程到达屏障,所以之前到达屏障的三个线程都不会继续执行。

    CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景

    public class CyclicBarrierDemo {
    
        public static void main(String[] args) {
            CyclicBarrier barrier = new CyclicBarrier(3,() -> {
                System.out.println("barrier used success");
            });
    
            for (int j = 0; j < 2; j++) {
                for (int i = 0; i < 3; i++) {
                    int finalI = i;
                    new Thread(() -> {
                        try {
                            Thread.sleep((long)( 2000*Math.random()));
                            int random = new Random().nextInt(200);
                            System.out.println("hello:" + random);
                            barrier.await();
                            System.out.println("word:" + random);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }).start();
                }
            }
        }
    }
    

    运行结果:

    hello:33
    hello:194
    hello:187
    barrier used success
    word:187
    word:33
    word:194
    hello:193
    hello:161
    hello:32
    barrier used success
    word:32
    word:193
    word:161
    

    Runnable barrierAction使用Lambda 表达式,且多次进行三个线程的拦截。

    2.对CyclicBarier源码的分析

    类中方法的有
    在这里插入图片描述

    对构造方法的解析

    public CyclicBarrier(int parties) {
    //可以看到,一个参数的构造方法是由两个参数的构造方法实现,故对
    //两个参数的构造方法进行分析
            this(parties, null);
        }
    
    public CyclicBarrier(int parties, Runnable barrierAction) {
    //检验参数是否合法
            if (parties <= 0) throw new IllegalArgumentException();
            //将parties赋值给parties和count,这就是为什么
            //CyclicBarrier可以多次拦截parties个线程的原因
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
    /** The number of parties */
    //可拦截线程的个数
        private final int parties;
    /**
         * Number of parties still waiting. Counts down from parties to 0
         * on each generation.  It is reset to parties on each new
         * generation or when broken.
         */
         //计数器和CountDownLatch中的计数器作用类似,在变为0后唤醒所有await的线程
         //并且通过parties重新获取值
        private int count;
    /* The command to run when tripped */
    //在每次所有线程被唤醒后执行的动作
        private final Runnable barrierCommand;
        
    /**
         * Each use of the barrier is represented as a generation instance.
         * The generation changes whenever the barrier is tripped, or
         * is reset. There can be many generations associated with threads
         * using the barrier - due to the non-deterministic way the lock
         * may be allocated to waiting threads - but only one of these
         * can be active at a time (the one to which {@code count} applies)
         * and all the rest are either broken or tripped.
         * There need not be an active generation if there has been a break
         * but no subsequent reset.
         */
         上面说过CyclicBarrier可以进行多次拦截,这个参数是对每次拦截进行分代。
        private static class Generation {
            boolean broken = false;
        }
    

    啥都写的注释中了,自己看

    对await()方法的解析

    CyclicBarrier中有两个await方法

    public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    
    public int await(long timeout, TimeUnit unit)
            throws InterruptedException,
                   BrokenBarrierException,
                   TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }
    

    可以看到两个await方法都调用了dowait,故对dowait做主要分析

    dowait()方法

    /**
         * Main barrier code, covering the various policies.
         */
         //主要屏障实现代码
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            //获取一个重入锁,在类中以定义(这些参数在方法下面给出)
            final ReentrantLock lock = this.lock;
            //执行锁,说明下面的代码都是同步的
            lock.lock();
            try {
            //获取分代,进行异常判断
                final Generation g = generation;
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    
    //这是对计数器减一
                int index = --count;
                //这是最后一个线程执行await计数器变为0时的情况
                if (index == 0) {  // tripped
                	//这是一个标记说明在构造方法中传入的Runnable是否执行
                    boolean ranAction = false;
                    try {
                    //获取传入的Runnable
                        final Runnable command = barrierCommand;
                        //不为null,说明构造方法传入Runnable对象
                        if (command != null)
                            command.run();
                        ranAction = true;
                        //开启下一个分代
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                        //打破屏障
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                //这是计数器自减后不为0的情况
                for (;;) {
                    try {
                    //判断是否使用时间参数,不同的await方法
                        if (!timed)
                        //没使用,就直接把线程放入Codition的等待集合中
                            trip.await();
                        else if (nanos > 0L)
                        //如果使用了时间参数,就使用带时间的方法把线程放入等待集合
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                        //发生一次打破屏障唤醒使用线程
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    //一些检错判断
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
            解锁
                lock.unlock();
            }
        }
    /** The lock for guarding barrier entry */
    //锁对象
    private final ReentrantLock lock = new ReentrantLock();
    
    /** The current generation */
    //分代器
    private Generation generation = new Generation();
    
    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        //唤醒所有等待线程,trip是类中定义的一个Condition对象,由
        //类中的锁直接生成
        trip.signalAll();
        // set up next generation
        //重置计数器的值
        count = parties;
        //开启一个新的分代
        generation = new Generation();
    }
    
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    
    //该方法的存在是为了防止try中代码执行异常导致不能正常进行唤醒所有等待线程
    private void breakBarrier() {
    //把分代值值为true
        generation.broken = true;
        //重置计数器的值
        count = parties;
        trip.signalAll();
    }
    

    该方法作用就

    • 如果执行的是最后一个线程就唤醒使用的等待集合中的线程
    • 如果不是最后一个线程,就把线程放入等待集合

    关于CyclicBarrier的底层执行流程总结

    1. 初始化CyclicBarrier中的各种成员变量,包括parties、count以及Runnable(可选)
    2. 当调用await方法时,底层会先检查计数器是否已经归零,如果是的话,那么就首先执行可选的Runnable,接下来开始下一个generation;
    3. 在下一个分代中,将会重置count值为parties,并且创建新的Generation实例。
    4. 同时会调用Condition的signalAll方法,唤醒所有在屏障前面等待的线程,让其开始继续执行。
    5. 如果计数器没有归零,那么当前的调用线程将会通过Condition的await方法,在屏障前进行等待。
    6. 以上所有执行流程均在lock锁的控制范围内,不会出现并发情况。

    3.CyclicBarrier和CountDownLatch的区别

    CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。

    CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。

    注意:和CountDownLatch类不同CyclicBarrier的parties不会因为线程都执行await就减一,而是在执行过程中不变,即CyclicBarrier可以多次拦截parties个线程,而CountDownLatch只能一个拦截计数器个线程。

    在这里插入图片描述

  • 相关阅读:
    ASPNETDB 数据库关系图、表和视图 基本表和独立表(转载)
    Android工作学习笔记之图片自适应imageview属性android:scaleType
    Android 为不同的语言和硬件创建资源
    asp.net mvc 将Enum绑定在DropDownList了
    java异常处理的throw和throws的区别
    media=screen是什么意思 有什么用?
    javascript 匿名函数
    CharSequence类型
    android使用系统资源,链接当前主题中的Style
    Android中this.*与*.this还有*.class的区别是什么?
  • 原文地址:https://www.cnblogs.com/wf614/p/13168590.html
Copyright © 2011-2022 走看看