zoukankan      html  css  js  c++  java
  • Java CyclicBarrier

    概述


    1.CyclicBarrier介绍

    2.CyclicBarrier源码分析

    3.CyclicBarrier示例

    CyclicBarrier介绍

    CyclicBarrier翻译过来也叫栅栏,意思很明显,就是一组线程相互等待,均到达栅栏的时候,再运行。CyclicBarrier是可以重复使用的,而之前的CountDownLatch是一次性的。CyclicBarrier允许一组线程相互等待,直到到达某个公共屏障点,屏障点即一组任务执行完毕的时候。

    与CountDownLatch的区别:

      1.CyclicBarrier是可重复使用的,CountDownLatch是一次性的。

      2.使用场景,CountDownLatch用于不同线程之间的等待,比如主线程需要等待子线程执行完毕。而CyclicBarrier用于一组线程内的相互等待,比如5个线程到底某种状态才执行。

    CyclicBarrier的应用场景,举个生活相关的例子。 比如跑步比赛,所有人必须到底起跑点才能开始比赛,比赛开始后,所有人各自奔跑,需要等待所有人到底终点后才能结束比赛。

    CyclicBarrier是包含了"ReentrantLock"和"Condition对象trip",它是通过独占锁实现的。

    /** The lock for guarding barrier entry */
        private final ReentrantLock lock = new ReentrantLock();
        /** Condition to wait on until tripped */
        private final Condition trip = lock.newCondition();

    CyclicBarrier源码分析

    首先使用的时候,我们会初始化一个CyclicBarrier对象

     public CyclicBarrier(int parties, Runnable barrierAction) {  //这里运行添加一个Runnable任务,用于到底屏障点执行
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
        /**
         * Creates a new {@code CyclicBarrier} that will trip when the
         * given number of parties (threads) are waiting upon it, and
         * does not perform a predefined action when the barrier is tripped.
         *
         * @param parties the number of threads that must invoke {@link #await}
         *        before the barrier is tripped
         * @throws IllegalArgumentException if {@code parties} is less than 1
         */
        public CyclicBarrier(int parties) {  //parties代表在栅栏放开之前,parties个数量的线程必须被调用
            this(parties, null);
        }

    这时每个线程调用的时候,会调用await()方法

        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L); //第一个参数代表是否有超时时间,第二个参数代表超时时间长
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;   //调用Lock,CyclicBarrier并不能实现同步,所有需要通过lock来保证线程安全
            lock.lock();
            try {
                final Generation g = generation;  //当前的generation,generation的数据结构见下面
    
                if (g.broken)  //如果已损坏,则抛出异常
                    throw new BrokenBarrierException();
    
                if (Thread.interrupted()) {  //如果线程被中断
                    breakBarrier();  //方法说明见下面,因为一旦线程被中断了,就没办法实现屏障点
                    throw new InterruptedException();
                }
    
                int index = --count;  //将计算器parties减1
                if (index == 0) {  // tripped  //如果到底屏障点
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;  //获取要执行的Runnable任务
                        if (command != null)
                            command.run();  //如果不为空,则执行任务
                        ranAction = true;
                        nextGeneration(); //更新generation及重置屏障点,唤醒线程
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();  //重置屏障点,唤醒线程
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out,未到底屏障点
    for (;;) { try { if (!timed) //如果没有设置超时时间,则使用condition将线程等待,为什么使用condition? 因为所有的await都是等待一个条件,即parties变为0 trip.await(); else if (nanos > 0L) //否则设置超时时间 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //异常处理 if (g == generation && ! g.broken) { //如果generation没有换代,且没有broken,则调用breakBarrier终止CyclicBarrier breakBarrier(); throw ie; } else { //否则中断当前线程 // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) //当前generation 已经损坏 throw new BrokenBarrierException(); if (g != generation) //generation已经换代,返回index数 return index; if (timed && nanos <= 0L) { //等待超时 breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } private static class Generation { //同一批线程属于同一个generationboolean broken = false;
        }

    CyclicBarrier的特点就是要么大家都执行完毕,要么大家都被异常中断,不会其中一个有中断而其他正常执行完毕的现象存在。有点像原子的概念,当然不是一回事。
    所以这样的需要有一个状态来描述曾经是否有线程被中断过,这样后面执行的线程就知道是否需要继续等待了。Generation就是为了完成这个事的。
    而多个线程同时竞争Generation,竞争CyclicBarrier的index,这样就需要通过lock来保证安全。
    private void breakBarrier() {
            generation.broken = true;  //将generation职位True,代表generation已经损坏
            count = parties;  //重启count数为parties
            trip.signalAll();  //这里使用的condition的singalAll唤醒所有线程
        }
    private void nextGeneration() {
            // signal completion of last generation
            trip.signalAll();
            // set up next generation
            count = parties;
            generation = new Generation();  //产生一个新的generation
        }
     

    CyclicBarrier示例

    场景说明,5个跑步者,等待所有人到齐后开始跑步,有跑的快的,有跑的慢的,这里用sleep来替代。 跑步完成后,一块去喝酒,喝酒必须等待所有人到齐后再开始喝,也是有人喝的快,有人喝的慢。 喝完酒后每个人say good bye。 所有人均sayGoodBye后,然后宣布散场。

    这里使用了CyclicBarrier和CountDownLatch,从使用里面可以看出二者的主要区别。 CyclicBarrier主要用于5个人之间相互等待。 而CountDownLatch用于5个人与主线程的等待。

    public class CyclicBarrierTest1 {
    
        private static CyclicBarrier cyclicBarrier=new CyclicBarrier(5);  //定义一个CyclicBarrier,可以重复使用
        private static CountDownLatch countDownLatch=new CountDownLatch(5);  //定义一个CountDownLatch,用于最后每个人say bye后打印,bye bye。
    
        public static void main(String[] args){
    
            for(int i=1;i<=5;i++){
    
                new RunningMan("name"+i,Long.valueOf(i*1000),cyclicBarrier,countDownLatch).start();
            }
            try {
                countDownLatch.await();  //主线程等待最后bye bye
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("ByeBye");
    
        }
    }
    
    class RunningMan extends Thread{
    
        String name;
        long speed;
        CyclicBarrier cyclicBarrier;
        CountDownLatch countDownLatch;
        public RunningMan(String name,long speed,CyclicBarrier cyclicBarrier,CountDownLatch countDownLatch){
    
            this.name=name;
            this.speed=speed;
            this.cyclicBarrier=cyclicBarrier;
            this.countDownLatch=countDownLatch;
        }
        public void running(){  //奔跑
    
            System.out.println("Waiting "+name);  
            try {
                int index=cyclicBarrier.await(); //所有人开始等待人到期,最后一个人到期后就开始跑了
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("Running "+speed);
            try {
                Thread.sleep(speed);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public void celebrate(){  //庆祝一下
    
            System.out.println("Celebrate man comming "+name);  
            try {
                int index=cyclicBarrier.await();  //等待人到场后开始庆祝喝酒
    //            System.out.println("index is "+index);
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("Drinking "+speed);
            try {
                Thread.sleep(speed);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public void sayGoodBy(){  //say good bye
    
            try {
                System.out.println(name+":say good bye");
    
                int index=cyclicBarrier.await();  //等待每个人say good bye
                countDownLatch.countDown();  //CountDownLatch减1
                
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
    
    
        }
    
        public void run(){
    
            running();
            celebrate();
            sayGoodBy();
        }
    }

    输出结果为:

    Waiting name1
    Waiting name3
    Waiting name4
    Waiting name2
    Waiting name5
    Running 5000
    Running 1000
    Running 3000
    Running 4000
    Running 2000
    Celebrate man comming name1
    Celebrate man comming name2
    Celebrate man comming name3
    Celebrate man comming name4
    Celebrate man comming name5
    Drinking 5000
    Drinking 2000
    Drinking 4000
    Drinking 1000
    Drinking 3000
    name1:say good bye
    name2:say good bye
    name3:say good bye
    name4:say good bye
    name5:say good bye
    ByeBye

  • 相关阅读:
    JDBC 查询的三大参数 setFetchSize prepareStatement(String sql, int resultSetType, int resultSetConcur)
    有空必看
    SpringMVC 利用AbstractRoutingDataSource实现动态数据源切换
    FusionCharts JavaScript API Column 3D Chart
    FusionCharts JavaScript API
    FusionCharts JavaScript API
    Extjs 继承Ext.Component自定义组件
    eclipse 彻底修改复制后的项目名称
    spring 转换器和格式化
    Eclipse快速生成一个JavaBean类的方法
  • 原文地址:https://www.cnblogs.com/dpains/p/7525397.html
Copyright © 2011-2022 走看看