zoukankan      html  css  js  c++  java
  • JDK源码分析之concurrent包(四) -- CyclicBarrier与CountDownLatch

      上一篇我们主要通过ExecutorCompletionService与FutureTask类的源码,对Future模型体系的原理做了了解,本篇开始解读concurrent包中的工具类的源码。首先来看两个非常实用的工具类CyclicBarrier与CountDownLatch是如何实现的。


    CyclicBarrier

    CyclicBarrier直译过来是“循环屏障”,作用是可以使固定数量的线程都达到某个屏障点(调用await方发处)后,才继续向下执行。关于用法和实例本文就不做过多说明,现在直接进入CyclicBarrier的源码。

    首先,来看下CyclicBarrier的几个标志性的成员变量:

     1 private static class Generation {
     2     boolean broken = false;
     3 }
     4 /** The number of parties */
     5 private final int parties;
     6 /* The command to run when tripped */
     7 private final Runnable barrierCommand;
     8 /** The current generation */
     9 private Generation generation = new Generation();
    10 
    11 /**
    12  * Number of parties still waiting. Counts down from parties to 0
    13  * on each generation.  It is reset to parties on each new
    14  * generation or when broken.
    15  */
    16 private int count;

    这几个成员变量有以下说明:

    • 说明1:parties是final的,在构造时,传入的固定线程数,不可变;
    • 说明2:count是计数器,如果有线程到达了屏障点,count就减1;
    • 说明3:直到count=0时,其它线程才可以向下执行;
    • 说明4:barrierCommand是Runnable任务,在所有线程到达屏障点是,就执行barrierCommand,barrierCommand是构造时传入的,可以为空;
    • 说明5:generation比较复杂,是静态内部类Generation的实例,一个generation对象代表一代的屏障,就是说,如果generation对象不同,就代表进入了下一次的屏障,所以说,这个线程屏障是可循环的(Cyclic)。
    • 说明6:另外,generation的唯一的一个名为broken的成员变量代表屏障是否被破坏掉,破坏的原因可能是线程中断、失败或者超时等。如果被破坏,则所有线程都将抛出异常。

    了解上述成员变量的说明后,基本上就可以知道了CyclicBarrier的实现原理,下面我们来看看代码是如何写的。其实实现很简单,我们只需通过await()方法就可以说明:

    1 public int await() throws InterruptedException, BrokenBarrierException {
    2     try {
    3         return dowait(false, 0L);
    4     } catch (TimeoutException toe) {
    5         throw new Error(toe); // cannot happen;
    6     }
    7 }

    await()方法调用了真是的执行方法dowait(),这个方法里涵盖了所有乾坤:

     1 /**
     2  * Main barrier code, covering the various policies.
     3  */
     4 private int dowait(boolean timed, long nanos)
     5     throws InterruptedException, BrokenBarrierException,
     6            TimeoutException {
     7     final ReentrantLock lock = this.lock;
     8     lock.lock();
     9     try {
    10         final Generation g = generation;
    11 
    12         if (g.broken)
    13             throw new BrokenBarrierException();
    14 
    15         if (Thread.interrupted()) {
    16             breakBarrier();
    17             throw new InterruptedException();
    18         }
    19 
    20        int index = --count;
    21        if (index == 0) {  // tripped
    22            boolean ranAction = false;
    23            try {
    24    final Runnable command = barrierCommand;
    25                if (command != null)
    26                    command.run();
    27                ranAction = true;
    28                nextGeneration();
    29                return 0;
    30            } finally {
    31                if (!ranAction)
    32                    breakBarrier();
    33            }
    34        }
    35 
    36         // loop until tripped, broken, interrupted, or timed out
    37         for (;;) {
    38             try {
    39                 if (!timed)
    40                     trip.await();
    41                 else if (nanos > 0L)
    42                     nanos = trip.awaitNanos(nanos);
    43             } catch (InterruptedException ie) {
    44                 if (g == generation && ! g.broken) {
    45                     breakBarrier();
    46                                 throw ie;
    47                         } else {
    48                         // We're about to finish waiting even if we had not
    49                         // been interrupted, so this interrupt is deemed to
    50                         // "belong" to subsequent execution.
    51                         Thread.currentThread().interrupt();
    52                         }
    53             }
    54 
    55             if (g.broken)
    56                 throw new BrokenBarrierException();
    57 
    58             if (g != generation)
    59                 return index;
    60 
    61             if (timed && nanos <= 0L) {
    62                 breakBarrier();
    63                 throw new TimeoutException();
    64             }
    65         }
    66     } finally {
    67         lock.unlock();
    68     }
    69 }

    代码第20行对应“说明2”。

    代码第21行对应“说明3”。

    代码第26行对应“说明4”。

    代码第28行对应“说明5”,nextGeneration()方法中使用generation = new Generation();表示屏障已经换代,并唤醒所有线程。nextGeneration()请自行查看源码。

    代码第16行第45行等所有调用breakBarrier()方法处,对应“说明6”,表示屏障被破坏,breakBarrier()方法中将generation.broken = true,唤醒所有线程,抛出异常。

    最后,代码第40行处trip.await(),表示持有trip的线程进入等待被唤醒状态。

    另外,实现中还有一个很重要的点,就是第8行的lock和第67行的unlock,保证同步状态下执行这段逻辑,也就保证了count与generation.broken的线程安全。

    以上就是CyclicBarrier(循环使用的屏障)的源码实现,是不是比较简单。

    CountDownLatch

    CountDownLatch直译过来是“倒计数锁”。在线程的countDown()动作将计数减至0时,所有的await()处的线程将可以继续向下执行。CountDownLatch的功能与CyclicBarrier有一点点像,但实现方式却很不同,下面直接来观察CountDownLatch的两个最重要的方法:

    1 public void await() throws InterruptedException {
    2     sync.acquireSharedInterruptibly(1);
    3 }
    4 
    5 public void countDown() {
    6     sync.releaseShared(1);
    7 }

    可以看到,这两个方法实际是由静态内部类Sync来实现的。这个Sync我们在上一篇FutureTask的实现中也见过,那我们就先简单介绍下Sync究竟是用来做什么的:

    Sync extends AbstractQueuedSynchronizer

    这个抽象类AbstractQueuedSynchronizer是一个框架,这个框架使用了“共享”与“独占”两张方式通过一个int值来表示状态的同步器。类中含有一个先进先出的队列用来存储等待的线程。

    这个类定义了对int值的原子操作的方法,并强制子类定义int的那种状态是获取,哪种状态是释放。子类可以选择“共享”和“独占”的一种或两种来实现。

    共享方式的实现方式是死循环尝试获取对象状态,类似自旋锁。

    独占方式的实现方式是通过实现Condition功能的内部的类,保证独占锁。

    而我们正在解读的CountDownLatch中的内部类Sync是使用的共享方式,对于AbstractQueuedSynchronizer的解读本篇不打算详细说明,因为笔者对“独占”方式还没彻底弄通,如果以后有机会再来补充。

    接下来就直接观察CountDownLatch.Sync的源码:

     1 /**
     2  * Synchronization control For CountDownLatch.
     3  * Uses AQS state to represent count.
     4  */
     5 private static final class Sync extends AbstractQueuedSynchronizer {
     6     private static final long serialVersionUID = 4982264981922014374L;
     7 
     8     Sync(int count) {
     9         setState(count);
    10     }
    11 
    12     int getCount() {
    13         return getState();
    14     }
    15 
    16     public int tryAcquireShared(int acquires) {
    17         return getState() == 0? 1 : -1;
    18     }
    19 
    20     public boolean tryReleaseShared(int releases) {
    21         // Decrement count; signal when transition to zero
    22         for (;;) {
    23             int c = getState();
    24             if (c == 0)
    25                 return false;
    26             int nextc = c-1;
    27             if (compareAndSetState(c, nextc))
    28                 return nextc == 0;
    29         }
    30     }
    31 }

    结合最初列出的await()和countDown()方法,

    通过上述代码第9行可以看到,CountDownLatch将构造时传入的用来倒计数的count作为状态值。

    通过上述代码第17行可以看到,CountDownLatch定义了当count=0时表示可以共享获取状态(在await()方法中调用的sync.acquireSharedInterruptibly(1)会死循环尝试获取状态)。

    通过上述代码第26行可以看到,CountDownLatch定义了当count-1表示一次共享释放状态(在countDown()方法中调用的sync.releaseShared(1)会涉及)。

    以上就是CountDownLatch的源码实现。

    总结

    CyclicBarrier与CountDownLatch有一点相似之处,但是有很大区别。它们的异同我个人总结如下:

    类似功能

    • CyclicBarrier与CountDownLatch都是通过计数到达一定标准后,使得在await()处的线程继续向下执行。

    不同之处

    • CyclicBarrier的实现是通过线程的等待唤醒;CountDownLatch的实现是通过死循环访问状态的自旋机制
    • CyclicBarrier在线程改变计数后不能向下执行(await()改变计数);CountDownLatch在线程改变计数后继续向下执行(countDown()改变计数)
    • CyclicBarrier的计数可以被重置,循环使用;CountDownLatch的计数只能使用一次
  • 相关阅读:
    TCP的三次握手与四次挥手
    HashMap源代码分析(JDK1.8)
    HashMap实现原理分析(JDK1.8前)
    codebook法分割前景目标
    平均场景法分割前景目标
    边缘梯度方向直方图的构建
    学习opencv 第六章 习题十三
    《学习OpenCV》 第四章 习题六
    《学习Opencv》第五章 习题6
    Opencv实现的简易绘图工具
  • 原文地址:https://www.cnblogs.com/hanmou/p/4626143.html
Copyright © 2011-2022 走看看