zoukankan      html  css  js  c++  java
  • JAVA并发,CyclicBarrier

    CyclicBarrier 翻译过来叫循环栅栏、循环障碍什么的(还是有点别扭的。所以还是别翻译了,只可意会不可言传啊)。它主要的方法就是一个:await()。await() 方法没被调用一次,计数便会减少1,并阻塞住当前线程。当计数减至0时,阻塞解除,所有在此 CyclicBarrier 上面阻塞的线程开始运行。在这之后,如果再次调用 await() 方法,计数就又会变成 N-1,新一轮重新开始,这便是 Cyclic 的含义所在。

    CyclicBarrier 的使用并不难,但需要主要它所相关的异常。除了常见的异常,CyclicBarrier.await() 方法会抛出一个独有的 BrokenBarrierException。这个异常发生在当某个线程在等待本 CyclicBarrier 时被中断或超时或被重置时,其它同样在这个 CyclicBarrier 上等待的线程便会受到 BrokenBarrierException。意思就是说,同志们,别等了,有个小伙伴已经挂了,咱们如果继续等有可能会一直等下去,所有各回各家吧。

    CyclicBarrier.await() 方法带有返回值,用来表示当前线程是第几个到达这个 Barrier 的线程。

    和 CountDownLatch 一样,CyclicBarrier 同样可以可以在构造函数中设定总计数值。与 CountDownLatch 不同的是,CyclicBarrier 的构造函数还可以接受一个 Runnable,会在 CyclicBarrier 被释放时执行。

    NOTE: CyclicBarrier 的功能也可以由 CountDownLatch 来实现 

     

    上面的内容参考自:http://developer.51cto.com/art/201403/432095.htm

    下面的例子引用thinking in java(网络上的简单例子体现不出CyclicBarrier存在的意义,和CountDownLatch的用处差不多)

     

      1 package com.xt.thinks21_7;
      2 
      3 //: concurrency/HorseRace.java
      4 // Using CyclicBarriers.
      5 import java.util.concurrent.*;
      6 import java.util.*;
      7 
      8 /**
      9  * 马儿类
     10  * @author Administrator
     11  *
     12  */
     13 class Horse implements Runnable {
     14     private static int counter = 0;
     15     private final int id = counter++;
     16     private int strides = 0;
     17     private static Random rand = new Random(47);
     18     private static CyclicBarrier barrier;
     19 
     20     public Horse(CyclicBarrier b) {
     21         barrier = b;
     22     }
     23 
     24     public synchronized int getStrides() {
     25         return strides;
     26     }
     27 
     28     public void run() {
     29         try {
     30             while (!Thread.interrupted()) {
     31                 synchronized (this) {
     32                     //马儿奔跑的不熟,可能是0步,1步,2步
     33                     strides += rand.nextInt(3); // Produces 0, 1 or 2
     34                 }
     35                 //当前线程加入
     36                 barrier.await();
     37                 /*
     38                  * await的源码
     39                  *  public int await() throws InterruptedException, BrokenBarrierException {
     40                         try {
     41                             return dowait(false, 0L);
     42                         } catch (TimeoutException toe) {
     43                             throw new Error(toe); // cannot happen
     44                         }
     45                     }
     46                     
     47                     dowait的源码:
     48                     private int dowait(boolean timed, long nanos)
     49                         throws InterruptedException, BrokenBarrierException,
     50                                TimeoutException {
     51                         final ReentrantLock lock = this.lock;
     52                         lock.lock();
     53                         try {
     54                             final Generation g = generation;
     55                 
     56                             if (g.broken)
     57                                 throw new BrokenBarrierException();
     58                 
     59                             if (Thread.interrupted()) {
     60                                 breakBarrier();
     61                                 throw new InterruptedException();
     62                             }
     63                 
     64                             int index = --count;
     65                             if (index == 0) {  // tripped
     66                                 boolean ranAction = false;
     67                                 try {
     68                                     final Runnable command = barrierCommand;
     69                                      */
     70                 
     71                 //这里需要注意,判断了CyclicBarrier构造器中的runnable接口是否为空,不为空回调run方法
     72 //                                    if (command != null)
     73 //                                        command.run();
     74                                     
     75                                     /*
     76                                     ranAction = true;
     77                                     nextGeneration();
     78                                     return 0;
     79                                 } finally {
     80                                     if (!ranAction)
     81                                         breakBarrier();
     82                                 }
     83                             }
     84                 
     85                             // loop until tripped, broken, interrupted, or timed out
     86                             for (;;) {
     87                                 try {
     88                                     if (!timed)
     89                                         trip.await();
     90                                     else if (nanos > 0L)
     91                                         nanos = trip.awaitNanos(nanos);
     92                                 } catch (InterruptedException ie) {
     93                                     if (g == generation && ! g.broken) {
     94                                         breakBarrier();
     95                                         throw ie;
     96                                     } else {
     97                                         // We're about to finish waiting even if we had not
     98                                         // been interrupted, so this interrupt is deemed to
     99                                         // "belong" to subsequent execution.
    100                                         Thread.currentThread().interrupt();
    101                                     }
    102                                 }
    103                 
    104                                 if (g.broken)
    105                                     throw new BrokenBarrierException();
    106                 
    107                                 if (g != generation)
    108                                     return index;
    109                 
    110                                 if (timed && nanos <= 0L) {
    111                                     breakBarrier();
    112                                     throw new TimeoutException();
    113                                 }
    114                             }
    115                         } finally {
    116                             lock.unlock();
    117                         }
    118                  */
    119             }
    120         } catch (InterruptedException e) {
    121             // A legitimate way to exit
    122         } catch (BrokenBarrierException e) {
    123             // This one we want to know about
    124             throw new RuntimeException(e);
    125         }
    126     }
    127 
    128     /**
    129      * 打印马儿id
    130      */
    131     public String toString() {
    132         return "Horse " + id + " ";
    133     }
    134 
    135     /**
    136      * 打印马儿奔跑轨迹
    137      */
    138     public String tracks() {
    139         StringBuilder s = new StringBuilder();
    140         for (int i = 0; i < getStrides(); i++)
    141             s.append("*");
    142         s.append(id);
    143         return s.toString();
    144     }
    145 }
    146 
    147 /**
    148  * 马儿奔跑的类
    149  * @author Administrator
    150  *
    151  */
    152 public class HorseRace {
    153     static final int FINISH_LINE = 75;
    154     private List<Horse> horses = new ArrayList<Horse>();
    155     private ExecutorService exec = Executors.newCachedThreadPool();
    156     private CyclicBarrier barrier;
    157 
    158     public HorseRace(int nHorses, final int pause) {
    159         //CyclicBarrier构造器,第二个参数为回调接口,触发条件是await计数到0的时候,也就是所有线程已经加入CyclicBarrier
    160         barrier = new CyclicBarrier(nHorses, new Runnable() {
    161             public void run() {
    162                 StringBuilder s = new StringBuilder();
    163                 //打印栅栏
    164                 for (int i = 0; i < FINISH_LINE; i++)
    165                     s.append("="); // The fence on the racetrack
    166                 System.out.println(s);
    167                 //打印所有马儿的奔跑轨迹
    168                 for (Horse horse : horses)
    169                     System.out.println(horse.tracks());
    170                 //如果有一个马儿奔跑到了终点,则取消执行器执行所有线程了
    171                 for (Horse horse : horses)
    172                     if (horse.getStrides() >= FINISH_LINE) {
    173                         System.out.print(horse + "won!");
    174                         exec.shutdownNow();
    175                         return;
    176                     }
    177                 //每一次所有马儿奔跑一次之后休眠时间
    178                 try {
    179                     TimeUnit.MILLISECONDS.sleep(pause);
    180                 } catch (InterruptedException e) {
    181                     System.out.println("barrier-action sleep interrupted");
    182                 }
    183             }
    184         });
    185         //加入所有马儿到线程列表,执行器执行线程列表(所有马儿奔跑)
    186         for (int i = 0; i < nHorses; i++) {
    187             Horse horse = new Horse(barrier);
    188             horses.add(horse);
    189             exec.execute(horse);
    190         }
    191     }
    192 
    193     public static void main(String[] args) {
    194         int nHorses = 7;
    195         int pause = 200;
    196         if (args.length > 0) { // Optional argument
    197             int n = new Integer(args[0]);
    198             nHorses = n > 0 ? n : nHorses;
    199         }
    200         if (args.length > 1) { // Optional argument
    201             int p = new Integer(args[1]);
    202             pause = p > -1 ? p : pause;
    203         }
    204         new HorseRace(nHorses, pause);
    205 
    206     }
    207 } /* (Execute to see output) */// :~

    总结:

    thinking in java上面的代码例子都是精华,不是网络上一般的博主能够写出来的,理解了对自己很有好处,及时一个知识点花上一个小时也可以。

    但是美中不足的是:thinking in java中翻译过来有些地方阐述的不是很清楚(可能也是原著就没阐述清楚),例如。它主要的方法就是一个:await()。await() 方法没被调用一次,计数便会减少1,并阻塞住当前线程。当计数减至0时,阻塞解除,所有在此 CyclicBarrier 上面阻塞的线程开始运行。在这之后,如果再次调用 await() 方法,计数就又会变成 N-1,新一轮重新开始,这便是 Cyclic 的含义所在。这段话对CyclicBarrier的理解太重要了我认为。

  • 相关阅读:
    实体枚举字段注释反向生成数据库注释sql
    系统间数据存储和交互思路
    复选框与bitmap算法实践
    Entity Framework Core配置DbContext的两种方式
    C#语法糖——持续更新
    抽丝剥茧读源码——Microsoft.Extensions.Configuration(2)
    抽丝剥茧读源码——Microsoft.Extensions.Configuration(1)
    算法分享之关于atcoderbeginner166E的讲解
    关于coder168E问题的分析与解答(C语言)
    atcoder168D题
  • 原文地址:https://www.cnblogs.com/wubingshenyin/p/4486582.html
Copyright © 2011-2022 走看看