zoukankan      html  css  js  c++  java
  • Java多线程之JUC包:CyclicBarrier源码学习笔记

    若有不正之处请多多谅解,并欢迎批评指正。

    请尊重作者劳动成果,转载请标明原文链接:

    http://www.cnblogs.com/go2sea/p/5615531.html

    CyclicBarrier是java.util.concurrent包中提供的同步工具。通过这个工具我们可以实现n个线程相互等待。我们可以通过参数指定达到公共屏障点之后的行为。

    先上源码:

      1 package java.util.concurrent;
      2 import java.util.concurrent.locks.*;
      3 
      4 public class CyclicBarrier {
      5 
      6     private static class Generation {
      7         boolean broken = false;
      8     }
      9 
     10     private final ReentrantLock lock = new ReentrantLock();
     11     private final Condition trip = lock.newCondition();
     12     private final int parties;
     13     private final Runnable barrierCommand;
     14     private Generation generation = new Generation();
     15     private int count;
     16 
     17     private void nextGeneration() {
     18         // signal completion of last generation
     19         trip.signalAll();
     20         // set up next generation
     21         count = parties;
     22         generation = new Generation();
     23     }
     24 
     25 
     26     private void breakBarrier() {
     27         generation.broken = true;
     28         count = parties;
     29         trip.signalAll();
     30     }
     31 
     32     private int dowait(boolean timed, long nanos) 
     33             throws InterruptedException, BrokenBarrierException, TimeoutException {
     34         final ReentrantLock lock = this.lock;
     35         lock.lock();
     36         try {
     37             final Generation g = generation;
     38 
     39             //小概率事件:该线程在等待锁的过程中,barrier被破坏
     40             if (g.broken)
     41                 throw new BrokenBarrierException();
     42 
     43             //小概率事件:该线程在等待锁的过程中被中断
     44             if (Thread.interrupted()) {
     45                 breakBarrier();
     46                 throw new InterruptedException();
     47             }
     48 
     49            int index = --count;
     50            //当有parties个线程到达barrier
     51            if (index == 0) {  // tripped
     52                 boolean ranAction = false;
     53                 try {
     54                    final Runnable command = barrierCommand;
     55                    //如果设置了barrierCommand,令最后到达的barrier的线程执行它
     56                    if (command != null)
     57                         command.run();
     58                     ranAction = true;
     59                     nextGeneration();
     60                     return 0;
     61                } finally {
     62                     //注意:当执行barrierCommand出现异常时,ranAction派上用场
     63                     if (!ranAction)
     64                         breakBarrier();
     65                }
     66            }
     67 
     68             // loop until tripped, broken, interrupted, or timed out
     69             for (;;) {
     70                 try {
     71                     if (!timed)
     72                         trip.await();
     73                     else if (nanos > 0L)
     74                         //注意:nanos值标识了是否超时,后续用这个nanos值判断是否breakBarrier
     75                         nanos = trip.awaitNanos(nanos);
     76                 } catch (InterruptedException ie) {
     77                     if (g == generation && ! g.broken) {
     78                         breakBarrier();
     79                         throw ie;
     80                     } else {
     81                         //小概率事件:该线程被中断,进入锁等待队列
     82                         //在等待过程中,另一个线程更新或破坏了generation
     83                         //当该线程获取锁之后,应重置interrupt标志而不是抛出异常
     84                         //原因在于:它中断的太晚了,generation已更新或破坏,它抛出InterruptedException的时机已经过去,
     85                         //两种情况:
     86                         //①g被破坏。已经有一个线程抛出了InterruptedException(也只能由第一个抛),与它同时等待的都抛BrokenBarrierException(后续检查broken标志会抛)。
     87                         //②g被更新:此时抛异常没意义(后续检查g更新后会return index),这里重置interrupt标志,让线程继续执行,让这个标志由上层处理
     88                         Thread.currentThread().interrupt();
     89                     }
     90                 }
     91 
     92                 //barrier被破坏,抛出异常
     93                 if (g.broken)
     94                     throw new BrokenBarrierException();
     95                 
     96                 //barrier正常进入下一循环,上一代await的线程继续执行
     97                 if (g != generation)
     98                     return index;
     99                 
    100                 //只要有一个超时,就breakBarrier,后续线程抛的就是barrier损坏异常
    101                 if (timed && nanos <= 0L) {
    102                     breakBarrier();
    103                     throw new TimeoutException();
    104                 }
    105             }
    106         } finally {
    107             lock.unlock();
    108         }
    109     }
    110 
    111 
    112     public CyclicBarrier(int parties, Runnable barrierAction) {
    113         if (parties <= 0) throw new IllegalArgumentException();
    114         this.parties = parties;
    115         this.count = parties;
    116         this.barrierCommand = barrierAction;
    117     }
    118 
    119     public CyclicBarrier(int parties) {
    120         this(parties, null);
    121     }
    122 
    123 
    124     public int getParties() {
    125         return parties;
    126     }
    127 
    128 
    129     public int await() throws InterruptedException, BrokenBarrierException {
    130         try {
    131             return dowait(false, 0L);
    132         } catch (TimeoutException toe) {
    133             throw new Error(toe); // cannot happen;
    134         }
    135     }
    136 
    137 
    138     public int await(long timeout, TimeUnit unit)
    139         throws InterruptedException,
    140                BrokenBarrierException,
    141                TimeoutException {
    142         return dowait(true, unit.toNanos(timeout));
    143     }
    144 
    145     
    146     public boolean isBroken() {
    147         final ReentrantLock lock = this.lock;
    148         lock.lock();
    149         try {
    150             return generation.broken;
    151         } finally {
    152             lock.unlock();
    153         }
    154     }
    155 
    156     public void reset() {
    157         final ReentrantLock lock = this.lock;
    158         lock.lock();
    159         try {
    160             breakBarrier();   // break the current generation
    161             nextGeneration(); // start a new generation
    162         } finally {
    163             lock.unlock();
    164         }
    165     }
    166 
    167     public int getNumberWaiting() {
    168         final ReentrantLock lock = this.lock;
    169         lock.lock();
    170         try {
    171             return parties - count;
    172         } finally {
    173             lock.unlock();
    174         }
    175     }
    176 }
    View Code

    我们先来看一下CyclicBarrier的成员变量:

    1 private final ReentrantLock lock = new ReentrantLock();
    2 private final Condition trip = lock.newCondition();
    3 private final int parties;
    4 private final Runnable barrierCommand;
    5 private Generation generation = new Generation();
    6 private int count;

    CyclicBarrier是通过独占锁lock和Condition对象trip来实现的,成员parties表示必须有parties个线程到达barrier,成员barrierCommand表示当parties个线程到达之后要执行的代码,成员count表示离触发barrierCommand还差count个线程(还有count个线程未到达barrier),成员generation表示当前的“代数”,“cyclic”表示可循环使用,generation是对一次循环的标识。注意:Generation是CyclicBarrier的一个私有内部类,他只有一个成员变量来标识当前的barrier是否已“损坏”:

    1 private static class Generation {
    2     boolean broken = false;
    3 }

    构造函数

     1 public CyclicBarrier(int parties, Runnable barrierAction) {
     2     if (parties <= 0) throw new IllegalArgumentException();
     3     this.parties = parties;
     4     this.count = parties;
     5     this.barrierCommand = barrierAction;
     6 }
     7 
     8 public CyclicBarrier(int parties) {
     9     this(parties, null);
    10 }

    CyclicBarrier提供了两种构造函数,没有指定barrierCommand的构造函数是调用第二个构造函数实现的。第二个构造函数有两个参数:parties和barrierAction,分别用来初始化成员parties和barrierCommand。注意,parties必须大于0,否则会抛出IllegalArgumentException。

     

    await()方法

    1 public int await() throws InterruptedException, BrokenBarrierException {
    2     try {
    3         return dowait(false, 0L);
    4     } catch (TimeoutException toe) {
    5      throw new Error(toe); // cannot happen;
    6     }
    7 }

    await方法是由调用dowait方法实现的,两个参数分别代表是否定时等待和等待的时长。

    doawait()方法

     1     private int dowait(boolean timed, long nanos) 
     2             throws InterruptedException, BrokenBarrierException, TimeoutException {
     3         final ReentrantLock lock = this.lock;
     4         lock.lock();
     5         try {
     6             final Generation g = generation;
     7 
     8             //小概率事件:该线程在等待锁的过程中,barrier被破坏
     9             if (g.broken)
    10                 throw new BrokenBarrierException();
    11 
    12             //小概率事件:该线程在等待锁的过程中被中断
    13             if (Thread.interrupted()) {
    14                 breakBarrier();
    15                 throw new InterruptedException();
    16             }
    17 
    18            int index = --count;
    19            //当有parties个线程到达barrier
    20            if (index == 0) {  // tripped
    21                 boolean ranAction = false;
    22                 try {
    23                    final Runnable command = barrierCommand;
    24                    //如果设置了barrierCommand,令最后到达的barrier的线程执行它
    25                    if (command != null)
    26                         command.run();
    27                     ranAction = true;
    28                     nextGeneration();
    29                     return 0;
    30                } finally {
    31                     //注意:当执行barrierCommand出现异常时,ranAction派上用场
    32                     if (!ranAction)
    33                         breakBarrier();
    34                }
    35            }
    36 
    37             // loop until tripped, broken, interrupted, or timed out
    38             for (;;) {
    39                 try {
    40                     if (!timed)
    41                         trip.await();
    42                     else if (nanos > 0L)
    43                         //注意:nanos值标识了是否超时,后续用这个nanos值判断是否breakBarrier
    44                         nanos = trip.awaitNanos(nanos);
    45                 } catch (InterruptedException ie) {
    46                     if (g == generation && ! g.broken) {
    47                         breakBarrier();
    48                         throw ie;
    49                     } else {
    50                         //小概率事件:该线程被中断,进入锁等待队列
    51                         //在等待过程中,另一个线程更新或破坏了generation
    52                         //当该线程获取锁之后,应重置interrupt标志而不是抛出异常
    53                         //原因在于:它中断的太晚了,generation已更新或破坏,它抛出InterruptedException的时机已经过去,
    54                         //两种情况:
    55                         //①g被破坏:已有一个线程抛出InterruptedException(只能由第一个抛),与它同时等待的都抛BrokenBarrierException(后续检查broken标志会抛)。
    56                         //②g被更新:此时抛异常没意义(后续检查g更新后会return index),这里重置interrupt标志,让线程继续执行,让这个标志由上层处理
    57                         Thread.currentThread().interrupt();
    58                     }
    59                 }
    60 
    61                 //barrier被破坏,抛出异常
    62                 if (g.broken)
    63                     throw new BrokenBarrierException();
    64                 
    65                 //barrier正常进入下一循环,上一代await的线程继续执行
    66                 if (g != generation)
    67                     return index;
    68                 
    69                 //只要有一个超时,就breakBarrier,后续线程抛的就是barrier损坏异常
    70                 if (timed && nanos <= 0L) {
    71                     breakBarrier();
    72                     throw new TimeoutException();
    73                 }
    74             }
    75         } finally {
    76             lock.unlock();
    77         }
    78     }

    dowait方法是CyclicBarrier的精华。应该重点来理解。

    方法开头首先申请锁,然后做了两个判断:g.broken和Thread.interrupted(),这两个判断是分别处理两种小概率的事件:①该线程在等待锁的过程中,barrier被破坏②该线程在等待锁的过程中被中断。这两个事件应抛出相应的异常。接下来dowait方法修改了令count减1,如果此时count减为0,说明已经有parties个线程到达barrier,这时由最后到达barrier的线程去执行barrierCommand。注意,这里设置了一个布尔值ranAction,作用是来标识barrierCommand是否被正确执行完毕,如果执行失败,finally中会执行breakBarrier操作。如果count尚未减为0,则在Condition对象trip上执行await操作,注意:这里有一个InterruptedException的catch子句。当前线程在await中被中断时,会抛出InterruptedException,这时候如果g==generation&&!g.broken的话,我们执行breakBarrier操作,同时抛出这个异常;如果g!=generation或者g.broken==true的话,我们的操作是重置interrupt标志而不是抛出这个异常。这么做的原因我们分两种情况讨论:

    ①g被破坏,这也是一个小概率事件,当前线程被中断后进入锁等待队列,此时另一个线程由于某种原因(超时或者被中断)在他之前获取了锁并执行了breakBarrier方法,那么当前线程持有锁之后就不应再抛InterruptedException,逻辑上应该处理barrier被破坏事件,事实上在后续g.broken的检查中,他会抛出一个BrokenBarrierException。而当前的InterruptedException被我们捕获却没有做出处理,所以执行interrupt方法重置中断标志,交由上层程序处理。

    ②g被更新:说明当前线程在即将完成等待之际被中断,此时抛异常没意义(后续检查g更新后会return index),这里重置interrupt标志,让线程继续执行,让这个标志由上层处理。

    后续对g.broken和g!=generation的判断,分表代表了被唤醒线程(非最后一个到达barrier的线程,也不是被中断或第一个超时的线程)的两种退出方法的方式:第一种是以barrier被破坏告终(然后抛异常),第二个是barrier等到parties个线程,寿终正寝(返回该线程的到达次序index)。

    最后一个if是第一个超时线程执行breakBarrier操作并跑出异常。最后finally子句要释放锁。

    至此,整个doawait方法流程就分析完毕了,我们可以发现,在barrier上等待的线程,如果以抛异常结束的话,只有第一个线程会抛InterruptedException或TimeoutException并执行breakBarrier操作,其他等待线程只能抛BrokenBarrierException,逻辑上这也是合理的:一个barrier只能因超时或中断被破坏一次。

     1 private void nextGeneration() {
     2     trip.signalAll();
     3     count = parties;
     4     generation = new Generation();
     5 }
     6 
     7 private void breakBarrier() {
     8     generation.broken = true;
     9     count = parties;
    10     trip.signalAll();
    11 }

    doawait方法中用到的nextGeneration方法将所有等待线程唤醒,更新generation对象,复位count,进入下一轮任务。breakBarrier方法将generation状态值为broken,复位count(这个复位看上去没有用,但实际上,在broken之后reset之前,如果调用getNumberWaiting方法查看等待线程数的话,复位count是合理的),并唤醒所有等待线程。在调用reset更新generation之前,barrier将处于不可用状态。

     

    reset()方法

     1 public void reset() {
     2     final ReentrantLock lock = this.lock;
     3     lock.lock();
     4     try {
     5         breakBarrier();   // break the current generation
     6         nextGeneration(); // start a new generation
     7     } finally {
     8         lock.unlock();
     9     }
    10 }

    reset方法先break当执行breakBarrier操作(如果有线程在barrier上等待,调用reset会导致BrokenBarrierException),再更新generation对象。

     


    作者:开方乘十

    出处:http://www.cnblogs.com/go2sea/

    本文版权归作者开方乘十和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接,否则保留追究法律责任的权利。

    如有不正之处,欢迎邮件(hailong.ma@qq.com)指正,谢谢。

  • 相关阅读:
    chmod命令详细用法
    mysql删除sql表添加别名及删除sql的注意事项
    bootstrap栅格系统进行偏移格式
    mysql中时间计算函数SQL DATE_SUB()用法
    阿里图标的应用教程
    jquery.cookie.js中$.cookie() 使用方法
    $.cookie()取值设置
    java中年月日的加减法,年月的加减法使用
    IMAP命令与分析
    Telnet IMAP Commands Note
  • 原文地址:https://www.cnblogs.com/go2sea/p/5615531.html
Copyright © 2011-2022 走看看