zoukankan      html  css  js  c++  java
  • CountDownLatch源码分析

    概述

    CountDownLatchJ.U.C包中提供的一个并发工具类,其主要作用是协调多个线程之间的同步,其可以让一个线程在等待其他线程执行完任务之后再继续执行。

    demo1

    假设现在有一场考试,考场中有五个人,考试时间是1s,那么监考老师只能等考试时间到了才能收卷。使用CountDownLatch如下:

     public static void main(String[] args) {
    
    
    
            final CountDownLatch latch = new CountDownLatch(10);
    
            ExecutorService service = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
    
            try {
                for (int i = 0; i < 10; i++) {
                    service.execute(()->{
    
                        try {
                            System.out.println("学生i" + Thread.currentThread().getName() + " 开始答题。。。。");
                            Thread.sleep(1000);
                            System.out.println("时间到,学生i"+Thread.currentThread().getName()+"交卷。。。。");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }finally {
                            latch.countDown();
    
                        }
                    });
                }
    
                System.out.println("考试开始。。。");
                latch.await();
                service.shutdown();
                System.out.println("考试结束。。。");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    

    启动运行,输出如下:

    考试开始。。。
    学生ipool-1-thread-1 开始答题。。。。
    学生ipool-1-thread-3 开始答题。。。。
    学生ipool-1-thread-2 开始答题。。。。
    学生ipool-1-thread-4 开始答题。。。。
    学生ipool-1-thread-5 开始答题。。。。
    学生ipool-1-thread-6 开始答题。。。。
    学生ipool-1-thread-7 开始答题。。。。
    学生ipool-1-thread-8 开始答题。。。。
    学生ipool-1-thread-9 开始答题。。。。
    学生ipool-1-thread-10 开始答题。。。。
    时间到,学生ipool-1-thread-1交卷。。。。
    时间到,学生ipool-1-thread-3交卷。。。。
    时间到,学生ipool-1-thread-6交卷。。。。
    时间到,学生ipool-1-thread-9交卷。。。。
    时间到,学生ipool-1-thread-2交卷。。。。
    时间到,学生ipool-1-thread-10交卷。。。。
    时间到,学生ipool-1-thread-8交卷。。。。
    时间到,学生ipool-1-thread-7交卷。。。。
    时间到,学生ipool-1-thread-5交卷。。。。
    时间到,学生ipool-1-thread-4交卷。。。。
    考试结束。。。
    

    源码分析

    成员变量

      //私有静态内部类  
      private final Sync sync;
      // Sync继承自AQS 从而具有队列同步的功能
      private static final class Sync extends AbstractQueuedSynchronizer {
           。。。。
        }
    

    构造方法

    public CountDownLatch(int count) {
            //入参校验
            if (count < 0) throw new IllegalArgumentException("count < 0");
            //初始化 sync
            this.sync = new Sync(count);
        }
    
    
    Sync(int count) {
                setState(count);
            }
    
    //AbstractQueuedSynchronizer类
    
    private volatile int state;
    
    protected final void setState(int newState) {
            state = newState;
        }
    

    CountDownLatch的构造方法实际上是对Sync进行初始化,而Sync的构造方法底层又是调用AQS框架的setState方法来设置计数器的值。

    countdown方法

     public void countDown() {
            sync.releaseShared(1);
        }
    

    countDown方法很简单,就是调用AQS框架里的releaseShared来改变state的值,源码如下:

       public final boolean releaseShared(int arg) {
           //tryReleaseShared是一个模板方法,由其子类进行实现
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    我们来看一下tryReleaseShared这个方法,在AQS框架中,这是一个模板方法,由继承它的子类来具体实现。在CountDownLatch中其私有静态内部类Sync继承了AQS,所以也会重写该方法,通过自旋和CAS来实现释放锁的目的。如下所示:

      protected boolean tryReleaseShared(int releases) {
                //自旋
                for (;;) {
                    //获取计数器的值
                    int c = getState();
                    // 每次释放的时候,也就是子任务完成的时候计数值减一
                    if (c == 0)
                        return false;
                    //否则的话 将state-1
                    int nextc = c-1;
                    //使用 CAS 修改 state的值
                    if (compareAndSetState(c, nextc))
                        // 子任务均处理完毕后,返回 true; 也就是真正的释放
            	        // 将唤醒阻塞在同步队列的线程
                        // 否则继续自旋
                        return nextc == 0;
                }
            }
    

    tryReleaseShared的返回结果true时,继续执行AQS中的doReleaseShared方法,源码如下:

    private void doReleaseShared() {
            //自旋
            for (;;) {
                //获取CLH队列的头节点
                Node h = head;
                //如果不为null 且CLH队列不只一个节点
                if (h != null && h != tail) {
                    //获取节点的waitStatus
                    int ws = h.waitStatus;
                    //如果是 ws是SINGAL状态
                    if (ws == Node.SIGNAL) {
                        //设置节点状态由 SINGAL变为0失败
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                         //继续自旋
                            continue;
                        //唤醒头节点
                        unparkSuccessor(h);
                    }
                    // 如果ws==0 且 设置状态PROPAGATE失败
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        //继续自旋
                        continue;                
                }
                //判断头节点是否已经反正变化
                if (h == head)      
                    //结束循环
                    break;
            }
        }
    
    

    首先获得head节点。如果head节点不等于空且head节点不等于tail节点,说明CLH队列中此时不止一个节点在排队,获得head节点的waitStatus。判断当前head节点状态是否是SINGAL。处于SINGAL状态的节点,说明当前节点的后继节点处于被唤醒的状态。如果CAS操作将head节点的waitStatus重置为0失败,那么跳出当前循环,继续执行下一次循环(重新检查)。如果重置成功,那么调用unparkSuccessor方法唤醒后继节点。 如果当前head节点状态等于0,通过CAS操作将waitStatus设置为PROPAGATE(传播)状态,确保可以向后一个节点传播下去。如果CAS操作失败,那么当前循环,继续执行下一次循环。最后的h == head,是判断head节点是否发生变化。如果没有发生变化,结束循环。如果发生变化,必须再次循环。

    
        private void unparkSuccessor(Node node) {
            //获取节点的waitStatus
            int ws = node.waitStatus;
            //小于0的话 使用CAS 设置为0
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
            //当前节点的下一个节点       
            Node s = node.next;
            //如果为null 或者waitStatus>0,即被取消
            if (s == null || s.waitStatus > 0) {
                s = null;
                //从尾开始向前遍历  找到第一个waitStatus小于等于0的节点
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            //不为空的话,就唤醒这个节点
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    获取head节点的waitStatus,如果小于0,进行CAS操作重置为0。获取head节点的后继节点,如果后继节点等于null或者后继节点的waitStaus大于0(说明后继节点处于CANCELLED状态),那么从队列从尾部往前进行遍历寻找waitStatus小于等于0的节点。如果这个遍历出来的节点不等于null的话,那么通过LockSupport.unpark()唤醒这个节点中的线程。

    从实现可以看出,每次子任务在调用 countDown 时,会将同步状态值减一,当所有子任务均完成时 (state = 0) 此时会唤醒阻塞在同步队列的节点。

    await方法

    await方法会使当前线程在计数器变为0之前,一直处于等待状态,除非被打断。

     public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    

    await方法底层调用的还是AQS框架中的acquireSharedInterruptibly方法,源码如下:

      public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
          //判断线程是否中断
            if (Thread.interrupted())
                //如果被中断 则直接抛出异常
                throw new InterruptedException();
          //tryAcquireShared依然是一个模板方法 由子类实现
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    

    AQS中的acquireSharedInterruptibly方法,会判断线程是否中断。如果中断, 抛出InterruptedException异常。值得注意的是Thread.interrupted()方法,是测试当前线程是否中断。该方法会清除线程的中断状态。换句话说,如果调用这个方法2次,那么第二次会直接返回false,除非当前线程在第一次调用之后再次被中断。如果tryAcquireShared()小于0(说明该计数器值大于0),继续执行doAcquireSharedInterruptibly。

    Sync中的tryAcquireShared方法很简单,源码如下:

    protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    

    这里只是简单的判断state变量。如果state等于0(说明计数值为0),返回1,否则返回-1(说明计数器值大于0)。

    最后是AQS里的doAcquireSharedInterruptibly方法:

     private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            //入队
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    很明显,是通过轮询的方式去获取共享锁。首先将当前线程包装成类型为SHARED的节点,标志为共享类型的节点。获取当前节点的前驱节点。如果当前节点的前驱节点为head节点的话,说明该节点是在AQS队列中等待获取锁的第一个节点。调用CountDownLatch中的tryAcquireShared()尝试去获取锁。返回的值大于0的话,说明获取锁成功。如果获取共享锁成功,那么把当前节点设置为AQS同步队列中的head节点,同时将p.next置为null(方便GC)。回到头看,如果当前节点的前驱节点不是head节点或者获取锁失败,我们需要调用shouldParkAfterFailedAcquire()方法判断当前线程是否需要挂起,如果需要挂起调用 parkAndCheckInterrupt()。

    await方法还有一个重载方法,加入超时机制。

     // 返回false,代表超时 
    public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
    // 返回false,代表超时。返回true,代表获得共享锁成功
     public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return tryAcquireShared(arg) >= 0 ||
                doAcquireSharedNanos(arg, nanosTimeout);
        }
    
    
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (nanosTimeout <= 0L)
                return false;
            final long deadline = System.nanoTime() + nanosTimeout;
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return true;
                        }
                    }
                    nanosTimeout = deadline - System.nanoTime();
                    if (nanosTimeout <= 0L)
                        return false;
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        //static final long spinForTimeoutThreshold = 1000L;
                        nanosTimeout > spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if (Thread.interrupted())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    AQS中的doAcquireSharedNanos方法中,如果在nanosTimeout时间范围内,还没有获取共享锁成功的话,直接返回false。spinForTimeoutThreadshold的值为1000nanoseconds。如果shouldParkAfterFailedAcquire(p, node)返回true且超时时间大于阀值spinForTimeoutThreadshold的话,会通过LockSupport.parkNanos(this, nanosTimeout);让线程挂起nanosTimeout时间。这样的策略体现是:如果超时时间很短的话,就不把当前线程挂起,而是通过自旋,这样线程获取锁很快就释放的情况下,可以减少cpu资源和线程挂起和恢复的性能损耗。

  • 相关阅读:
    jQuery轮播插件SuperSlide【2016-10-14】
    【实际项目需求】话题讨论分类支持删除,且删除后对应话题分为改为未分类
    java设计模式—工厂模式
    [php]修改站点的虚拟目录
    [php排错] Forbidden You don't have permission to access / on this server.
    JDBC-MySql
    DHTML中window的使用
    CSS与HTML结合
    DOM使用
    DHTML Object Model&DHTML&DOM
  • 原文地址:https://www.cnblogs.com/reecelin/p/13380604.html
Copyright © 2011-2022 走看看