zoukankan      html  css  js  c++  java
  • 并发编程学习笔记(二十、CyclicBarrier源码分析)

    目录:

    • 什么是CyclicBarrier
    • 为什么要有CyclicBarrier
    • 如何使用CyclicBarrier
    • CyclicBarrier源码解析

    什么是CyclicBarrier

    CyclicBarrier是循环栅栏,它的作用就是会让所有线程都等待完成后才会继续下一步行动

    与CountDownLatch不同的是CountDownLatch是一次性的,而CyclicBarrier可以循环使用

    为什么要有CyclicBarrier

    CyclicBarrier可以说是CountDownLatch的扩展,而我们之前了解到了的CountDownLatch又是join()的扩展;然后我们知道join()只能够在线程执行完成后才可以释放锁,而CountDownLatch更为灵活些,可以在线程执行到某一阶段释放锁。

    那CyclicBarrier扩展了一些什么呢,实际上他们的主要区别就是CountDownLatch会在线程执行完后就完了,只一次性的,而CyclicBarrier会在线程执行完后看是否满足结束条件,若不满足会一直循环的执行,可循环使用的,直到满足后才会退出。也就是说CyclicBarrier是对CountDownLatch执行能力的一次升级。

    如何使用CyclicBarrier

    根据上述说的,CountDownLatch是一次性的,CyclicBarrier是循环使用的,那CyclicBarrier的使用场景到底是什么呢。

    我这里举个例子,学校举行运动会,项目为百米赛跑,到达重点结束这个项目。现有3名参赛人员,和一个裁判,规则是每个运动员跑一秒,如果你先跑,跑完后就需要等待其它运动员也跑完一秒后才能继续抛(当然, 实际不会有这个操作),谁先到达重点就胜利。

    代码如下:

     1 /**
     2  * 运动员
     3  *
     4  * @author zhoude
     5  * @date 2020/6/28 12:37
     6  */
     7 public class Athlete extends Thread {
     8 
     9     /**
    10      * 运动员姓名
    11      */
    12     private String athleteName;
    13 
    14     /**
    15      * 已跑距离
    16      */
    17     private int distance;
    18 
    19     /**
    20      * 随机跑的距离
    21      */
    22     private Random random = new Random();
    23 
    24     /**
    25      * 循环栅栏
    26      */
    27     private CyclicBarrier cyclicBarrier;
    28 
    29     public Athlete(String athleteName, CyclicBarrier cyclicBarrier) {
    30         this.athleteName = athleteName;
    31         this.cyclicBarrier = cyclicBarrier;
    32     }
    33 
    34     @Override
    35     public void run() {
    36         System.out.println(MessageFormat.format("{0}执行了run方法", this.athleteName));
    37         try {
    38             while (!Thread.interrupted()) {
    39                 // 每隔1秒随机跑0-10米
    40                 synchronized (this) {
    41                     this.distance += random.nextInt(10);
    42                 }
    43                 TimeUnit.SECONDS.sleep(1);
    44                 System.out.println(MessageFormat.format("{0}已经跑了{1}米", this.athleteName, this.distance));
    45                 // 等待其它运动员跑完这一轮(理论上不可能,哈哈)
    46                 cyclicBarrier.await();
    47             }
    48         }
    49         catch (InterruptedException | BrokenBarrierException e) {
    50             e.printStackTrace();
    51         }
    52     }
    53 
    54     //  getter and setter
    55 
    56     public String getAthleteName() {
    57         return athleteName;
    58     }
    59 
    60     public void setAthleteName(String athleteName) {
    61         this.athleteName = athleteName;
    62     }
    63 
    64     public int getDistance() {
    65         return distance;
    66     }
    67 
    68 }
     1 public class Referee extends Thread {
     2 
     3     /**
     4      * 赛跑距离
     5      */
     6     private static final int DISTANCE = 100;
     7 
     8     /**
     9      * 运动员
    10      */
    11     private List<Athlete> athletes = new ArrayList<>();
    12 
    13     /**
    14      * 线程池
    15      */
    16     private ExecutorService pool;
    17 
    18     public Referee(ExecutorService pool) {
    19         this.pool = pool;
    20     }
    21 
    22     @Override
    23     public void run() {
    24         System.out.println("裁判判断了一次");
    25         athletes.forEach(athlete -> {
    26             if (athlete.getDistance() >= DISTANCE) {
    27                 System.out.println(MessageFormat.format("{0}已经到达终点", athlete.getAthleteName()));
    28                 pool.shutdownNow();
    29                 return;
    30             }
    31         });
    32     }
    33 
    34     public void addAthletes(Athlete athlete) {
    35         athletes.add(athlete);
    36     }
    37 
    38 }
     1 public class Test {
     2 
     3     public static void main(String[] args) {
     4         int size = 3;
     5         ExecutorService pool = Executors.newCachedThreadPool();
     6         Referee referee = new Referee(pool);
     7         CyclicBarrier cyclicBarrier = new CyclicBarrier(size, referee);
     8         for (int i = 0; i < size; i++) {
     9             Athlete athlete = new Athlete("运动员" + (i + 1), cyclicBarrier);
    10             referee.addAthletes(athlete);
    11             pool.submit(athlete);
    12         }
    13     }
    14 
    15 }

    当运行Test后,你会发现咦,刚刚不是说CyclicBarrier会循环使用嘛,不是应该等3名运动员跑完一轮后继续调用run方法嘛,其实不是的,这块等说完CyclicBarrier源码后你就清楚了。

    CyclicBarrier源码解析

    好,现在我们来通过源码分析来看看上面说的,循环使用到底是如何循环的。

    还是一个套路,先看下CyclicBarrier的属性、内部类以及构造器,然后学习下核心方法。

    1、属性:

     1 /** 保护CyclicBarrier对象入口的重入锁 */
     2 private final ReentrantLock lock = new ReentrantLock();
     3 /** 用于线程间等待和唤醒的Condition */
     4 private final Condition trip = lock.newCondition();
     5 /** 栅栏拦截的线程数量 */
     6 private final int parties;
     7 /** 达到拦截线程数后执行的任务 */
     8 private final Runnable barrierCommand;
     9 /** 当前的Generation,每当屏障失效或开闸后会自动替换掉,从而实现重置的功能(循环利用) */
    10 private Generation generation = new Generation();
    11 
    12 /**
    13  * 还能阻塞的线程数(即parties-当前阻塞线程数)
    14  */
    15 private int count;

    从属性可以看出CyclicBarrier是通过ReentrantLock和Condition来实现了,与CountDownLatch相似的是,它们同意定义了消费的数量这种类似的int属性,当这个属性满足特定条件后则唤醒。

    还有一个属性Generation,需要特别关注下,这是实现循环栅栏的核心,稍后会说到。

    2、构造器:

     1 public CyclicBarrier(int parties, Runnable barrierAction) {
     2     if (parties <= 0) throw new IllegalArgumentException();
     3     this.parties = parties;
     4     this.count = parties;
     5     this.barrierCommand = barrierAction;
     6 }
     7 
     8 public CyclicBarrier(int parties) {
     9     this(parties, null);
    10 }

    两个构造器差不多,只是带有Runnable可以处理一些特别的业务场景。

    3、内部类:

    1 /**
    2  * 每使用CyclicBarrier对象都会关联一个Generation对象。
    3  * 当CyclicBarrier对象发生trip或reset时,对应的generation会发生改变。
    4  */
    5 private static class Generation {
    6     // 标记当前CyclicBarrier对象是否已经处于中断状态
    7     boolean broken = false;
    8 }

    4、核心方法:

    await():await方法告诉CyclicBarrier自己已经到达同步点了,然后再阻塞当前线程。

    await有两个,一个是带超时时间的,一个是不带的:

    • java.util.concurrent.CyclicBarrier#await()
    • java.util.concurrent.CyclicBarrier#await(long, java.util.concurrent.TimeUnit)

    它们两个最终都会走到java.util.concurrent.CyclicBarrier#dowait,所以我们只要说说dowait就可以了。

     1 private int dowait(boolean timed, long nanos)
     2     throws InterruptedException, BrokenBarrierException,
     3            TimeoutException {
     4     // 获取重入锁
     5     final ReentrantLock lock = this.lock;
     6     // 重入锁加锁
     7     lock.lock();
     8     try {
     9         final Generation g = generation;
    10 
    11         // 若generation已损坏,则抛出异常
    12         if (g.broken)
    13             throw new BrokenBarrierException();
    14 
    15         // 线程中断,设置generation已损坏,并重置count数量(之后详述)
    16         if (Thread.interrupted()) {
    17             breakBarrier();
    18             throw new InterruptedException();
    19         }
    20 
    21         // 每次等待都会减少还能阻塞的线程数量
    22         int index = --count;
    23         // index == 0说明是最后一个进入栅栏的线程
    24         if (index == 0) {  // tripped
    25             // 任务被执行的标识
    26             boolean ranAction = false;
    27             try {
    28                 final Runnable command = barrierCommand;
    29                 // 若有任务执行则执行任务
    30                 if (command != null)
    31                     command.run();
    32                 // 任务执行完后,设置ranAction为已执行
    33                 ranAction = true;
    34                 // 更新栅栏状态,并唤醒所有线程
    35                 nextGeneration();
    36                 return 0;
    37             } finally {
    38                 // 若执行栅栏任务时失败了,则标记栅栏状态为中断,并唤醒所有线程
    39                 if (!ranAction)
    40                     breakBarrier();
    41             }
    42         }
    43 
    44         // loop until tripped, broken, interrupted, or timed out
    45         for (;;) {
    46             try {
    47                 // 没有时间限制,则一直等待,直到被唤醒
    48                 if (!timed)
    49                     trip.await();
    50                 // 设置了超时时间,则等待
    51                 else if (nanos > 0L)
    52                     nanos = trip.awaitNanos(nanos);
    53             } catch (InterruptedException ie) {
    54                 // 若在等待时间内,被中断了
    55                 if (g == generation && ! g.broken) {
    56                     // 当前generation未被损坏,则通知其它阻塞在此栅栏上的线程
    57                     breakBarrier();
    58                     throw ie;
    59                 } else {
    60                     // 其它情况,说明并不是这个generation,不会影响这个generation的执行
    61                     // 只需要打个中断标记即可
    62                     Thread.currentThread().interrupt();
    63                 }
    64             }
    65 
    66             if (g.broken)
    67                 throw new BrokenBarrierException();
    68 
    69             // 若换了generation则返回当前栅栏下标。
    70             // 因为一个线程可以使用多个栅栏,所以也会存在被唤醒后,g == generation的情况
    71             if (g != generation)
    72                 return index;
    73 
    74             // 时间到了还未被唤醒,则自动唤醒所有线程,并抛出异常
    75             if (timed && nanos <= 0L) {
    76                 breakBarrier();
    77                 throw new TimeoutException();
    78             }
    79         }
    80     } finally {
    81         // 解锁
    82         lock.unlock();
    83     }
    84 }

    上述dowait()方法中有两个关键的函数,breakBarrier()、nextGeneration(),他们用于发生异常情况损坏generation和换generation的操作。

    其源码如下:

    1 private void breakBarrier() {
    2     generation.broken = true;
    3     count = parties;
    4     trip.signalAll();
    5 }
    1 private void nextGeneration() {
    2     // signal completion of last generation
    3     trip.signalAll();
    4     // set up next generation
    5     count = parties;
    6     generation = new Generation();
    7 }

    可以看到他们也不过是唤醒线程及标记一些属性而已,然后我们再来结合dowait()方法及我们之前的赛跑demo来分析下。

    ——————————————————————————————————————————————————————————————————————

    breakBarrier(),它把generation.broken置为true,并且重置还能阻塞的线程数count为初始值,以及唤醒所有线程。

    因为generation.broken = true后,再次调用dowait()方法也就会抛出异常,会终止dowait()方法的执行;然后是并初始化count,保证使用者在捕获异常后能自行处理;以及唤醒线程不让线程阻塞。

    ——————————————————————————————————————————————————————————————————————

    nextGeneration(),先唤醒,再重置count数量,最后构造一个新的generation 。

    这样其实就是重置了栅栏,我们想想,首先是满足我们栅栏的条件后,就会调用dowait()方法阻塞线程,栅栏拦截数量到达指定数后就不能再拦截了,所以在调用nextGeneration()方法重置栅栏后,数量变成初始值,这样我们的栅栏就又可以继续拦截线程了。

    ——————————————————————————————————————————————————————————————————————

    最后我们根据我们上面的demo来分析下,CyclicBarrier的运行流程。

    首先我们创建了一个需要拦截3个线程的栅栏,并且调用pool.submit(athlete)开始执行。

    然后线程会走到Athlete的run方法中,并开始执行线程。

     1 @Override
     2 public void run() {
     3     System.out.println(MessageFormat.format("{0}执行了run方法", this.athleteName));
     4     try {
     5         while (!Thread.interrupted()) {
     6             // 每隔1秒随机跑0-10米
     7             synchronized (this) {
     8                 this.distance += random.nextInt(10);
     9             }
    10             TimeUnit.SECONDS.sleep(1);
    11             System.out.println(MessageFormat.format("{0}已经跑了{1}米", this.athleteName, this.distance));
    12             // 等待其它运动员跑完这一轮(理论上不可能,哈哈)
    13             cyclicBarrier.await();
    14         }
    15     }
    16     catch (InterruptedException | BrokenBarrierException e) {
    17         e.printStackTrace();
    18     }
    19 }

    每个线程走到第13行就会等待其它线程。

     1 if (index == 0) {  // tripped
     2     boolean ranAction = false;
     3     try {
     4         final Runnable command = barrierCommand;
     5         if (command != null)
     6             command.run();
     7         ranAction = true;
     8         nextGeneration();
     9         return 0;
    10     } finally {
    11         if (!ranAction)
    12             breakBarrier();
    13     }
    14 }

    我们可以看到当最后一个线程调用await()方法后,index == 0,会调用command.run(),也就是如下代码:

     1 @Override
     2 public void run() {
     3     System.out.println("裁判判断了一次");
     4     athletes.forEach(athlete -> {
     5         if (athlete.getDistance() >= DISTANCE) {
     6             System.out.println(MessageFormat.format("{0}已经到达终点", athlete.getAthleteName()));
     7             pool.shutdownNow();
     8             return;
     9         }
    10     });
    11 }

    判断是否有一个运动员跑完了,如果有则结束所有线程;如果没有就会继续走到nextGeneration(),这也是我们上面说的,会做一些操作,我这里就不说了。

    执行完nextGeneration()后,所有线程会被唤醒(trip.signalAll()),然后他们就可以继续执行Athlete的run方法的while里的逻辑了;紧接着还会重置count、generation,这样就可以让栅栏数量保持正确了。

    这也就验证了我们之前说的,属性Generation,需要特别关注下,这是实现循环栅栏的核心。

    至此CyclicBarrier已经说完了,其它的函数很简单,就不在此赘述了。

  • 相关阅读:
    Java中使用CyclicBarrier
    Java中CountDownLatch使用初步
    设计模式简介
    Java中byte[]和char[]互相转换
    Java转换byte[]数组、Hex十六进制字符串
    TextBox自定义控件
    DataTrigger 绑定枚举
    WPF路径动画(动态逆向动画)
    github上传
    利用Canvas进行绘制XY坐标系
  • 原文地址:https://www.cnblogs.com/bzfsdr/p/13201763.html
Copyright © 2011-2022 走看看