zoukankan      html  css  js  c++  java
  • 【Java并发】- 5.对并发工具类CountDownLatch的源码解析

    1.简介

    CountDownLatch允许一个或多个线程等待其他线程完成操作。

    与thread的join方法的实现的功能相似,不过join用于让当前执行线程等待join线程执行结束。其实现原理是不停检查join线程是否存活,如果join线程存活则让当前线程永远等待。

    与join不同CountDownLatch可以让开发者自行定义线程执行的位置。CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,只需要把这个CountDownLatch的引用传递到线程里即可。

    故CountDownLatch使用起来比join方法更加灵活。

    注意 计数器必须大于等于0,只是等于0时候,计数器就是零,调用await方法时不会
    阻塞当前线程。CountDownLatch不可能重新初始化或者修改CountDownLatch对象的
    内部计数器的值。一个线程调用countDown方法happen-before,另外一个线程调用await方法。

    2.如何使用CountDownLatch

    public class CountDownLatchDemo {
    
        public static void main(String[] args) {
            CountDownLatch countDownLatch = new CountDownLatch(3);
    
            IntStream.range(0,3).forEach(i -> new Thread(() -> {
                try {
                    Thread.sleep(2000);
                    System.out.println("hello");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }).start());
    
            System.out.println("子线程执行完毕");
    
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("主线程执行完毕");
        }
    }
    

    执行的结果

    子线程执行完毕
    hello
    hello
    hello
    主线程执行完毕
    

    可以看到,我们在CountDownLatch中放入了计算器3,而当3个线程都执行完后才执行的await方法。这样如果在编程中遇到,一个线程执行的前置条件时必须其他几个线程执行到某一步时,使用join方法肯定是不行的,此时就可以使用CountDownLatch来实现线程的等待。

    不过上述使用CountDownLatch存在很大的问题,如果计算器与执行countDownLatch.countDown();的方法个数不一致就会导致,计算器不能到达0,使得await一直死循环。为了解决这种问题出现了await方法的变种方法await(long timeout, TimeUnit unit)这个方法设置了等待时间,如果到了等待时间计算器还不为0就会自动唤醒await的线程,防止线程出现死循环的情况

    public class CountDownLatchDemo {
    
        public static void main(String[] args) {
            CountDownLatch countDownLatch = new CountDownLatch(3);
    
            IntStream.range(0,3).forEach(i -> new Thread(() -> {
                try {
                    System.out.println("thread" + i);
                    Thread.sleep(2000);
                    System.out.println("hello");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }).start());
    
            System.out.println("子线程执行完毕");
    
            try {
                countDownLatch.await(1000,TimeUnit.MICROSECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("主线程执行完毕");
        }
    }
    

    程序执行结果

    子线程执行完毕
    thread0
    thread1
    thread2
    主线程执行完毕
    hello
    hello
    hello
    

    3.对CountDownLatch类中方法的解析

    CountDownLatch中包含如下的方法
    在这里插入图片描述
    下面对其中几个重要的方法进行解析。

    构造方法

    分析可得CountDownLatch只有一个有参构造方法

        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    

    构造方法中,

    • 第一步对传入的值进行检验,(因为这是一个减的计算器,所以count必须大于0)
    • 二:声明了一个CountDownLatch类的的私有类Sync
     private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
    
        Sync(int count) {
            setState(count);
        }
    }
    

    可知道Sync类是一个AQS实现类。所以CountDownLatch的底层是基于AQS实现的

        protected final void setState(int newState) {
            state = newState;
        }
    

    其最终执行到setState方法设置了计数器的值。

    await方法

    public void await() throws InterruptedException {
    //执行了抽象类(准确的说是sync类继承但没有重写的方法)中的方法
    //至于传入的参数1,在此处没有实际意义,正式在AQS中规定这样传参
       sync.acquireSharedInterruptibly(1);
    }
    
    //抽象类AbstractQueuedSynchronizer的方法
    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            //判断线程是否被打断
            if (Thread.interrupted())
                throw new InterruptedException();
            //
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    //抽象类AbstractQueuedSynchronizer的方法
    //这个方法没有具体实现,所以应该在其实现类中找其重写的方法
    protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
    //CountDownLatch内部类Sync的方法
    protected int tryAcquireShared(int acquires) {
    //这是判断在CountDownLatch构造方法传入的计数器是否为0.参数被没有被使用。
    //如果返回负数acquireSharedInterruptibly方法会进入doAcquireSharedInterruptibly
    //等待计数器的值变为0,如果返回正数则acquireSharedInterruptibly通过执行
    //即CountDownLatch中计数器值为0,可以唤醒主线程执行
                return (getState() == 0) ? 1 : -1;
            }
    

    await方法会在CountDownLatch中的计数器不为0之前一直等待,直到计数器为0,才会唤醒执行await方法的线程,当然线程也有可能被打的而唤醒,然后抛出InterruptedException 异常。

    所以await方法的逻辑很简单,就是判断CountDownLatch中计数器的值算法为0,

    • 不为0就把线程放入等待队列
    • 为0,则线程继续然后执行

    await方法的变种方法await(long timeout, TimeUnit unit)

    public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return tryAcquireShared(arg) >= 0 ||
                doAcquireSharedNanos(arg, nanosTimeout);
        }
    
    protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    

    和await方法的逻辑类似,只是在*doAcquireSharedNanos(arg, nanosTimeout);*方法中做了超时唤醒的处理。这个方法是由AQS具体实现,故在分析AQS代码时在具体分析。

    即如果到达超时时间,CountDowmLatch中的计数器还没有归0,执行await的方法也照常唤醒

    countDown()方法

    public void countDown() {
            sync.releaseShared(1);
        }
    //抽象类AbstractQueuedSynchronizer的方法
    public final boolean releaseShared(int arg) {
    //判断tryReleaseShared返回,如果为真则通过doReleaseShared唤醒所以等待的线程且返回true
    //如果为假返回false不做任何处理
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    //CountDownLatch内部类Sync的方法
    protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    //判断计数器(State)是否为0.因为如果计数器为0就不做等待操作
                    //直接返回,且如果为0,说明之前必定有一个线程执行了
                    //tryReleaseShared方法对计数器减一成功,故返回false
                    if (c == 0)
                        return false;
                    //通过CAS把c的值减一,再判断计数器是否为0,如果为0则返回true,唤醒
                    //所以等待线程。负责返回false不做任何处理
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
    

    该方法的作用是对计数器进行减一,如果计数器的值变为0,则唤醒所有正在等待的线程。

    执行该方法后CountDownLatch中计数器的值最终只可能>=0。

    CountDownLatch类中其他方法没有分析的价值,故这里不做分析。

  • 相关阅读:
    27. Remove Element
    26. Remove Duplicates from Sorted Array
    643. Maximum Average Subarray I
    674. Longest Continuous Increasing Subsequence
    1. Two Sum
    217. Contains Duplicate
    448. Find All Numbers Disappeared in an Array
    566. Reshape the Matrix
    628. Maximum Product of Three Numbers
    UVa 1349 Optimal Bus Route Design (最佳完美匹配)
  • 原文地址:https://www.cnblogs.com/wf614/p/13168591.html
Copyright © 2011-2022 走看看