zoukankan      html  css  js  c++  java
  • Java多线程之JUC包:CountDownLatch源码学习笔记

    若有不正之处请多多谅解,并欢迎批评指正。

    请尊重作者劳动成果,转载请标明原文链接:

    http://www.cnblogs.com/go2sea/p/5623218.html

    我们已经了解了AQS的大致工作流程,接下来看下AQS的一个应用——CountDownLatch。

    我们已经知道,AQS提供了两种模式:独占模式&共享模式。CountDownLatch就是一个使用共享模式的自定义同步器实现的共享锁。

    源代码:

    /*
     * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     */
    
    /*
     *
     *
     *
     *
     *
     * Written by Doug Lea with assistance from members of JCP JSR-166
     * Expert Group and released to the public domain, as explained at
     * http://creativecommons.org/publicdomain/zero/1.0/
     */
    
    package java.util.concurrent;
    import java.util.concurrent.locks.*;
    import java.util.concurrent.atomic.*;
    
    /**
     * A synchronization aid that allows one or more threads to wait until
     * a set of operations being performed in other threads completes.
     *
     * <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
     * The {@link #await await} methods block until the current count reaches
     * zero due to invocations of the {@link #countDown} method, after which
     * all waiting threads are released and any subsequent invocations of
     * {@link #await await} return immediately.  This is a one-shot phenomenon
     * -- the count cannot be reset.  If you need a version that resets the
     * count, consider using a {@link CyclicBarrier}.
     *
     * <p>A {@code CountDownLatch} is a versatile synchronization tool
     * and can be used for a number of purposes.  A
     * {@code CountDownLatch} initialized with a count of one serves as a
     * simple on/off latch, or gate: all threads invoking {@link #await await}
     * wait at the gate until it is opened by a thread invoking {@link
     * #countDown}.  A {@code CountDownLatch} initialized to <em>N</em>
     * can be used to make one thread wait until <em>N</em> threads have
     * completed some action, or some action has been completed N times.
     *
     * <p>A useful property of a {@code CountDownLatch} is that it
     * doesn't require that threads calling {@code countDown} wait for
     * the count to reach zero before proceeding, it simply prevents any
     * thread from proceeding past an {@link #await await} until all
     * threads could pass.
     *
     * <p><b>Sample usage:</b> Here is a pair of classes in which a group
     * of worker threads use two countdown latches:
     * <ul>
     * <li>The first is a start signal that prevents any worker from proceeding
     * until the driver is ready for them to proceed;
     * <li>The second is a completion signal that allows the driver to wait
     * until all workers have completed.
     * </ul>
     *
     * <pre>
     * class Driver { // ...
     *   void main() throws InterruptedException {
     *     CountDownLatch startSignal = new CountDownLatch(1);
     *     CountDownLatch doneSignal = new CountDownLatch(N);
     *
     *     for (int i = 0; i < N; ++i) // create and start threads
     *       new Thread(new Worker(startSignal, doneSignal)).start();
     *
     *     doSomethingElse();            // don't let run yet
     *     startSignal.countDown();      // let all threads proceed
     *     doSomethingElse();
     *     doneSignal.await();           // wait for all to finish
     *   }
     * }
     *
     * class Worker implements Runnable {
     *   private final CountDownLatch startSignal;
     *   private final CountDownLatch doneSignal;
     *   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
     *      this.startSignal = startSignal;
     *      this.doneSignal = doneSignal;
     *   }
     *   public void run() {
     *      try {
     *        startSignal.await();
     *        doWork();
     *        doneSignal.countDown();
     *      } catch (InterruptedException ex) {} // return;
     *   }
     *
     *   void doWork() { ... }
     * }
     *
     * </pre>
     *
     * <p>Another typical usage would be to divide a problem into N parts,
     * describe each part with a Runnable that executes that portion and
     * counts down on the latch, and queue all the Runnables to an
     * Executor.  When all sub-parts are complete, the coordinating thread
     * will be able to pass through await. (When threads must repeatedly
     * count down in this way, instead use a {@link CyclicBarrier}.)
     *
     * <pre>
     * class Driver2 { // ...
     *   void main() throws InterruptedException {
     *     CountDownLatch doneSignal = new CountDownLatch(N);
     *     Executor e = ...
     *
     *     for (int i = 0; i < N; ++i) // create and start threads
     *       e.execute(new WorkerRunnable(doneSignal, i));
     *
     *     doneSignal.await();           // wait for all to finish
     *   }
     * }
     *
     * class WorkerRunnable implements Runnable {
     *   private final CountDownLatch doneSignal;
     *   private final int i;
     *   WorkerRunnable(CountDownLatch doneSignal, int i) {
     *      this.doneSignal = doneSignal;
     *      this.i = i;
     *   }
     *   public void run() {
     *      try {
     *        doWork(i);
     *        doneSignal.countDown();
     *      } catch (InterruptedException ex) {} // return;
     *   }
     *
     *   void doWork() { ... }
     * }
     *
     * </pre>
     *
     * <p>Memory consistency effects: Until the count reaches
     * zero, actions in a thread prior to calling
     * {@code countDown()}
     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
     * actions following a successful return from a corresponding
     * {@code await()} in another thread.
     *
     * @since 1.5
     * @author Doug Lea
     */
    public class CountDownLatch {
        /**
         * Synchronization control For CountDownLatch.
         * Uses AQS state to represent count.
         */
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    
        private final Sync sync;
    
        /**
         * Constructs a {@code CountDownLatch} initialized with the given count.
         *
         * @param count the number of times {@link #countDown} must be invoked
         *        before threads can pass through {@link #await}
         * @throws IllegalArgumentException if {@code count} is negative
         */
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    
        /**
         * Causes the current thread to wait until the latch has counted down to
         * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
         *
         * <p>If the current count is zero then this method returns immediately.
         *
         * <p>If the current count is greater than zero then the current
         * thread becomes disabled for thread scheduling purposes and lies
         * dormant until one of two things happen:
         * <ul>
         * <li>The count reaches zero due to invocations of the
         * {@link #countDown} method; or
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread.
         * </ul>
         *
         * <p>If the current thread:
         * <ul>
         * <li>has its interrupted status set on entry to this method; or
         * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
         * </ul>
         * then {@link InterruptedException} is thrown and the current thread's
         * interrupted status is cleared.
         *
         * @throws InterruptedException if the current thread is interrupted
         *         while waiting
         */
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        /**
         * Causes the current thread to wait until the latch has counted down to
         * zero, unless the thread is {@linkplain Thread#interrupt interrupted},
         * or the specified waiting time elapses.
         *
         * <p>If the current count is zero then this method returns immediately
         * with the value {@code true}.
         *
         * <p>If the current count is greater than zero then the current
         * thread becomes disabled for thread scheduling purposes and lies
         * dormant until one of three things happen:
         * <ul>
         * <li>The count reaches zero due to invocations of the
         * {@link #countDown} method; or
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread; or
         * <li>The specified waiting time elapses.
         * </ul>
         *
         * <p>If the count reaches zero then the method returns with the
         * value {@code true}.
         *
         * <p>If the current thread:
         * <ul>
         * <li>has its interrupted status set on entry to this method; or
         * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
         * </ul>
         * then {@link InterruptedException} is thrown and the current thread's
         * interrupted status is cleared.
         *
         * <p>If the specified waiting time elapses then the value {@code false}
         * is returned.  If the time is less than or equal to zero, the method
         * will not wait at all.
         *
         * @param timeout the maximum time to wait
         * @param unit the time unit of the {@code timeout} argument
         * @return {@code true} if the count reached zero and {@code false}
         *         if the waiting time elapsed before the count reached zero
         * @throws InterruptedException if the current thread is interrupted
         *         while waiting
         */
        public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
        /**
         * Decrements the count of the latch, releasing all waiting threads if
         * the count reaches zero.
         *
         * <p>If the current count is greater than zero then it is decremented.
         * If the new count is zero then all waiting threads are re-enabled for
         * thread scheduling purposes.
         *
         * <p>If the current count equals zero then nothing happens.
         */
        public void countDown() {
            sync.releaseShared(1);
        }
    
        /**
         * Returns the current count.
         *
         * <p>This method is typically used for debugging and testing purposes.
         *
         * @return the current count
         */
        public long getCount() {
            return sync.getCount();
        }
    
        /**
         * Returns a string identifying this latch, as well as its state.
         * The state, in brackets, includes the String {@code "Count ="}
         * followed by the current count.
         *
         * @return a string identifying this latch, as well as its state
         */
        public String toString() {
            return super.toString() + "[Count = " + sync.getCount() + "]";
        }
    }
    View Code

    可以看到,CountDownLatch只有一个成员变量,是它自定义的同步器:

    private final Sync sync;

    一、await方法

        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }

    CountDownLatch提供了两种await方法:有等待时长限制和一直等待。两种均能响应中断(归根到底是UNSAFE.park可响应中断。但是如果是定时的park,则不能判断被唤醒的原因是超时还是被中断,因此需要isInterrupted判断下,而此方法会清除中断标志,因此如果是延迟处理要“补上”)。

    await()方法调用了同步器的acquireSharedInterruptibly方法,这个方法由上层AQS提供,它调用了我们重写的tryAcquireShared方法而封装了排队等待、唤醒、响应中断的细节,我们只关注自定义同步器中的tryAcquireShared方法即可:

            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }

    注意,tryAcquireShared方法的返回值的意义在AQS是这样规定的:负值代表获取资源失败,非负值代表成功获取资源后剩余资源的数量。而这里当getState返回值为0的时候,我们却总是返回1,表示仍有剩余资源。这看上去并不合理,但这确实是正确的:因为可能有多个线程调用了await,同时在队列中等待资源,CountDownLatch的语义要求我们在倒计时结束有唤醒所有等待线程。因此我们在成功获取资源后,总是要告诉AQS“还有剩余”,这样AQS便会继续唤醒队列中的其他等待线程(由AQS中的setHeadAndPropagate方法调用doReleaseShared来唤醒)。一句话:成功获取总返回1是为了保证唤醒的“延续性”。

    有等待时长限制的await(long, TimeUnit)方法调用了同步器的tryAcquireSharedNanos方法:

        public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return tryAcquireShared(arg) >= 0 ||
                doAcquireSharedNanos(arg, nanosTimeout);
        }

    这个方法首先检测中断,然后试图获取,失败后进入“自旋-等待”阶段,直到成功获取或被中断。这是AQS的内容,不再赘述。

    二、countDown方法

        public void countDown() {
            sync.releaseShared(1);
        }

    countDown方法调用releaseShared释放资源:

        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }

    releaseShared会调用tryReleaseShared方法:

            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }

    方法一直自旋,直到成功释放或倒计时完毕。因为可能有超过count的线程调用countDown,因此releaseShared是可能失败的。当然在释放过程中也可能发生竞争,CAS自旋保证竞争发生时的正确执行。

    三、总结

    CountDownLatch是一个共享锁,但有些特别:他在初始化的时候锁住了所有共享资源,任何线程都可以调用countDown方法释放一个资源,当所有资源都被释放后,所有等待线程被唤醒。从而实现了倒计时的效果。

    CountDownLatch是一次性的,计数值不可恢复。


    作者:开方乘十

    出处:http://www.cnblogs.com/go2sea/

    本文版权归作者开方乘十和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接,否则保留追究法律责任的权利。

    如有不正之处,欢迎邮件(hailong.ma@qq.com)指正,谢谢。

  • 相关阅读:
    java (java.exe) 解释器 -D 选项
    Ubuntu 12.04.3 X64 使用 NFS 作为文件共享存储方式 安装 Oracle11g RAC
    Ubuntu下 Oracle sqldeveloper中文目录、文件,select查询结果中:中文乱码
    行测题型
    Left join on where 区别
    常见公文——决定和请示
    宜家沙发测评
    "放管服"改革 清单
    shell && and ||
    ORA-01722: invalid number
  • 原文地址:https://www.cnblogs.com/go2sea/p/5623218.html
Copyright © 2011-2022 走看看