zoukankan      html  css  js  c++  java
  • Java并发编程--CyclicBarrier

     

    概述

      CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点。与CountDownLatch不同的是该barrier在释放等待线程后可以重用,所以称它为循环(Cyclic)的屏障(Barrier)。

      CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。

    使用

      提供的方法:

    复制代码
     1 //parties表示屏障拦截的线程数量,当屏障撤销时,先执行barrierAction,然后在释放所有线程
     2 public CyclicBarrier(int parties, Runnable barrierAction)
     3 //barrierAction默认为null
     4 public CyclicBarrier(int parties)
     5 
     6 /*
     7  *当前线程等待直到所有线程都调用了该屏障的await()方法
     8  *如果当前线程不是将到达的最后一个线程,将会被阻塞。解除阻塞的情况有以下几种
     9  *    1)最后一个线程调用await()
    10  *    2)当前线程被中断
    11     3)其他正在该CyclicBarrier上等待的线程被中断
    12     4)其他正在该CyclicBarrier上等待的线程超时
    13     5)其他某个线程调用该CyclicBarrier的reset()方法
    14  *如果当前线程在进入此方法时已经设置了该线程的中断状态或者在等待时被中断,将抛出InterruptedException,并且清除当前线程的已中断状态。
    15  *如果在线程处于等待状态时barrier被reset()或者在调用await()时 barrier 被损坏,将抛出 BrokenBarrierException 异常。
    16  *如果任何线程在等待时被中断,则其他所有等待线程都将抛出 BrokenBarrierException 异常,并将 barrier 置于损坏状态。 *如果当前线程是最后一个将要到达的线程,并且构造方法中提供了一个非空的屏障操作(barrierAction),那么在允许其他线程继续运行之前,当前线程将运行该操作。如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,并将 barrier 置于损坏状态。
    17  *
    18  *返回值为当前线程的索引,0表示当前线程是最后一个到达的线程
    19  */
    20 public int await() throws InterruptedException, BrokenBarrierException
    21 //在await()的基础上增加超时机制,如果超出指定的等待时间,则抛出 TimeoutException 异常。如果该时间小于等于零,则此方法根本不会等待。
    22 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
    23 
    24 //将屏障重置为其初始状态。如果所有参与者目前都在屏障处等待,则它们将返回,同时抛出一个BrokenBarrierException。
    25 public void reset()
    复制代码

      对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。

      使用示例:

        每个Worker处理矩阵中的一行,在处理完所有的行之前,该线程将一直在屏障处等待。在各个WOrker处理完所有行后,将执行提供的Runnable屏障操作。

    复制代码
     1 class Solver {
     2     final int N;    //矩阵的行数
     3     final float[][] data;    //要处理的矩阵
     4     final CyclicBarrier barrier;    //循环屏障
     5 
     6     class Worker implements Runnable {
     7         int myRow;
     8         Worker(int row) { myRow = row; }
     9         public void run() {
    10             while (!done()) {
    11                 processRow(myRow);    //处理指定一行数据
    12 
    13                 try {
    14                     barrier.await();     //在屏障处等待直到
    15                 } catch (InterruptedException ex) { 
    16                     return; 
    17                 } catch (BrokenBarrierException ex) {
    18                     return; 
    19                 }
    20             }
    21         }
    22     }
    23 
    24     public Solver(float[][] matrix) {
    25         data = matrix;
    26         N = matrix.length;
    27         //初始化CyclicBarrier
    28         barrier = new CyclicBarrier(N, new Runnable() {
    29                                            public void run() { 
    30                                              mergeRows(...); //合并行
    31                                            }
    32                                        });
    33         for (int i = 0; i < N; ++i) 
    34             new Thread(new Worker(i)).start();
    35 
    36         waitUntilDone();
    37     }
    38 }
    复制代码

    实现原理

      基于ReentrantLock和Condition机制实现。除了getParties()方法,CyclicBarrier的其他方法都需要获取锁。

      域

    复制代码
     1 /** The lock for guarding barrier entry */
     2 private final ReentrantLock lock = new ReentrantLock();    //可重入锁
     3 /** Condition to wait on until tripped */
     4 private final Condition trip = lock.newCondition();
     5 /** The number of parties */
     6 private final int parties;    //拦截的线程数量
     7 /* The command to run when tripped */
     8 private final Runnable barrierCommand;    //当屏障撤销时,需要执行的屏障操作
     9 //当前的Generation。每当屏障失效或者开闸之后都会自动替换掉。从而实现重置的功能。
    10 private Generation generation = new Generation();
    11 
    12 /**
    13  * Number of parties still waiting. Counts down from parties to 0
    14  * on each generation.  It is reset to parties on each new
    15  * generation or when broken.
    16  */
    17 private int count;    //还能阻塞的线程数(即parties-当前阻塞的线程数),当新建generation或generation被破坏时,count会被重置。因为对Count的操作都是在获取锁之后,所以不需要其他同步措施。
    18 
    19 //静态内联类
    20 private static class Generation {
    21     boolean broken = false;    //当前的屏障是否破坏
    22 }
    复制代码

      await()

    复制代码
     1 public int await() throws InterruptedException, BrokenBarrierException {
     2     try {
     3         return dowait(false, 0L);
     4     } catch (TimeoutException toe) {
     5         throw new Error(toe); // cannot happen;
     6     }
     7 }
     8 
     9 private int dowait(boolean timed, long nanos)
    10     throws InterruptedException, BrokenBarrierException,
    11            TimeoutException {
    12     final ReentrantLock lock = this.lock;
    13     lock.lock();    //获取锁
    14     try {
    15         //保存此时的generation
    16         final Generation g = generation;
    17         //判断屏障是否被破坏
    18         if (g.broken)
    19             throw new BrokenBarrierException();
    20         //判断线程是否被中断,如果被中断,调用breakBarrier()进行屏障破坏处理,并抛出InterruptedException
    21         if (Thread.interrupted()) {
    22             breakBarrier();    
    23             throw new InterruptedException();
    24         }
    25 
    26         int index = --count;    //剩余count递减,并赋值给线程索引,作为方法的返回值
    27         //如果线程索引将为0,说明当前线程是最后一个到达的线程。执行可能存在的屏障操作 barrierCommand,设置下一个Generation。相当于每次开闸之后都进行了一次reset。
    28         if (index == 0) {  // tripped    
    29             boolean ranAction = false;
    30             try {
    31                 final Runnable command = barrierCommand;
    32                 if (command != null)
    33                     command.run();    //同步执行barrierCommand
    34                 ranAction = true;
    35                 nextGeneration();    //执行成功设置下一个nextGeneration
    36                 return 0;
    37             } finally {
    38                 if (!ranAction)    //如果barrierCommand执行失败,进行屏障破坏处理
    39                     breakBarrier();
    40             }
    41         }
    42         
    43         //如果当前线程不是最后一个到达的线程
    44         // loop until tripped, broken, interrupted, or timed out
    45         for (;;) {
    46             try {
    47                 if (!timed)
    48                     trip.await();    //调用Condition的await()方法阻塞
    49                 else if (nanos > 0L)
    50                     nanos = trip.awaitNanos(nanos);    //调用Condition的awaitNanos()方法阻塞
    51             } catch (InterruptedException ie) {
    52                 //如果当前线程被中断,则判断是否有其他线程已经使屏障破坏。若没有则进行屏障破坏处理,并抛出异常;否则再次中断当前线程
    53                 if (g == generation && ! g.broken) {    
    54                     breakBarrier();
    55                     throw ie;
    56                 } else {
    57                     // We're about to finish waiting even if we had not
    58                     // been interrupted, so this interrupt is deemed to
    59                     // "belong" to subsequent execution.
    60                     Thread.currentThread().interrupt();
    61                     //这种捕获了InterruptException之后调用Thread.currentThread().interrupt()是一种通用的方式。其实就是为了保存中断状态,从而让其他更高层次的代码注意到这个中断。
    62                 }
    63             }
    64             //如果屏障被破坏,当前线程抛BrokenBarrierException
    65             if (g.broken)
    66                 throw new BrokenBarrierException();
    67             
    68             //如果已经换代,直接返回index(last thread已经执行的nextGeneration,但当前线程还没有执行到该语句)
    69             if (g != generation)
    70                 return index;
    71             
    72             //超时,进行屏障破坏处理,并抛TimeoutException
    73             if (timed && nanos <= 0L) {
    74                 breakBarrier();
    75                 throw new TimeoutException();
    76             }
    77         }
    78     } finally {
    79         lock.unlock();    //释放锁
    80     }
    81 }
    82 
    83 //将当前屏障置为破坏状态、重置count、并唤醒所有被阻塞的线程。
    84 //必须先获取锁,才能调用此方法
    85 private void breakBarrier() {
    86     generation.broken = true;
    87     count = parties;
    88     trip.signalAll();
    89 }
    90 
    91 //唤醒trip上等待的所有线程,设置下一个Generation
    92 private void nextGeneration() {
    93     // signal completion of last generation
    94     trip.signalAll();
    95     // set up next generation
    96     count = parties;
    97     generation = new Generation();
    98 }
    复制代码

      reset()

    复制代码
     1 //重置屏障,先进行屏障破坏处理,再设置下一代generation
     2 public void reset() {
     3     final ReentrantLock lock = this.lock;
     4     lock.lock();
     5     try {
     6         breakBarrier();   // break the current generation
     7         nextGeneration(); // start a new generation
     8     } finally {
     9         lock.unlock();
    10     }
    11 }
    复制代码

    CyclicBarrier与CountDownLatch比较

      1)CountDownLatch:一个线程(或者多个),等待另外N个线程完成某个事情之后才能执行;CyclicBarrier:N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。

      2)CountDownLatch:一次性的;CyclicBarrier:可以重复使用。

      3)CountDownLatch基于AQS;CyclicBarrier基于锁和Condition。本质上都是依赖于volatile和CAS实现的。

    参考资料

      JDK Doc

      《Java并发编程的艺术》

  • 相关阅读:
    jchdl
    jchdl
    jchdl
    jchdl
    jchdl
    jchdl
    jchdl
    UVa 437 (变形的LIS) The Tower of Babylon
    UVa 1025 (动态规划) A Spy in the Metro
    UVa 10129 (并查集 + 欧拉路径) Play on Words
  • 原文地址:https://www.cnblogs.com/williamjie/p/9456735.html
Copyright © 2011-2022 走看看