zoukankan      html  css  js  c++  java
  • 回环屏障CyclicBarrier

      上一篇说的CountDownLatch是一个计数器,类似线程的join方法,但是有一个缺陷,就是当计数器的值到达0之后,再调用CountDownLatch的await和countDown方法就会立刻返回,就没有作用了,那么反正是一个计数器,为什么不能重复使用呢?于是就出现了这篇说的CyclicBarrier,它的状态可以被重用;

    一.简单例子

      用法其实和CountDownLatch差不多,也就是一个计数器,当计数器的值变为0之后,就会把阻塞的线程唤醒:

    package com.example.demo.study;
    
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Study0216 {
        // 注意这里的构造器,第一个参数表示计数器初始值
        // 第二个参数表示当计数器的值变为0的时候就触发的任务
        static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
            System.out.println("cyclicBarrier task ");
        });
    
        public static void main(String[] args) {
            // 新建两个线程的线程池
            ExecutorService pool = Executors.newFixedThreadPool(2);
            // 线程1放入线程池中
            pool.submit(() -> {
                try {
                    System.out.println("Thread1----await-begin");
                    cyclicBarrier.await();
                    System.out.println("Thread1----await-end");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            // 线程2放到线程池中
            pool.submit(() -> {
                try {
                    System.out.println("Thread2----await-begin");
                    cyclicBarrier.await();
                    System.out.println("Thread2----await-end");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            // 关闭线程池,此时还在执行的任务会继续执行
            pool.shutdown();
        }
    }

     

       我们再看看CyclicBarrier的复用性,这里比如有一个任务,有三部分组成,分别是A,B,C,然后创建两个线程去执行这个任务,必须要等到两个线程都执行完成A部分,然后才能开始执行B,只有两个线程都执行完成B部分,才能执行C:

    package com.example.demo.study;
    
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Study0216 {
        // 这里的构造器,只有一个参数,表示计数器初始值
        static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
    
        public static void main(String[] args) {
            // 新建两个线程的线程池
            ExecutorService pool = Executors.newFixedThreadPool(2);
            // 线程1放入线程池中
            pool.submit(() -> {
                try {
                    System.out.println("Thread1----stepA-start");
                    cyclicBarrier.await();
                    
                    System.out.println("Thread1----stepB-start");
                    cyclicBarrier.await();
                    
                    System.out.println("Thread1----stepC-start");
                    
                    
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            // 线程2放到线程池中
            pool.submit(() -> {
                try {
                    System.out.println("Thread2----stepA-start");
                    cyclicBarrier.await();
                    
                    System.out.println("Thread2----stepB-start");
                    cyclicBarrier.await();
                    
                    System.out.println("Thread2----stepC-start");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            // 关闭线程池,此时还在执行的任务会继续执行
            pool.shutdown();
        }
    }

    二.基本原理

      我们看看一些重要属性:

    public class CyclicBarrier {
       //这个内部类只有一个boolean值
        private static class Generation {
            boolean broken = false;
        }
    
        //独占锁
        private final ReentrantLock lock = new ReentrantLock();
         //条件变量
        private final Condition trip = lock.newCondition();
         //保存线程的总数
        private final int parties;
        //这是一个任务,通过构造器传递一个任务,当计数器变为0之后,就可以执行这个任务
        private final Runnable barrierCommand;
        //这类内部之后一个boolean的值,表示屏障是否被打破
        private Generation generation = new Generation();
        //计数器
        private int count;
    }

      构造器:

    //我们的构造器初始值设置的是parties
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    //注意,这里开始的时候是count等于parties
    //为什么要有两个变量呢?我们每次调用await方法的时候count减一,当count的值变为0之后,怎么又还原成初始值呢?
    //直接就把parties的值赋值给count就行了呀,简单吧!
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

      然后再看看await方法:

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            //调用的是dowait方法
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    
    //假设count等于3,有三个线程都在调用这个方法,默认超时时间为0,那么首每次都只有一个线程可以获取锁,将count减一,不为0
    //就会到下面的for循环中扔到条件队列中挂起;直到第三个线程调用这个dowait方法,count减一等于0,那么当前线程执行任务之后,
    //就会唤醒条件变量中阻塞的线程,并重置count为初始值3
    private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException, TimeoutException {
        //获取锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //g中只有一个boolean值
            final Generation g = generation;
            //如果g中的值为true的时候,抛错
            if (g.broken)
                throw new BrokenBarrierException();
            //如果当前线程中断,就抛错
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //count减一,再赋值给index
            int index = --count;
            //如果index等于0的时候,说明所有的线程已经到屏障点了,就可以
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    //执行当前线程的任务
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //唤醒其他因为调用了await方法阻塞的线程
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
            //能到这里来,说明是count不等于0,也就是还有的线程没有到屏障点
            for (;;) {
                try {
                    //wait方法有两种情况,一种是设置超时时间,一种是不设置超时时间
                    //这里就是对超时时间进行的一个判断,如果设置的超时时间为0,则会在条件队列中无限的等待下去,直到被唤醒
                    //设置了超时时间,那就等待该时间
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (g != generation)
                    return index;
    
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            //释放锁
            lock.unlock();
        }
    }
    
    //唤醒其他因为调用了await方法阻塞的线程
    private void nextGeneration() {
        //唤醒条件变量中所有线程
        trip.signalAll();
        //重置count的值
        count = parties;
        generation = new Generation();
    }
    
    private void breakBarrier() {
        generation.broken = true;
        //重置count为初始值parties
        count = parties;
        //唤醒条件队列中的所有线程
        trip.signalAll();
    }
  • 相关阅读:
    MySql设计表中的create_time和update_time字段
    java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V
    Hbase问题:java.lang.RuntimeException: HRegionServer Aborted
    Elasticsearch 7.6.2 简单的api(springboot)
    idea + springboot 热部署
    kibana Elasticsearch cluster did not respond with license information.
    Elasticsearch7.6.2 搭建的坑
    数据库账号密码加密
    pg数据库,插入数据,若已存在则更新数据
    org.postgresql.util.PSQLException:这个 ResultSet 已经被关闭。
  • 原文地址:https://www.cnblogs.com/wyq1995/p/12317630.html
Copyright © 2011-2022 走看看