zoukankan      html  css  js  c++  java
  • JAVA并发之CountDownLatch

    前言

      之前我们将ReentranLock从锁层面到AQS源码层面分析了如何构建一个所需的同步器,使用AQS需要实现哪些方法。ReentranLock的CLH队列中每个节点都是独占(EXCLUSIVE)的,那么节点的另一种等待方式——共享(SHARED)又将是什么样子的呢?本篇博客让我们通过另一个同步器——闭锁(CountDownLatch)来了解一下有什么不同的吧。

    CountdownLatch的作用和主要流程:

       CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减1。当计数器的值为0时,表示所有的线程都已经完成了任务,然后在CountDownLatch上等待的线程就可以恢复执行任务。

    CountdownLatch的例子:

      这里我用了一个jdk源码中的例子:

     1     class Driver { // ...
     2          void main() throws InterruptedException {
     3              CountDownLatch startSignal = new CountDownLatch(1);//设置启动闭锁
     4              CountDownLatch doneSignal = new CountDownLatch(N);//完成闭锁
     5              for (int i = 0; i < N; ++i) // create and start threads
     6                  new Thread(new Worker(startSignal, doneSignal)).start();
     7              doSomethingElse(); // 主线程做一些准备工作(先于其他线程)
     8              startSignal.countDown(); // 启动闭锁-1(解除work的await等待)
     9              doSomethingElse();
    10              doneSignal.await(); // wait for all to finish
    11          }
    12     }
    13     class Worker implements Runnable {
    14          private final CountDownLatch startSignal;
    15          private final CountDownLatch doneSignal;
    16          Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
    17              this.startSignal = startSignal;
    18              this.doneSignal = doneSignal;
    19          }
    20          public void run() {
    21              try {
    22                  startSignal.await();//等待主线程做完准备工作
    23                  doWork();
    24                  doneSignal.countDown();//结束后将结束闭锁-1
    25              } catch (InterruptedException ex) {
    26              } // return;
    27          }
    28          void doWork() { ... }
    29     }
      CountdownLatch可以保证主线程以及其他线程的先后顺序,在例子中我们可以看到主线程的第一个doSomethingElse先于其他线程,在其执行完后,将startSignal.countDown(),使其他线程从startSignal.await()的阻塞状态解除。
      闭锁可以保证某些操作的执行顺序,比如A操作依赖于B操作的结果,就可以用闭锁将A操作阻塞,在B操作完成后释放。

    CountDownLatch源码:

      CountdownLatch源码不多(毕竟AQS把底层的一些需求都封装好啦),来看看代码吧:

     1     //核心类Sync,继承自AQS
     2     private static final class Sync extends AbstractQueuedSynchronizer {
     3         private static final long serialVersionUID = 4982264981922014374L;
     4         Sync(int count) {
     5             setState(count);
     6         }
     7         int getCount() {
     8             return getState();
     9         }
    10         //共享的尝试获取资源,如果state为0,则说明可以解除封锁,否则,返回-1进入阻塞
    11         protected int tryAcquireShared(int acquires) {
    12             return (getState() == 0) ? 1 : -1;
    13         }
    14 
    15         //共享的释放资源
    16         protected boolean tryReleaseShared(int releases) {
    17             // Decrement count; signal when transition to zero
    18             for (;;) {
    19                 int c = getState();
    20                 if (c == 0)
    21                     return false;
    22                 int nextc = c-1;
    23                 if (compareAndSetState(c, nextc))
    24                     return nextc == 0;//当释放资源(-1)后state为0则说明可以解除封锁
    25             }
    26         }
    27     }
    28     private final Sync sync;
    29     
    30     public CountDownLatch(int count) {
    31         if (count < 0) throw new IllegalArgumentException("count < 0");
    32         this.sync = new Sync(count);
    33     }
    34 
    35     //在state变为0之前一直等待,除非被中断
    36     public void await() throws InterruptedException {
    37         sync.acquireSharedInterruptibly(1);
    38     }
    39 
    40     public boolean await(long timeout, TimeUnit unit)
    41         throws InterruptedException {
    42         return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    43     }
    44 
    45     //计数器减1
    46     public void countDown() {
    47         sync.releaseShared(1);
    48     }
    49 
    50     public long getCount() {
    51         return sync.getCount();
    52     }

      以上基本就是CountdownLatch的全部源码了,我们先从await()方法中深入到AQS,根据上面的代码,await直接调用了acquireSharedInterruptibly()方法:

    1     public final void acquireSharedInterruptibly(int arg)
    2             throws InterruptedException {
    3         if (Thread.interrupted())//如果线程被中断,就往外抛
    4             throw new InterruptedException();
    5         if (tryAcquireShared(arg) < 0)//如果state不为0
    6             doAcquireSharedInterruptibly(arg);
    7     }

      如果未被中断的情况下并且state不为0(即闭锁要将线程锁住),我们再来看其中的doAcquireSharedInterruptibly方法做了什么呢,可以从CountdownLatch的作用猜测是将线程放入CLH队列中park等待休息,看看源码是否是我们猜测的那样吧:

     1     private void doAcquireSharedInterruptibly(int arg)
     2         throws InterruptedException {
     3         final Node node = addWaiter(Node.SHARED);
     4         boolean failed = true;
     5         try {
     6             for (;;) {
     7                 final Node p = node.predecessor();
     8                 if (p == head) {
     9                     int r = tryAcquireShared(arg);//如果state不为0,r会一直小于0(不断自旋)
    10                     if (r >= 0) {
    11                         setHeadAndPropagate(node, r);
    12                         p.next = null; // help GC
    13                         failed = false;
    14                         return;
    15                     }
    16                 }
    17                 if (shouldParkAfterFailedAcquire(p, node) &&
    18                     parkAndCheckInterrupt())//进入等待状态
    19                     throw new InterruptedException();
    20             }
    21         } finally {
    22             if (failed)
    23                 cancelAcquire(node);
    24         }
    25     }

      这个方法是不是很眼熟?和Reentranlock中用到的doAcquireInterruptibly方法类似,不断自旋,尝试获取资源(tryAcquireShared),获取成功就返回,失败就判断是否需要休息。具体的细节不再展开讨论了,在第一篇AQS概述中我们就讨论了,AQS底层只要求同步器实现一种tryAcquire/tryRelease,根据是否共享(shared)是否需要响应中断(Interruptibly)组合成四对,因此完整看完一个AQS的同步器,再查看其他同步器时,基本是一样的流程。

      我们继续看看countDown方法做了什么吧:

     1     public void countDown() {
     2         sync.releaseShared(1);
     3     }
     4     public final boolean releaseShared(int arg) {
     5         if (tryReleaseShared(arg)) {//如果state变为0
     6             doReleaseShared();
     7             return true;
     8         }
     9         return false;
    10     }
    11     private void doReleaseShared() {//把CLH队列中的所有signal的下一个节点都unpark
    12         for (;;) {
    13             Node h = head;
    14             if (h != null && h != tail) {
    15                 int ws = h.waitStatus;
    16                 if (ws == Node.SIGNAL) {
    17                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    18                         continue;            // loop to recheck cases
    19                     unparkSuccessor(h);
    20                 }
    21                 else if (ws == 0 &&
    22                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    23                     continue;                // loop on failed CAS
    24             }
    25             if (h == head)                   // loop if head changed
    26                 break;
    27         }
    28     }

      countDown方法会尝试将state减一,如果state为0,则进入doReleaseShared方法,将所有在CLH队列中等待的节点全都unpark(因为是共享的节点)。

    总结:

      本篇CountDownLatch简单的对其源码进行了分析,对于一些主要的方法,我都在代码上加了简单的注释,对AQS有一定了解的人一定能够看懂其中的流程以及操作。

  • 相关阅读:
    朋友面试被问到---静态构造函数
    (设计模式之一)浅析简单工厂模式
    out与ref修饰符
    图解引用类型
    图解值类型
    PHP之路---1---Wamp环境配置--php环境配置
    js遮罩层弹出框
    总结
    PSP记录个人项目耗时情况
    代码复审
  • 原文地址:https://www.cnblogs.com/zzzdp/p/9330353.html
Copyright © 2011-2022 走看看