zoukankan      html  css  js  c++  java
  • 源码分析:CountDownLatch 之倒计时门栓

    简介

    CountDownLatch 是JDK1.5 开始提供的一种同步辅助工具,它允许一个或多个线程一直等待,直到其他线程执行的操作完成为止。在初始化的时候给定 CountDownLatch 一个计数,调用await() 方法的线程会一直等待,其他线程执行完操作后调用countDown(),当计数减到0 ,调用await() 方法的线程被唤醒继续执行。

    应用场景

    1. 多线程并发下载或上传
      主线程初始化一个为5的CountDownLatch ,然后分发给5个线程去完成下载或上传的动作,主线程等待其他线程完成任务后返回成功呢。
    2. 首页,一个复杂的查询包含多个子查询,但是子查询结果互相不依赖,也可以使用 CountDownLatch ,等待多个查询完成后再一起返回给首页。

    源码分析

    CountDownLatch 的源码相对于之前介绍的几个同步类,代码量要少很多很多,在JDK 1.8版本中也就300多行(包含注释),所以分析起来也比较简单。

    内部类Sync

    同样的,该内部类也继承了AQS,代码展示:

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
    
        Sync(int count) { // 同步器的构造方法,初始化计数
            setState(count);
        }
       ...
    }
    

    主要的属性

    主要的属性就一个,也就是内部类实例:同步器Sync

    private final Sync sync;
    

    构造方法

    CountDownLatch 就一个构造方法,必须制定初始化计数

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count); // 初始化同步器,指定计数
    }
    

    CountDownLatch 不算构造方法和toString方法一共也才4个方法,不多,所以我们全部看一下

    await() 方法

    调用该方法的线程会被阻塞,指定初始化的计数被减为0,或者线程被中断抛出异常。

    代码展示:

    // CountDownLatch.await()
    public void await() throws InterruptedException { // 会抛出中断异常
        sync.acquireSharedInterruptibly(1); //调用的是同步器框架AQS的方法
    }
    // AQS框架代码
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted()) // 检查线程中断状态,抛出异常
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0) // 套路一样,调用Sync里面的方法
            doAcquireSharedInterruptibly(arg); // 阻塞线程,排队,等待被唤醒
    }
    // 内部类Sync.tryAcquireShared()
    protected int tryAcquireShared(int acquires) {
        // 检查计数,如果为0,返回1,如果不为0,返回-1;
        return (getState() == 0) ? 1 : -1;  
    }
    

    await() 方法总结:

    1. 这应该是最简单的一个tryAcquireShared方法实现了。
    2. 仅调用了getState来检查当前计数,如果计数为0,返回1;如果计数不为0,返回-1。
    3. 阻塞线程,排队,等待被唤醒,中断抛出异常等逻辑都是在AQS实现的,具体分析请看之前的AQS分析文章

    boolean await(timeout, unit)方法

    和无参数的await()方法唯一的区别就是该方法指定了等待超时的时间,并且有返回值;
    如果计数为0,则返回true;
    如果线程被中断,则抛出异常;
    如果线程经过了指定的等待时间,则返回false;

    代码展示:

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
        if (Thread.interrupted()) // 检查线程中断状态
            throw new InterruptedException();
        // tryAcquireShared 只会返回1或者-1,返回1代表计数已经为0,直接返回true
        // doAcquireSharedNanos 是AQS 框架里面的代码
        return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout);
    }
    
    // AQS 框架里面的代码
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        // 计算超时时间
        final long deadline = System.nanoTime() + nanosTimeout;
        // 构建当前排队节点,并加入队列,精灵王之前有分析
        final Node node = addWaiter(Node.SHARED); //共享节点
        boolean failed = true;
        try {
            for (;;) { // 自旋 tryAcquireShared(arg)
                final Node p = node.predecessor();
                if (p == head) { // 轮到当前节点了
                    int r = tryAcquireShared(arg);
                    if (r >= 0) { // 这里返回的大于等于0,说明计数为0,返回true
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false; // 超时了,直接返回false
                if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout); // 阻塞当前线程
                if (Thread.interrupted()) // 中断抛出异常
                    throw new InterruptedException();
            }
        } finally {
            if (failed) // 节点被取消
                cancelAcquire(node);
        }
    }
    

    countDown() 方法

    如果当前计数大于零,则将其递减,如果计数达到零,则唤醒所有等待的线程(调用了await方法的线程)。如果当前计数等于零,那么什么也不会发生。源码展示:

    public void countDown() {
        sync.releaseShared(1); // 调用AQS递减计数
    }
    
    // AQS同步框架的代码
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { // 调用自己实现的方法tryReleaseShared
            doReleaseShared(); //计数为0,唤醒所有等待的线程,返回true
            return true;
        }
        return false;
    }
    // CDL 自己实现的递减计数方法
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) { // 自旋,保证递减操作成功
            int c = getState(); // 当前的技术
            if (c == 0) // 计数已经是0了,返回false,之后啥也不会发生
                return false;
            int nextc = c-1; // 递减
            if (compareAndSetState(c, nextc)) // cas 更新计数
                return nextc == 0; 计数为0才返回true
        }
    }
    // 唤醒等待的线程
    private void doReleaseShared() {
        for (;;) { //自旋操作
            Node h = head;
            if (h != null && h != tail) { // 等待的线程队列不为空
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//  检查状态是否要唤醒下一个节点的线程
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // CAS 失败了才会继续continue
                        continue;            // loop to recheck cases
                    unparkSuccessor(h); // 唤醒头节点的下一个节点线程
                } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 头节点没变
            if (h == head)                   // loop if head changed
                break;
        }
    }
    

    countDown() 方法总结:

    1. 主要逻辑就是把计数减1
    2. 如果计数减到了0,则唤醒所有队列中等待的线程
    3. 如果减之前计数已经是0了,则什么也不干

    getCount() 方法

    public long getCount() { // CDL 的API
        return sync.getCount();
    }
    // 内部类 Sync
    int getCount() {
        return getState();
    }
    // AQS 框架api
    protected final int getState() {
        return state;
    }
    

    返回当前的计数。

    CountDownLatch 总结

    1. 主要功能维护计数,当计数减为零后才放开所有等待的线程
    2. CountDownLatch 没有加计数的API,所以一个CountDownLatch不可以重复使用,如果要用可以重置计数的,可以使用CyclicBarrier。
    3. CountDownLatch 也会有“死锁”的现象,要避免计数永远减不到0的情况
    4. 如果初始化计数为0,那么 CountDownLatch 则毫无作用,不如不用
    5. 如果初始化计数为1,调用await时阻塞自己,别人countDown解锁后,再唤醒自己(类似于在等一个资源,拿到资源在继续进行)

    和Semaphore的区别

    Semaphore 可以用来限流,比如限制一个景区最多允许10000人同时在园内,只有当有人出园后,才允许其他人入园。

    CountDownLatch 可以用来计数,比如导游在出发点等待10名游客一起出发,来一名游客就画个叉,直到10名游客到齐后,才一起出发去旅游。

  • 相关阅读:
    C++ 概念易错点
    C++的位操作符备忘
    C++关键词
    在ubuntu下安装drupal6
    C++符号优先级一览
    开启drupal的clear urls
    VC6.0使用PlaySound函数报错
    小记一下以非string为结束条件的循环
    C++中查看数据类型的方法
    在ubuntu下安装和配置drupal
  • 原文地址:https://www.cnblogs.com/admol/p/14020541.html
Copyright © 2011-2022 走看看