  • J.U.C Concurrency Framework Source Code Reading (4): CountDownLatch

    Based on jdk1.7.0_80

    java.util.concurrent.CountDownLatch

    The source is as follows:

    /*
     * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     */
    
    /*
     *
     *
     *
     *
     *
     * Written by Doug Lea with assistance from members of JCP JSR-166
     * Expert Group and released to the public domain, as explained at
     * http://creativecommons.org/publicdomain/zero/1.0/
     */
    
    package java.util.concurrent;
    import java.util.concurrent.locks.*;
    import java.util.concurrent.atomic.*;
    
    /**
     * A synchronization aid that allows one or more threads to wait until
     * a set of operations being performed in other threads completes.
     *
     * <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
     * The {@link #await await} methods block until the current count reaches
     * zero due to invocations of the {@link #countDown} method, after which
     * all waiting threads are released and any subsequent invocations of
     * {@link #await await} return immediately.  This is a one-shot phenomenon
     * -- the count cannot be reset.  If you need a version that resets the
     * count, consider using a {@link CyclicBarrier}.
     *
     * <p>A {@code CountDownLatch} is a versatile synchronization tool
     * and can be used for a number of purposes.  A
     * {@code CountDownLatch} initialized with a count of one serves as a
     * simple on/off latch, or gate: all threads invoking {@link #await await}
     * wait at the gate until it is opened by a thread invoking {@link
     * #countDown}.  A {@code CountDownLatch} initialized to <em>N</em>
     * can be used to make one thread wait until <em>N</em> threads have
     * completed some action, or some action has been completed N times.
     *
     * <p>A useful property of a {@code CountDownLatch} is that it
     * doesn't require that threads calling {@code countDown} wait for
     * the count to reach zero before proceeding, it simply prevents any
     * thread from proceeding past an {@link #await await} until all
     * threads could pass.
     *
     * <p><b>Sample usage:</b> Here is a pair of classes in which a group
     * of worker threads use two countdown latches:
     * <ul>
     * <li>The first is a start signal that prevents any worker from proceeding
     * until the driver is ready for them to proceed;
     * <li>The second is a completion signal that allows the driver to wait
     * until all workers have completed.
     * </ul>
     *
     * <pre>
     * class Driver { // ...
     *   void main() throws InterruptedException {
     *     CountDownLatch startSignal = new CountDownLatch(1);
     *     CountDownLatch doneSignal = new CountDownLatch(N);
     *
     *     for (int i = 0; i < N; ++i) // create and start threads
     *       new Thread(new Worker(startSignal, doneSignal)).start();
     *
     *     doSomethingElse();            // don't let run yet
     *     startSignal.countDown();      // let all threads proceed
     *     doSomethingElse();
     *     doneSignal.await();           // wait for all to finish
     *   }
     * }
     *
     * class Worker implements Runnable {
     *   private final CountDownLatch startSignal;
     *   private final CountDownLatch doneSignal;
     *   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
     *      this.startSignal = startSignal;
     *      this.doneSignal = doneSignal;
     *   }
     *   public void run() {
     *      try {
     *        startSignal.await();
     *        doWork();
     *        doneSignal.countDown();
     *      } catch (InterruptedException ex) {} // return;
     *   }
     *
     *   void doWork() { ... }
     * }
     *
     * </pre>
     *
     * <p>Another typical usage would be to divide a problem into N parts,
     * describe each part with a Runnable that executes that portion and
     * counts down on the latch, and queue all the Runnables to an
     * Executor.  When all sub-parts are complete, the coordinating thread
     * will be able to pass through await. (When threads must repeatedly
     * count down in this way, instead use a {@link CyclicBarrier}.)
     *
     * <pre>
     * class Driver2 { // ...
     *   void main() throws InterruptedException {
     *     CountDownLatch doneSignal = new CountDownLatch(N);
     *     Executor e = ...
     *
     *     for (int i = 0; i < N; ++i) // create and start threads
     *       e.execute(new WorkerRunnable(doneSignal, i));
     *
     *     doneSignal.await();           // wait for all to finish
     *   }
     * }
     *
     * class WorkerRunnable implements Runnable {
     *   private final CountDownLatch doneSignal;
     *   private final int i;
     *   WorkerRunnable(CountDownLatch doneSignal, int i) {
     *      this.doneSignal = doneSignal;
     *      this.i = i;
     *   }
     *   public void run() {
     *      try {
     *        doWork(i);
     *        doneSignal.countDown();
     *      } catch (InterruptedException ex) {} // return;
     *   }
     *
     *   void doWork() { ... }
     * }
     *
     * </pre>
     *
     * <p>Memory consistency effects: Until the count reaches
     * zero, actions in a thread prior to calling
     * {@code countDown()}
     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
     * actions following a successful return from a corresponding
     * {@code await()} in another thread.
     *
     * @since 1.5
     * @author Doug Lea
     */
    public class CountDownLatch {
        /**
         * Synchronization control For CountDownLatch.
         * Uses AQS state to represent count.
         */
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    
        private final Sync sync;
    
        /**
         * Constructs a {@code CountDownLatch} initialized with the given count.
         *
         * @param count the number of times {@link #countDown} must be invoked
         *        before threads can pass through {@link #await}
         * @throws IllegalArgumentException if {@code count} is negative
         */
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    
        /**
         * Causes the current thread to wait until the latch has counted down to
         * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
         *
         * <p>If the current count is zero then this method returns immediately.
         *
         * <p>If the current count is greater than zero then the current
         * thread becomes disabled for thread scheduling purposes and lies
         * dormant until one of two things happen:
         * <ul>
         * <li>The count reaches zero due to invocations of the
         * {@link #countDown} method; or
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread.
         * </ul>
         *
         * <p>If the current thread:
         * <ul>
         * <li>has its interrupted status set on entry to this method; or
         * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
         * </ul>
         * then {@link InterruptedException} is thrown and the current thread's
         * interrupted status is cleared.
         *
         * @throws InterruptedException if the current thread is interrupted
         *         while waiting
         */
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        /**
         * Causes the current thread to wait until the latch has counted down to
         * zero, unless the thread is {@linkplain Thread#interrupt interrupted},
         * or the specified waiting time elapses.
         *
         * <p>If the current count is zero then this method returns immediately
         * with the value {@code true}.
         *
         * <p>If the current count is greater than zero then the current
         * thread becomes disabled for thread scheduling purposes and lies
         * dormant until one of three things happen:
         * <ul>
         * <li>The count reaches zero due to invocations of the
         * {@link #countDown} method; or
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread; or
         * <li>The specified waiting time elapses.
         * </ul>
         *
         * <p>If the count reaches zero then the method returns with the
         * value {@code true}.
         *
         * <p>If the current thread:
         * <ul>
         * <li>has its interrupted status set on entry to this method; or
         * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
         * </ul>
         * then {@link InterruptedException} is thrown and the current thread's
         * interrupted status is cleared.
         *
         * <p>If the specified waiting time elapses then the value {@code false}
         * is returned.  If the time is less than or equal to zero, the method
         * will not wait at all.
         *
         * @param timeout the maximum time to wait
         * @param unit the time unit of the {@code timeout} argument
         * @return {@code true} if the count reached zero and {@code false}
         *         if the waiting time elapsed before the count reached zero
         * @throws InterruptedException if the current thread is interrupted
         *         while waiting
         */
        public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
        /**
         * Decrements the count of the latch, releasing all waiting threads if
         * the count reaches zero.
         *
         * <p>If the current count is greater than zero then it is decremented.
         * If the new count is zero then all waiting threads are re-enabled for
         * thread scheduling purposes.
         *
         * <p>If the current count equals zero then nothing happens.
         */
        public void countDown() {
            sync.releaseShared(1);
        }
    
        /**
         * Returns the current count.
         *
         * <p>This method is typically used for debugging and testing purposes.
         *
         * @return the current count
         */
        public long getCount() {
            return sync.getCount();
        }
    
        /**
         * Returns a string identifying this latch, as well as its state.
         * The state, in brackets, includes the String {@code "Count ="}
         * followed by the current count.
         *
         * @return a string identifying this latch, as well as its state
         */
        public String toString() {
            return super.toString() + "[Count = " + sync.getCount() + "]";
        }
    }

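    The file is only about 320 lines, most of them comments; the actual code is probably under 100 lines.

    Most of the work is done by AQS (AbstractQueuedSynchronizer), which shows once again how powerful AQS is.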

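    0. Introduction to CountDownLatch

    CountDownLatch is a synchronization aid that lets one or more threads wait until a set of operations being performed in other threads completes.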

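    Main methods
    public CountDownLatch(int count);
    public void countDown();
    public void await() throws InterruptedException;

    The constructor argument sets the initial count.
    countDown(): each call decrements the count by one.
    await(): blocks the calling thread until the counter reaches zero (or the thread is interrupted).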
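
    The three methods above can be tried out with a small sketch. The class name LatchUsageDemo and the helper runWorkers are hypothetical names chosen for this illustration, and the "work" is only a counter increment. Anonymous classes are used so the code also compiles on the JDK 7 version discussed here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class LatchUsageDemo {
    /** Starts the given number of threads and blocks until each has called countDown(). */
    static int runWorkers(int workers) {
        final CountDownLatch done = new CountDownLatch(workers);
        final AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(new Runnable() {
                @Override public void run() {
                    finished.incrementAndGet(); // stand-in for real work
                    done.countDown();           // count goes down by one
                }
            }).start();
        }
        try {
            done.await(); // blocks until the count reaches zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag for callers
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished workers: " + runWorkers(4));
    }
}
```

    Because every worker increments the counter before counting down, the value read after await() should equal the number of workers.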

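    1. How CountDownLatch works: overview

    CountDownLatch keeps its count in the AQS state variable. Each call to countDown decrements state by one, and when state reaches 0 all threads waiting on the CountDownLatch are released.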
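
    The count can be observed from the outside through getCount(), which simply returns the AQS state. The following single-threaded sketch (LatchCountTrace is a hypothetical name) records the count while countDown() is called once more than the initial count allows:

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not part of the JDK.
class LatchCountTrace {
    /** Returns the counts seen while calling countDown() three times on a latch of 2. */
    static long[] trace() {
        CountDownLatch latch = new CountDownLatch(2);
        long[] seen = new long[4];
        seen[0] = latch.getCount(); // initial state
        latch.countDown();
        seen[1] = latch.getCount(); // state after the first decrement
        latch.countDown();
        seen[2] = latch.getCount(); // state is 0 here; waiters would be released
        latch.countDown();          // state is already 0, so this call changes nothing
        seen[3] = latch.getCount();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(trace())); // prints [2, 1, 0, 0]
    }
}
```

    The count never goes below zero, which matches the early return in tryReleaseShared when state is already 0.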

    2. The CountDownLatch constructor

    CountDownLatch
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    
    CountDownLatch.Sync
            Sync(int count) {
                setState(count); // on construction, store count in the AQS state variable
            }
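
    Two edge cases of the constructor can be checked directly. This is a minimal sketch with hypothetical names (LatchConstructorDemo and its two helpers): a negative count is rejected, and a count of zero produces a latch that is open from the start.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class LatchConstructorDemo {
    /** True if new CountDownLatch(-1) throws IllegalArgumentException. */
    static boolean rejectsNegativeCount() {
        try {
            new CountDownLatch(-1);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    /** True if a latch created with count 0 lets await pass without any waiting. */
    static boolean zeroCountIsAlreadyOpen() {
        CountDownLatch latch = new CountDownLatch(0);
        try {
            // state is 0, so tryAcquireShared succeeds even with a zero timeout
            return latch.await(0, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```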

    3. Execution path of CountDownLatch.await

    CountDownLatch.await
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
    AbstractQueuedSynchronizer.acquireSharedInterruptibly
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    
    CountDownLatch.Sync.tryAcquireShared
            protected int tryAcquireShared(int acquires) {
                // state == 0 means the latch is already open: return 1 so that
                // acquireSharedInterruptibly returns at once. Otherwise return -1 and AQS
                // goes on to call doAcquireSharedInterruptibly, listed next.
                return (getState() == 0) ? 1 : -1;
            }
    
    AbstractQueuedSynchronizer.doAcquireSharedInterruptibly
        /**
         * Acquires in shared interruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED); // create a shared-mode node and append it to the AQS wait queue
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
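                        // state != 0 (latch still closed) -> r == -1 -> stay in the loop and park;
                        // state == 0 (latch open) -> r == 1 -> return, so the waiting thread unblocks
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // Make this node the new head; since r > 0 and the successor is a
                            // shared-mode node, doReleaseShared is called so the wake-up propagates
                            setHeadAndPropagate(node, r);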
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    // park the current thread; throw if it was interrupted while parked
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

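    Roughly: every thread that calls CountDownLatch.await while the latch is still closed (state != 0) creates a node in the AQS wait queue and then parks itself.

    CountDownLatch also has an await overload that takes a timeout. Its logic is very similar, so it is not covered here.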
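
    Although the timed overload is not traced here, its documented behaviour is easy to observe from the caller's side. The sketch below (TimedAwaitDemo and its helpers are hypothetical names, and the 50 ms timeout is an arbitrary choice) covers the three outcomes described in the Javadoc: timeout, success, and interruption on entry.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class TimedAwaitDemo {
    /** Nobody counts down, so the timed await gives up and returns false. */
    static boolean awaitOnClosedLatch() {
        CountDownLatch latch = new CountDownLatch(1);
        try {
            return latch.await(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** The count is already zero, so the timed await returns true at once. */
    static boolean awaitOnOpenLatch() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        try {
            return latch.await(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** With the interrupt flag set on entry, await throws and clears the flag. */
    static boolean throwsWhenInterruptedOnEntry() {
        CountDownLatch latch = new CountDownLatch(1);
        Thread.currentThread().interrupt();
        try {
            latch.await(1, TimeUnit.SECONDS);
            Thread.interrupted(); // not expected to be reached; clear the flag anyway
            return false;
        } catch (InterruptedException e) {
            return !Thread.currentThread().isInterrupted();
        }
    }
}
```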

    4. Execution path of CountDownLatch.countDown

    CountDownLatch.countDown
        /**
         * Decrements the count of the latch, releasing all waiting threads if
         * the count reaches zero.
         *
         * <p>If the current count is greater than zero then it is decremented.
         * If the new count is zero then all waiting threads are re-enabled for
         * thread scheduling purposes.
         *
         * <p>If the current count equals zero then nothing happens.
         */
        public void countDown() {
            sync.releaseShared(1);
        }
    
    
    AbstractQueuedSynchronizer.releaseShared
        /**
         * Releases in shared mode.  Implemented by unblocking one or more
         * threads if {@link #tryReleaseShared} returns true.
         *
         * @param arg the release argument.  This value is conveyed to
         *        {@link #tryReleaseShared} but is otherwise uninterpreted
         *        and can represent anything you like.
         * @return the value returned from {@link #tryReleaseShared}
         */
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
    
    CountDownLatch.Sync.tryReleaseShared
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState(); // read the AQS state variable
                    if (c == 0) // state is already 0: nothing left to release, so releaseShared above returns false too
                        return false;
                    int nextc = c-1; // CAS state down by one; if this call takes it to exactly 0, return true to trigger doReleaseShared
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
    
    
    AbstractQueuedSynchronizer.doReleaseShared
        /**
         * Release action for shared mode -- signal successor and ensure
         * propagation. (Note: For exclusive mode, release just amounts
         * to calling unparkSuccessor of head if it needs signal.)
         */
        private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }

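    In short, each call to CountDownLatch.countDown decrements the AQS state variable by one.

    The call that takes state to 0 triggers AbstractQueuedSynchronizer.doReleaseShared, which unparks the first waiter in the AQS queue. Each woken thread then passes through setHeadAndPropagate and wakes the next one, so every waiting thread ends up released.

    Once awake, each thread sees that state is now 0, leaves its loop, and returns.

    That is how the CountDownLatch semantics are implemented.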
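
    The end result of this release chain can be observed with a one-count "gate" latch and several waiters. This is a sketch under assumptions: GateReleaseDemo and allWaitersReleased are hypothetical names, and the 5-second limit is only a safety margin. It shows the outcome (every waiter gets through after a single countDown), not the internal queue operations.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class GateReleaseDemo {
    /** Blocks several threads on a one-count gate, opens it once, and reports whether all passed. */
    static boolean allWaitersReleased(int waiters) {
        final CountDownLatch gate = new CountDownLatch(1);
        final CountDownLatch passed = new CountDownLatch(waiters);
        for (int i = 0; i < waiters; i++) {
            Thread t = new Thread(new Runnable() {
                @Override public void run() {
                    try {
                        gate.await();       // waits while state != 0
                        passed.countDown(); // report that this waiter got through
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            t.setDaemon(true); // do not keep the JVM alive if something goes wrong
            t.start();
        }
        // state 1 -> 0. Waiters already parked are woken one after another;
        // any waiter that arrives later sees state == 0 and passes straight through.
        gate.countDown();
        try {
            return passed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("all released: " + allWaitersReleased(3));
    }
}
```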

  • Original article: https://www.cnblogs.com/stevenczp/p/7152046.html