zoukankan      html  css  js  c++  java
  • CyclicBarrier原理分析

    概述

      CyclicBarrier,一个同步辅助类,其作用是让一组线程到达公共屏障点时阻塞,直到最后一个线程也到达公共屏障点,屏障才会打开,而且该屏障(barrier)在这一组线程释放后可以重用,所以称为循环(Cyclic)的屏障(Barrier)。看功能和CountDownLatch一样,但是两者的区别如下(jdk1.8):

    CountDownLatch CyclicBarrier
    减计数方式 减计数方式
    计数为0时释放所有等待的线程 计数为0时,优先执行一个barrierAction,然后释放所有等待线程
    计数为0时,无法重置 计数为0时,可以重置
    调用countDown()方法计数减一,调用await()方法只进行阻塞(主线程),对计数没任何影响 调用await()方法计数减1,若减1后的值不等于0,则线程阻塞(子线程)
    不可重复利用 可重复利用

    上面列举的区别点不清楚的,先不用管,看完后面这个类的介绍之后,就可以明白了,为了更好的理解,先看一个例子。

    使用示例

    铁蛋最近手痒,想找人打架,于是他就到了钢蛋家,钢蛋家兄弟5个,他要去揍他兄弟五个,直到他们兄弟五个都被打趴了,他才会肯收手,这时候他也累了,先歇一会,然后再打,再把钢蛋兄弟五个打趴下,下面就模拟一下这个过程。

    public class CyclicBarrierTest {
       //钢蛋
         static class Person extends Thread{
            CyclicBarrier cyclicBarrier;
            String name;
            public Person(CyclicBarrier cyclicBarrier,String name){
                this.cyclicBarrier = cyclicBarrier;
                this.name = name;
            }
            @Override
            public void run() {
                System.out.println("钢蛋"+name + ":别打了,大哥,我这个月的零发钱都给你");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    
         static class Sleep extends Thread{
            @Override
            public void run() {
                System.out.println("打累了,歇会,抽根烟!!!");
            }
        }
    
        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Sleep());
            for (int i = 0; i < 5; i++) {
                new Person(cyclicBarrier,String.valueOf(i)).start();
            }
            try {
    //休息1s,再打 Thread.sleep(
    1000); } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < 5; i++) { new Person(cyclicBarrier,String.valueOf(i)).start(); } } }

    执行结果

    钢蛋0:别打了,大哥,我这个月的零发钱都给你
    钢蛋4:别打了,大哥,我这个月的零发钱都给你
    钢蛋1:别打了,大哥,我这个月的零发钱都给你
    钢蛋2:别打了,大哥,我这个月的零发钱都给你
    钢蛋3:别打了,大哥,我这个月的零发钱都给你
    打累了,歇会,抽根烟!!!
    钢蛋0:别打了,大哥,我这个月的零发钱都给你
    钢蛋1:别打了,大哥,我这个月的零发钱都给你
    钢蛋2:别打了,大哥,我这个月的零发钱都给你
    钢蛋4:别打了,大哥,我这个月的零发钱都给你
    钢蛋3:别打了,大哥,我这个月的零发钱都给你
    打累了,歇会,抽根烟!!!

    从上面的例子可以看出,铁蛋很能打,哈哈哈。大家看这个例子应该就明白CyclicBarrier怎么使用了。

    CyclicBarrier工作原理

    CyclicBarrier是基于ReentrantLock和Condition实现的,基本过程是这样的,先定义CyclicBarrier拦截的线程数量(parties)和剩余线程数量(count),初始状态parties = count,然后启动多个线程,当某个线程执行完之后,就加锁,然后count减1,如果发现count不等于0,把当前线程await挂起,直到最后一个线程执行完,发现count等于0,这个时候他不着急唤醒其他等待的线程,他先执行一个任务,这个任务是提前就定义好的,当这个任务完成了,他再把其他的线程唤醒,这时这一组线程就都执行完了,之后他会重置拦截的线程数量,这也是为什么CyclicBarrier可以重复使用的原因,以上就是CyclicBarrier处理的基本流程。

    CyclicBarrier代码分析

    构造方法

    public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
    public CyclicBarrier(int parties) {
            this(parties, null);
        }
    • parties 变量,表示拦截线程的总数量。

    • count 变量,表示拦截线程的剩余需要数量。

    • barrierAction 变量,为 CyclicBarrier 接收的 Runnable 命令,用于在线程到达屏障时,优先执行 barrierAction ,用于处理更加复杂的业务场景。

    • generation 变量,表示 CyclicBarrier 的更新换代。

    await方法

       //等待,不设置超时时间 
    public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } //等待,设置超时间 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }

    设置超时时间的意思是说,当超时时间到了,就唤醒该线程,不管这时所有的线程是不是执行完了。

    进入#dowait(boolean timed, long nanos)方法

    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            //加锁
            lock.lock();
            try {
                //获取当前分代
                final Generation g = generation;
                //如果分代被“损坏”,抛出异常,分代怎么能被损坏呢?看下面的代码
                if (g.broken)
                    throw new BrokenBarrierException();
           //如果线程被中断,那分代就被损坏了,这时会重置拦截线程数和唤醒等待线程,防止无限等待下去
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
                //剩余线程数量减1
                int index = --count;
                //index = 0,说明当前线程是最后一个线程
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        //拿到构造方法中传进来的命令
                        final Runnable command = barrierCommand;
                        if (command != null)
                            //执行
                            command.run();
                        ranAction = true;
                        //这里会唤醒别的等待的线程,同时会重置拦截线程数量和新建分代,表示这一组线程完成
                        //开始下一组
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                for (;;) {
                    try {
                        //如果没有设置了超时时间
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    //抛异常,说明等待的时候该线程被中断了
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            //分代被损坏
                            breakBarrier();
                            throw ie;
                        } else {
                            Thread.currentThread().interrupt();
                        }
                    }
              //如果分代broken,抛出异常,唤醒之后再次判断,防止上面执行catch
                    if (g.broken)
                        throw new BrokenBarrierException();
              //由于上面在执行到最后一个线程的时候才会唤醒该线程,而且会重置分代,所以这个
                    //if条件会成立,也就是说await方法会返回还剩余多少个等待线程
    
                    if (g != generation)
                        return index;
                    //这个nanos是trip.awaitNanos返回的,如果超时了,这个条件就成立
                    if (timed && nanos <= 0L) {
    //唤醒所有线程,屏障被损坏 breakBarrier();
    throw new TimeoutException(); } } } finally { //最后解锁,上面的所有操作都是原子操作 lock.unlock(); } }

    线程wait之后,如下条件可以结束wait状态:

    1. 最后一个线程执行完,index = 0
    2. 当前线程等待超时
    3. 别的线程等待超时
    4. 等待的过程中当前线程发生中断
    5. 等待过程别的线程发生中断
    6. 某个线程执行了reset方法

    Generation

    private static class Generation {
            boolean broken = false;
        }
    private Generation generation = new Generation();

    Generation是CyclicBarrier中的一个静态类,首次初始化CyclicBarrier的时候,会自动初始化,该类是处理分代的,同一组的线程属于一个分代,当这一组的所有线程都到达barrier的时候,就重新new一个新的,可以看一下如下方法:

    nextGeneration();

    该方法在index=0的时候会执行,代码如下:

        private void nextGeneration() {
            // signal completion of last generation
            trip.signalAll();
            // set up next generation
            count = parties;
    //当index=0的时候,说明这一组线程全部执行完了,重新新建分代对象 generation
    = new Generation(); }

    也就是说每次分代都会重新新建一个Generation类的对象。

    reset()方法

       public void reset() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                breakBarrier();   // break the current generation
                nextGeneration(); // start a new generation
            } finally {
                lock.unlock();
            }
        }

    该方法会重置分代,而且会通知上一个分代的所有等待的线程。这里有一个问题,如果在执行reset的时候,上一个分代中还有的线程没有执行到await方法,在reset之后,上一个分代中这些方法又进入await,这个时候在breakBarrier方法中把分代给损坏了是不管用的,先看一下这个方法

        private void breakBarrier() {
    //“损坏分代方法” generation.broken
    = true; count = parties; trip.signalAll(); }

    在这个方法中设置为true之后,接着又执行了nextGeneration,在这个方法中会新建一个分代,此时generation.broken = false,所以上一个分代的线程会继续执行,而且会进入await,会使用reset之后的分代。

    参考:

    Java并发—— CountDownLatch与CyclicBarrier

    【死磕 Java 并发】—- J.U.C 之并发工具类:CyclicBarrier

  • 相关阅读:
    剑指Offer_#7_重建二叉树
    剑指Offer_#6_从尾到头打印链表
    剑指Offer_#5_替换空格
    剑指Offer_#4_二维数组中的查找
    Redis详解(十三)- Redis布隆过滤器
    Redis详解(十二)- 缓存穿透、缓存击穿、缓存雪崩
    Redis详解(十一)- 过期删除策略和内存淘汰策略
    Redis详解(十)- 集群模式详解
    Redis详解(九)- 哨兵(Sentinel)模式详解
    Redis详解(八)- 主从复制
  • 原文地址:https://www.cnblogs.com/gunduzi/p/13637891.html
Copyright © 2011-2022 走看看