zoukankan      html  css  js  c++  java
  • 【JDK源码分析】并发包同步工具CountDownLatch

    前言

    CountDownLatch是一个闭锁实现,它可以使一个或者多个线程等待一组事件发生。它包含一个计数器,用来表示需要等待的事件数量,coutDown方法用于表示一个事件发生,计数器随之递减,而await方法等待计数器为0之前一直阻塞。它是基于AQS的共享锁来实现的,其中使用了较多的AQS的方法,所以在这之前最好阅读过AQS的源码,可以查看本人之前AQS的源码分析,有些AQS方法没有在之前分析过的这里涉及到了会进行分析。

    源码

    我们先看它的属性和构造器,

        // Sync为其内部类
        private final Sync sync;
    
        // 唯一的一个构造器
        // 构造参数count就是需要等待事件的数量
        public CountDownLatch(int count) {
            // 为了保证count >= 0
            if (count < 0) throw new IllegalArgumentException("count < 0");
            // 构造sync
            this.sync = new Sync(count);
        }

    现在来看内部类Sync,它继承了AQS,实现了共享锁方法,下面来看其源码,代码行数不多很好理解

        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                // setState 为AQS更改其state变量的方法
                // 将AQS state变量设置成count
                setState(count);
            }
    
            int getCount() {
                // AQS的获取state锁状态值
                return getState();
            }
            // 尝试获取共享锁
            protected int tryAcquireShared(int acquires) {
                // 返回1表示此时锁状态值为0表示锁已释放
                // -1表示此时锁状态值大于0,表示出于锁定状态
                return (getState() == 0) ? 1 : -1;
            }
            // 尝试释放共享锁(计数器递减releases次)
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                // 等待锁状态值为0或者更改锁状态值成功
                for (;;) {
                    // 将state赋值给变量c
                    int c = getState();
                    if (c == 0)
                        // 此时锁已清除
                        return false;
                    // 递减
                    int nextc = c-1;
                    // 比较state的状态值是否等于C,等于将state状态值改为nextc
                    if (compareAndSetState(c, nextc))
                        // 更改成功后,如果nextc为0则返回true
                        return nextc == 0;
                }
            }
        }

    await方法

    await方法就是当state状态值不为0时将当前线程阻塞,然后等待唤醒

        public void await() throws InterruptedException {
            //调用的AQS获取共享锁可中断方法
            sync.acquireSharedInterruptibly(1);
        }

    我们来看看AQS的acquireSharedInterruptibly方法

        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 此方法调用的是CountDownLatch内部类Sync的方法
            // 如果锁状态不为0,则执行doAcquireSharedInterruptibly方法
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }

    doAcquireSharedInterruptibly方法也是由AQS实现的

        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            // 添加一个共享锁节点到队列
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                // 直到线程被唤醒或者线程被中断时跳出循环
                for (;;) {
                    // node节点的前驱节点
                    final Node p = node.predecessor();
                    if (p == head) {
                        // 调用CountDownLatch内部类Sync的方法
                        // 如果锁状态值为0,则返回值大于0
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // 当锁状态值为0,开始将note节点设置为头节点并唤醒后继节点
                            // 也就是队列不断的出列,然后唤醒后继节点,后继节点被唤醒后由于前驱节点被设置成头节点,又会调用该方法进行后继节点的唤醒
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
    
                    /*
                     shouldParkAfterFailedAcquire用于清除已中断/或者取消的线程以及判断此次循环是否需要挂起线程
                     parkAndCheckInterrupt 挂机当前线程
                     shouldParkAfterFailedAcquire 和 parkAndCheckInterrupt 在AQS之前博文里分析过这里就不再分析了
                     */
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    // 表示当前线程中断,取消获取锁
                    // 之前分析过,略过源码分析
                    cancelAcquire(node);
            }
        }

    setHeadAndPropagate方法,主要作用是唤醒后继节点线程

        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; 
            // 当前节点设置为头节点,节点关联的线程设置为空
            setHead(node)
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    // 节点等待状态为signal时,唤醒后继节点线程
                    doReleaseShared();
            }
        }

    doReleaseShared很巧妙,当当前节点等待状态为signal时,唤醒后继节点线程

        private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    // 当前线程等待状态为signal时表示后继节点需要唤醒
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            // 表示h节点的状态替换失败,会再次循环判断h节点的状态
                            continue;            // loop to recheck cases
                        // 唤醒后继节点
                        unparkSuccessor(h);
                    }
                    // 状态为0时,将其改成PROPAGATE,更改失败会再次循环判断h节点的状态
              // 这种情况发生在一个线程调用await方法,节点的等待状态还是初始值0未来得及被修改,刚好state被置为0然后调用了doReleaseShared方法
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }

    countDown方法

    countDown方法递减state值,当值为0时,依次唤醒等待的线程

        public void countDown() {
            // 递减一次state值,知道state为0时唤醒等待中的线程
            sync.releaseShared(1);
        }
    public final boolean releaseShared(int arg) { // 尝试将state减去arg if (tryReleaseShared(arg)) { // state为0时唤醒线程 doReleaseShared(); return true; } return false; }

    到此分析完毕。

    总结

    1. 通过源码知道CountDownLatch 不能像CyclicBarrier那样使用完毕后还可以复用;
    2. CountDownLatch 是通过共享锁来实现的,它的构造参数就是AQS state的值;
    3. 由于内部类继承了AQS,所以它内部也是FIFO队列,同时也一样是前驱节点唤醒后继节点。
  • 相关阅读:
    redis分布式锁解决超卖问题
    redis使用
    Xcode 解决日志打印不全问题
    苹果电脑系统怎么重装?这几步就可以轻松搞定
    Mac 一键显示所有隐藏文件 不要那么六好吧
    iOS导入高德地图出现缺失armv7--"Undefined symbols for architecture armv7"
    如何生成.a文件,小心有坑!!
    保护你的代码,生成.a文件以及.framework文件需要注意的地方
    二维码扫描工具实现
    iOS 调整图片尺寸,告诉你的UI,别问我尺寸!我要最大的
  • 原文地址:https://www.cnblogs.com/d-homme/p/9375105.html
Copyright © 2011-2022 走看看