zoukankan      html  css  js  c++  java
  • CountDownLatch源码解析

    public class CountDownLatch {
        /**
         * Synchronization control For CountDownLatch.
         * Uses AQS state to represent count.
         */
        private static final class Sync extends AbstractQueuedSynchronizer {
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
            
            // 尝试获取共享锁 如果state=0,则可以获取到锁,流程往下进行
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    
        private final Sync sync;
    
        /**
         * @param count 线程可以停止阻塞await往下执行countDown需要被执行的次数 
         */
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    
        /**
         * 使得当前线程等待直到latch为0,除非线程被打断或者等待时间
         * 
         * 如果当前count为0那么await直接返回true
         * 如果在count为0之前就超时了返回false
         * 如果进入方法前就已经设置了中断态或者等待的时候被中断,抛出异常
         * 
         * count不为0则线程停止调度,直到下面三件事发生
         * 1)调用countDown方法使得count=0
         * 2)其他线程中断了当前线程
         * 3)等待超时
         */
        public void await() throws InterruptedException {
            // 要等待吗
            sync.acquireSharedInterruptibly(1);
        }
    
        /**
         * 使得当前线程等待直到latch为0,除非线程被打断或者等待时间超时
         */
        public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
        public void countDown() {
            sync.releaseShared(1);
        }
        
        public abstract class AbstractQueuedSynchronizer {
            // 获取共享锁 
            public final void acquireShared(int arg) {
                // 子类实现 
                // 返回负数代表失败
                // 0代表成功,但后继争用线程不会成功
                // 整数代表都可以成功
                if (tryAcquireShared(arg) < 0)
                    doAcquireShared(arg);
            }
            
            private void doAcquireShared(int arg) {
                final Node node = addWaiter(Node.SHARED);
                boolean failed = true;
                try {
                    boolean interrupted = false;
                    for (;;) {
                        final Node p = node.predecessor();
                        if (p == head) {
                            int r = tryAcquireShared(arg);
                            // 一旦共享获取成功,设置新的头结点,并且唤醒后继线程
                            if (r >= 0) {
                                setHeadAndPropagate(node, r);
                                p.next = null; // help GC
                                if (interrupted)
                                    selfInterrupt();
                                failed = false;
                                return;
                            }
                        }
                        if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                            interrupted = true;
                    }
                } finally {
                    if (failed)
                        cancelAcquire(node);
                }
            }
            
            /**
             * 在获取共享锁成功后,设置head节点
             * 根据调用的tryAcquireShared返回的状态以及节点本身的等待状态来判断是否需要唤醒后继线程
             *
             */
            private void setHeadAndPropagate(Node node, int propagate) {
                // 获取老的head  
                Node h = head;
                // head移动到当前节点,清空线程和prev
                setHead(node);
                
                // propagate是tryAcquireShared返回值,决定是否传播唤醒的依据之一
                // 1)propagate>0,代表前面的节点获取到了锁且释放了
                // 2)h = null 没有竞争
                // 3)如果ws = -1 || ws = -2 || ws = -3 都往后传播
                if (propagate > 0 || h == null || h.waitStatus < 0 ||
                    (h = head) == null || h.waitStatus < 0) {
                    // 查看还有没有后置节点,如果没有或者有共享的后置节点
                    Node s = node.next;
                    if (s == null || s.isShared())
                        doReleaseShared();
                }
            }
            
            private void doReleaseShared() {
                for (;;) {
                    Node h = head;
                    // 如果队列中存在后继线程,唤醒
                    // 多线程同时释放共享锁,处于中间过程,可能会读到head节点ws=0的状态
                    // 此时虽然不能unpark,但是为了保证唤醒能够正确传递,设置ws=PROPAGATE
                    if (h != null && h != tail) {
                        // 获取头节点状态
                        int ws = h.waitStatus;
                        // 如果是等待通知的状态,复位,并唤醒后继节点
                        if (ws == Node.SIGNAL) {
                            if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                                continue;               // loop to recheck cases
                            unparkSuccessor(h);
                        }
                        // 如果是初始化状态,则设置头节点状态为传递
                        else if (ws == 0 &&
                                 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                            continue;                   // loop on failed CAS
                        }
                        // 检查还是否是head,不是则继续循环
                        if (h == head)                   // loop if head changed
                            break;
                    }
                }
            }
        
        
            // 先看没有设置超时的等待
            
            // 以可中断模式获取锁
            private void doAcquireSharedInterruptibly(int arg)
                throws InterruptedException {
                // 共享锁,添加到队列中,插到队尾
                final Node node = addWaiter(Node.SHARED);
                boolean failed = true;
                try {
                    for (;;) {
                        // 获取前置节点
                        final Node p = node.predecessor();
                        // 判断当前节点是否为头节点的后置节点
                        if (p == head) {
                            // 如果是,代表当前线程可以获取锁;
                            
                            // 先拿status的值,如果为0,返回1,则可以获锁,否则往后执行
                            // countDownLatch入参arg传1
                            int r = tryAcquireShared(arg);
                            if (r >= 0) {
                                // 获取到了,代表线程不阻塞了
                                // 将head设置为当前节点
                                setHeadAndPropagate(node, r);
                                p.next = null; // help GC
                                failed = false;
                                return;
                            }
                        }
                        // 如果前面有在排队的其他线程,根据前驱节点的ws判断是否要park
                        if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                            throw new InterruptedException();
                    }
                } finally {
                    if (failed)
                        cancelAcquire(node);
                }
            }
           
            
            // 没有获取到锁,是否要park
            private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
                // 获取前驱节点的ws
                int ws = pred.waitStatus;
                // 如果是前驱节点设置状态为SINGNAL,直接返回true,安心park
                if (ws == Node.SIGNAL)
                    return true;
                if (ws > 0) {
                    // 删除掉取消的节点
                    do {
                        node.prev = pred = pred.prev;
                    } while (pred.waitStatus > 0);
                    pred.next = node;
                } else {
                    // 如果前驱节点是0或者PROPAGATE,代表需要一个通知,暂时不park,
                    // 设置ws为SIGNAL
                    // ws的SIGNAL代表是需要通知下个节点,代表该节点也在等待呢
                    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
                }
                return false;
            }
            
            public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
                // 被中断了就抛出异常,清空中断状态
                if (Thread.interrupted())
                    throw new InterruptedException();
                // 尝试获取锁,失败了再尝试获取/有超时限制
                return tryAcquireShared(arg) >= 0 ||
                    doAcquireSharedNanos(arg, nanosTimeout);
            }
            
            private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
                throws InterruptedException {
                if (nanosTimeout <= 0L)
                    return false;
                    
                // 超时的系统时间
                final long deadline = System.nanoTime() + nanosTimeout;
                // 共享锁,添加到队列中
                final Node node = addWaiter(Node.SHARED);
                boolean failed = true;
                try {
                    // 死循环
                    for (;;) {
                        // 获取当前节点的前驱节点
                        final Node p = node.predecessor();
                        // 如果前驱节点是head,意味着当前线程可以获取锁
                        if (p == head) {
                            // 尝试获取锁
                            int r = tryAcquireShared(arg);
                            if (r >= 0) {
                                // 设置当前节点为head
                                setHeadAndPropagate(node, r);
                                p.next = null; // help GC
                                failed = false;
                                return true;
                            }
                        }
                        nanosTimeout = deadline - System.nanoTime();
                        if (nanosTimeout <= 0L)
                            return false;
                        if (shouldParkAfterFailedAcquire(p, node) &&
                            nanosTimeout > spinForTimeoutThreshold)
                            LockSupport.parkNanos(this, nanosTimeout);
                        if (Thread.interrupted())
                            throw new InterruptedException();
                    }
                } finally {
                    if (failed)
                        cancelAcquire(node);
                }
            }
            
           
    
  • 相关阅读:
    iOS:真机调试
    iOS:MBProgressHUD的基本使用
    CocoaPods安装小步骤
    PictureBox 双缓冲防止闪屏
    两招小办法对付宝宝发烧、咳嗽。超级管用哈
    (转)经纬度坐标转换为屏幕坐标
    解决eclipse不识别Android手机的问题
    利用FFmpeg将RTSP转码成RTMP发布在RED5
    vlc做转发的命令
    字符编码
  • 原文地址:https://www.cnblogs.com/zerodsLearnJava/p/12826184.html
Copyright © 2011-2022 走看看