zoukankan      html  css  js  c++  java
  • CountDownLatch源码解析

    public class CountDownLatch {
        /**
         * Synchronization control For CountDownLatch.
         * Uses AQS state to represent count.
         */
        private static final class Sync extends AbstractQueuedSynchronizer {
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
            
            // 尝试获取共享锁 如果state=0,则可以获取到锁,流程往下进行
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    
        private final Sync sync;
    
        /**
         * @param count 线程可以停止阻塞await往下执行countDown需要被执行的次数 
         */
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    
        /**
         * 使得当前线程等待直到latch为0,除非线程被打断或者等待时间
         * 
         * 如果当前count为0那么await直接返回true
         * 如果在count为0之前就超时了返回false
         * 如果进入方法前就已经设置了中断态或者等待的时候被中断,抛出异常
         * 
         * count不为0则线程停止调度,直到下面三件事发生
         * 1)调用countDown方法使得count=0
         * 2)其他线程中断了当前线程
         * 3)等待超时
         */
        public void await() throws InterruptedException {
            // 要等待吗
            sync.acquireSharedInterruptibly(1);
        }
    
        /**
         * 使得当前线程等待直到latch为0,除非线程被打断或者等待时间超时
         */
        public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
        public void countDown() {
            sync.releaseShared(1);
        }
        
        public abstract class AbstractQueuedSynchronizer {
            // 获取共享锁 
            public final void acquireShared(int arg) {
                // 子类实现 
                // 返回负数代表失败
                // 0代表成功,但后继争用线程不会成功
                // 整数代表都可以成功
                if (tryAcquireShared(arg) < 0)
                    doAcquireShared(arg);
            }
            
            private void doAcquireShared(int arg) {
                final Node node = addWaiter(Node.SHARED);
                boolean failed = true;
                try {
                    boolean interrupted = false;
                    for (;;) {
                        final Node p = node.predecessor();
                        if (p == head) {
                            int r = tryAcquireShared(arg);
                            // 一旦共享获取成功,设置新的头结点,并且唤醒后继线程
                            if (r >= 0) {
                                setHeadAndPropagate(node, r);
                                p.next = null; // help GC
                                if (interrupted)
                                    selfInterrupt();
                                failed = false;
                                return;
                            }
                        }
                        if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                            interrupted = true;
                    }
                } finally {
                    if (failed)
                        cancelAcquire(node);
                }
            }
            
            /**
             * 在获取共享锁成功后,设置head节点
             * 根据调用的tryAcquireShared返回的状态以及节点本身的等待状态来判断是否需要唤醒后继线程
             *
             */
            private void setHeadAndPropagate(Node node, int propagate) {
                // 获取老的head  
                Node h = head;
                // head移动到当前节点,清空线程和prev
                setHead(node);
                
                // propagate是tryAcquireShared返回值,决定是否传播唤醒的依据之一
                // 1)propagate>0,代表前面的节点获取到了锁且释放了
                // 2)h = null 没有竞争
                // 3)如果ws = -1 || ws = -2 || ws = -3 都往后传播
                if (propagate > 0 || h == null || h.waitStatus < 0 ||
                    (h = head) == null || h.waitStatus < 0) {
                    // 查看还有没有后置节点,如果没有或者有共享的后置节点
                    Node s = node.next;
                    if (s == null || s.isShared())
                        doReleaseShared();
                }
            }
            
            private void doReleaseShared() {
                for (;;) {
                    Node h = head;
                    // 如果队列中存在后继线程,唤醒
                    // 多线程同时释放共享锁,处于中间过程,可能会读到head节点ws=0的状态
                    // 此时虽然不能unpark,但是为了保证唤醒能够正确传递,设置ws=PROPAGATE
                    if (h != null && h != tail) {
                        // 获取头节点状态
                        int ws = h.waitStatus;
                        // 如果是等待通知的状态,复位,并唤醒后继节点
                        if (ws == Node.SIGNAL) {
                            if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                                continue;               // loop to recheck cases
                            unparkSuccessor(h);
                        }
                        // 如果是初始化状态,则设置头节点状态为传递
                        else if (ws == 0 &&
                                 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                            continue;                   // loop on failed CAS
                        }
                        // 检查还是否是head,不是则继续循环
                        if (h == head)                   // loop if head changed
                            break;
                    }
                }
            }
        
        
            // 先看没有设置超时的等待
            
            // 以可中断模式获取锁
            private void doAcquireSharedInterruptibly(int arg)
                throws InterruptedException {
                // 共享锁,添加到队列中,插到队尾
                final Node node = addWaiter(Node.SHARED);
                boolean failed = true;
                try {
                    for (;;) {
                        // 获取前置节点
                        final Node p = node.predecessor();
                        // 判断当前节点是否为头节点的后置节点
                        if (p == head) {
                            // 如果是,代表当前线程可以获取锁;
                            
                            // 先拿status的值,如果为0,返回1,则可以获锁,否则往后执行
                            // countDownLatch入参arg传1
                            int r = tryAcquireShared(arg);
                            if (r >= 0) {
                                // 获取到了,代表线程不阻塞了
                                // 将head设置为当前节点
                                setHeadAndPropagate(node, r);
                                p.next = null; // help GC
                                failed = false;
                                return;
                            }
                        }
                        // 如果前面有在排队的其他线程,根据前驱节点的ws判断是否要park
                        if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                            throw new InterruptedException();
                    }
                } finally {
                    if (failed)
                        cancelAcquire(node);
                }
            }
           
            
            // 没有获取到锁,是否要park
            private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
                // 获取前驱节点的ws
                int ws = pred.waitStatus;
                // 如果是前驱节点设置状态为SINGNAL,直接返回true,安心park
                if (ws == Node.SIGNAL)
                    return true;
                if (ws > 0) {
                    // 删除掉取消的节点
                    do {
                        node.prev = pred = pred.prev;
                    } while (pred.waitStatus > 0);
                    pred.next = node;
                } else {
                    // 如果前驱节点是0或者PROPAGATE,代表需要一个通知,暂时不park,
                    // 设置ws为SIGNAL
                    // ws的SIGNAL代表是需要通知下个节点,代表该节点也在等待呢
                    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
                }
                return false;
            }
            
            public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
                // 被中断了就抛出异常,清空中断状态
                if (Thread.interrupted())
                    throw new InterruptedException();
                // 尝试获取锁,失败了再尝试获取/有超时限制
                return tryAcquireShared(arg) >= 0 ||
                    doAcquireSharedNanos(arg, nanosTimeout);
            }
            
            private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
                throws InterruptedException {
                if (nanosTimeout <= 0L)
                    return false;
                    
                // 超时的系统时间
                final long deadline = System.nanoTime() + nanosTimeout;
                // 共享锁,添加到队列中
                final Node node = addWaiter(Node.SHARED);
                boolean failed = true;
                try {
                    // 死循环
                    for (;;) {
                        // 获取当前节点的前驱节点
                        final Node p = node.predecessor();
                        // 如果前驱节点是head,意味着当前线程可以获取锁
                        if (p == head) {
                            // 尝试获取锁
                            int r = tryAcquireShared(arg);
                            if (r >= 0) {
                                // 设置当前节点为head
                                setHeadAndPropagate(node, r);
                                p.next = null; // help GC
                                failed = false;
                                return true;
                            }
                        }
                        nanosTimeout = deadline - System.nanoTime();
                        if (nanosTimeout <= 0L)
                            return false;
                        if (shouldParkAfterFailedAcquire(p, node) &&
                            nanosTimeout > spinForTimeoutThreshold)
                            LockSupport.parkNanos(this, nanosTimeout);
                        if (Thread.interrupted())
                            throw new InterruptedException();
                    }
                } finally {
                    if (failed)
                        cancelAcquire(node);
                }
            }
            
           
    
  • 相关阅读:
    数学+高精度 ZOJ 2313 Chinese Girls' Amusement
    最短路(Bellman_Ford) POJ 1860 Currency Exchange
    贪心 Gym 100502E Opening Ceremony
    概率 Gym 100502D Dice Game
    判断 Gym 100502K Train Passengers
    BFS POJ 3278 Catch That Cow
    DFS POJ 2362 Square
    DFS ZOJ 1002/HDOJ 1045 Fire Net
    组合数学(全排列)+DFS CSU 1563 Lexicography
    stack UVA 442 Matrix Chain Multiplication
  • 原文地址:https://www.cnblogs.com/zerodsLearnJava/p/12826184.html
Copyright © 2011-2022 走看看