zoukankan      html  css  js  c++  java
  • Java并发编程系列之CyclicBarrier详解

    简介

    jdk原文

    A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. 
    CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. 
    The barrier is called cyclic because it can be re-used after the waiting threads are released.
    

    这句话翻译意思:CyclicBarrier是一个同步辅助类,它允许一组线程相互等待直到所有线程都到达一个公共的屏障点。
    在程序中有固定数量的线程,这些线程有时候必须等待彼此,这种情况下,使用CyclicBarrier很有帮助。
    这个屏障之所以用循环修饰,是因为在所有的线程释放彼此之后,这个屏障是可以重新使用的

    抓住重点:1、允许一组线程相互等待直到达到一个公共屏障点,2、可以重复使用

    简单举例就是:玩王者荣耀只有所有人进入游戏之前都必须加载到100%,所有人才能进入游戏。
    与CountDownLatch比较

     
    image.png

    源码解析

    先从构造方法入手

      /**
         * Creates a new {@code CyclicBarrier} that will trip when the
         * given number of parties (threads) are waiting upon it, and
         * does not perform a predefined action when the barrier is tripped.
         *
         * @param parties the number of threads that must invoke {@link #await}
         *        before the barrier is tripped
         * @throws IllegalArgumentException if {@code parties} is less than 1
         */
     public CyclicBarrier(int parties) {
            this(parties, null);
        }
       /**
         * Creates a new {@code CyclicBarrier} that will trip when the
         * given number of parties (threads) are waiting upon it, and which
         * will execute the given barrier action when the barrier is tripped,
         * performed by the last thread entering the barrier.
         *
         * @param parties the number of threads that must invoke {@link #await}
         *        before the barrier is tripped
         * @param barrierAction the command to execute when the barrier is
         *        tripped, or {@code null} if there is no action
         * @throws IllegalArgumentException if {@code parties} is less than 1
         */
    public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    

    从jdk注释我们可以看出:
    第一个构造器:创建一个新的{@code CyclicBarrier},它会在
    给定数量的屏障(线程)正在等待它,并且在屏障被触发时不执行预定义的操作。
    第二个构造器:创建一个新的{@code CyclicBarrier},它会在
    给定数量的屏障(线程)正在等待它,以及当屏障被触发时,优先执行barrierAction,方便处理更复杂的业务场景。


    await()方法
    调用await方法的线程告诉CyclicBarrier自己已经到达同步点,然后当前线程被阻塞。直到parties个参与线程调用了await方法,CyclicBarrier同样提供带超时时间的await和不带超时时间的await方法:
    await()方法里面最主要就是doawait()

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
                TimeoutException {
        // 获取独占锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 当前先从
            final Generation g = generation;
            // 如果这个线程损坏了,抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
     
            // 如果线程中断了,抛出异常
            if (Thread.interrupted()) {
                // 将损坏状态设置为true
                // 并通知其他阻塞在此屏障上的线程
                breakBarrier();
                throw new InterruptedException();
            }
     
            // 获取下标
            int index = --count;
            // 如果是 0,说明最后一个线程调用了该方法
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    // 执行屏障任务
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 更新一代,将count重置,将generation重置
                    // 唤醒之前等待的线程
                    nextGeneration();
                    return 0;
                } finally {
                    // 如果执行屏障任务的时候失败了,就将损坏状态设置为true
                    if (!ranAction)
                        breakBarrier();
                }
            }
     
            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                     // 如果没有时间限制,则直接等待,直到被唤醒
                    if (!timed)
                        trip.await();
                    // 如果有时间限制,则等待指定时间
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    // 当前线程没有损坏
                    if (g == generation && ! g.broken) {
                        // 让屏障失效
                        breakBarrier();
                        throw ie;
                    } else {
                        // 上面条件不满足,说明这个线程不是这代的
                        // 就不会影响当前这代屏障的执行,所以,就打个中断标记
                        Thread.currentThread().interrupt();
                    }
                }
     
                // 当有任何一个线程中断了,就会调用breakBarrier方法
                // 就会唤醒其他的线程,其他线程醒来后,也要抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
     
                // g != generation表示正常换代了,返回当前线程所在屏障的下标
                // 如果 g == generation,说明还没有换代,那为什么会醒了?
                // 因为一个线程可以使用多个屏障,当别的屏障唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
                // 正是因为这个原因,才需要generation来保证正确。
                if (g != generation)
                    return index;
                
                // 如果有时间限制,且时间小于等于0,销毁屏障并抛出异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            // 释放独占锁
            lock.unlock();
        }
       }
    

    总结如果该线程不是最后一个调用await方法的线程,则它会一直处于等待状态,除非发生以下情况:
    最后一个线程到达,即index == 0
    某个参与线程等待超时
    某个参与线程被中断
    调用了CyclicBarrier的reset()方法。该方法会将屏障重置为初始状态

    Generation描述着CyclicBarrier的更新换代。在CyclicBarrier中,同一批线程属于同一代。当有parties个线程到达barrier之后,generation就会被更新换代。其中broken标识该当前CyclicBarrier是否已经处于中断状态。

    默认barrier(屏障)是没有损坏的。当barrier(屏障)损坏了或者有一个线程中断了,则通过breakBarrier()来终止所有的线程:

    private void breakBarrier() {
            generation.broken = true;
            count = parties;
            trip.signalAll();
        }
    

    breakBarrier()不仅会把broken设置为true,还会将所有处于等待状态的线程全部唤醒(singalAll)方法

    注意CyclicBarrier使用独占锁来执行await方法,并发性可能不是很高

    简单例子加深印象

    /**
     * @author shuliangzhao
     * @Title: CyclicBarrierTest
     * @ProjectName design-parent
     * @Description: TODO
     * @date 2019/6/3 0:23
     */
    public class CyclicBarrierTest {
        public static void main(String[] args) {
            int N = 4;
            CyclicBarrier barrier  = new CyclicBarrier(N);
    
            for(int i=0;i<N;i++) {
                new Writer(barrier).start();
            }
    
           
           /* try {
                Thread.sleep(25000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println("CyclicBarrier重用");
    
            for(int i=0;i<N;i++) {
                new Writer(barrier).start();
            }*/
        }
        static class Writer extends Thread{
            private CyclicBarrier cyclicBarrier;
            public Writer(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
    
            @Override
            public void run() {
                System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
                try {
                    Thread.sleep(2000);      //以睡眠来模拟写入数据操作
                    System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
    
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }catch(BrokenBarrierException e){
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"所有线程写入完毕,继续处理其他任务...");
            }
        }
    }
    
    

    运行结果

    image.png

    怎么用多线程求和

    
    /**
     * @author shuliangzhao
     * @Title: CyclicBarrier
     * @ProjectName design-parent
     * @Description: TODO
     * @date 2019/6/3 0:18
     */
    public class CyclicBarrierExc {
    
        //private static final Logger LOGGER = LoggerFactory.getLogger(CyclicBarrierExc.class);
    
        public static void main(String[] args) {
            //数组大小
            int size = 50000;
            //定义数组
            int[] numbers = new int[size];
    
            //随机初始化数组
            for (int i = 0; i < size; i++) {
                numbers[i] = RandomUtils.nextInt(100, 1000);
            }
    
            //多线程计算结果
            //定义线程池
            ExecutorService executorService = Executors.newFixedThreadPool(5);
            //定义五个Future去保存子数组计算结果
            final int[] results = new int[5];
    
            //定义一个循环屏障,在屏障线程中进行计算结果合并
            CyclicBarrier barrier = new CyclicBarrier(5, () -> {
                int sums = 0;
                for (int i = 0; i < 5; i++) {
                    sums += results[i];
                }
                System.out.println("多线程计算结果:" + sums);
            });
    
            //子数组长度
            int length = 10000;
            //定义五个线程去计算
            for (int i = 0; i < 5; i++) {
                //定义子数组
                int[] subNumbers = Arrays.copyOfRange(numbers, (i * length), ((i + 1) * length));
                //盛放计算结果
                int finalI = i;
                executorService.submit(() -> {
                    for (int j = 0; j < subNumbers.length; j++) {
                        results[finalI] += subNumbers[j];
                    }
                    //等待其他线程进行计算
                    try {
                        barrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                });
            }
    
            //关闭线程池
            executorService.shutdown();
        }
    }
  • 相关阅读:
    轮播图适应代码jQ
    jQuery focus、blur事件 添加、删除类名
    节点操作js jQuery
    动态加载jQuery
    底边滑动变色的列表
    节点选择只有链接
    第三方登录过程—OAuth2.0协议
    JavaScript中常谈的对象
    浅谈JavaSccript函数与对象
    JavaScript与DOM的关系
  • 原文地址:https://www.cnblogs.com/treeshu/p/10976256.html
Copyright © 2011-2022 走看看