zoukankan      html  css  js  c++  java
  • 【Java并发】- 5.对并发工具类CountDownLatch的源码解析

    1.简介

    CountDownLatch允许一个或多个线程等待其他线程完成操作。

    与thread的join方法的实现的功能相似,不过join用于让当前执行线程等待join线程执行结束。其实现原理是不停检查join线程是否存活,如果join线程存活则让当前线程永远等待。

    与join不同CountDownLatch可以让开发者自行定义线程执行的位置。CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,只需要把这个CountDownLatch的引用传递到线程里即可。

    故CountDownLatch使用起来比join方法更加灵活。

    注意 计数器必须大于等于0,只是等于0时候,计数器就是零,调用await方法时不会
    阻塞当前线程。CountDownLatch不可能重新初始化或者修改CountDownLatch对象的
    内部计数器的值。一个线程调用countDown方法happen-before,另外一个线程调用await方法。

    2.如何使用CountDownLatch

    public class CountDownLatchDemo {
    
        public static void main(String[] args) {
            CountDownLatch countDownLatch = new CountDownLatch(3);
    
            IntStream.range(0,3).forEach(i -> new Thread(() -> {
                try {
                    Thread.sleep(2000);
                    System.out.println("hello");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }).start());
    
            System.out.println("子线程执行完毕");
    
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("主线程执行完毕");
        }
    }
    

    执行的结果

    子线程执行完毕
    hello
    hello
    hello
    主线程执行完毕
    

    可以看到,我们在CountDownLatch中放入了计算器3,而当3个线程都执行完后才执行的await方法。这样如果在编程中遇到,一个线程执行的前置条件时必须其他几个线程执行到某一步时,使用join方法肯定是不行的,此时就可以使用CountDownLatch来实现线程的等待。

    不过上述使用CountDownLatch存在很大的问题,如果计算器与执行countDownLatch.countDown();的方法个数不一致就会导致,计算器不能到达0,使得await一直死循环。为了解决这种问题出现了await方法的变种方法await(long timeout, TimeUnit unit)这个方法设置了等待时间,如果到了等待时间计算器还不为0就会自动唤醒await的线程,防止线程出现死循环的情况

    public class CountDownLatchDemo {
    
        public static void main(String[] args) {
            CountDownLatch countDownLatch = new CountDownLatch(3);
    
            IntStream.range(0,3).forEach(i -> new Thread(() -> {
                try {
                    System.out.println("thread" + i);
                    Thread.sleep(2000);
                    System.out.println("hello");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }).start());
    
            System.out.println("子线程执行完毕");
    
            try {
                countDownLatch.await(1000,TimeUnit.MICROSECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("主线程执行完毕");
        }
    }
    

    程序执行结果

    子线程执行完毕
    thread0
    thread1
    thread2
    主线程执行完毕
    hello
    hello
    hello
    

    3.对CountDownLatch类中方法的解析

    CountDownLatch中包含如下的方法
    在这里插入图片描述
    下面对其中几个重要的方法进行解析。

    构造方法

    分析可得CountDownLatch只有一个有参构造方法

        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    

    构造方法中,

    • 第一步对传入的值进行检验,(因为这是一个减的计算器,所以count必须大于0)
    • 二:声明了一个CountDownLatch类的的私有类Sync
     private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
    
        Sync(int count) {
            setState(count);
        }
    }
    

    可知道Sync类是一个AQS实现类。所以CountDownLatch的底层是基于AQS实现的

        protected final void setState(int newState) {
            state = newState;
        }
    

    其最终执行到setState方法设置了计数器的值。

    await方法

    public void await() throws InterruptedException {
    //执行了抽象类(准确的说是sync类继承但没有重写的方法)中的方法
    //至于传入的参数1,在此处没有实际意义,正式在AQS中规定这样传参
       sync.acquireSharedInterruptibly(1);
    }
    
    //抽象类AbstractQueuedSynchronizer的方法
    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            //判断线程是否被打断
            if (Thread.interrupted())
                throw new InterruptedException();
            //
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    //抽象类AbstractQueuedSynchronizer的方法
    //这个方法没有具体实现,所以应该在其实现类中找其重写的方法
    protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
    //CountDownLatch内部类Sync的方法
    protected int tryAcquireShared(int acquires) {
    //这是判断在CountDownLatch构造方法传入的计数器是否为0.参数被没有被使用。
    //如果返回负数acquireSharedInterruptibly方法会进入doAcquireSharedInterruptibly
    //等待计数器的值变为0,如果返回正数则acquireSharedInterruptibly通过执行
    //即CountDownLatch中计数器值为0,可以唤醒主线程执行
                return (getState() == 0) ? 1 : -1;
            }
    

    await方法会在CountDownLatch中的计数器不为0之前一直等待,直到计数器为0,才会唤醒执行await方法的线程,当然线程也有可能被打的而唤醒,然后抛出InterruptedException 异常。

    所以await方法的逻辑很简单,就是判断CountDownLatch中计数器的值算法为0,

    • 不为0就把线程放入等待队列
    • 为0,则线程继续然后执行

    await方法的变种方法await(long timeout, TimeUnit unit)

    public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return tryAcquireShared(arg) >= 0 ||
                doAcquireSharedNanos(arg, nanosTimeout);
        }
    
    protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    

    和await方法的逻辑类似,只是在*doAcquireSharedNanos(arg, nanosTimeout);*方法中做了超时唤醒的处理。这个方法是由AQS具体实现,故在分析AQS代码时在具体分析。

    即如果到达超时时间,CountDowmLatch中的计数器还没有归0,执行await的方法也照常唤醒

    countDown()方法

    public void countDown() {
            sync.releaseShared(1);
        }
    //抽象类AbstractQueuedSynchronizer的方法
    public final boolean releaseShared(int arg) {
    //判断tryReleaseShared返回,如果为真则通过doReleaseShared唤醒所以等待的线程且返回true
    //如果为假返回false不做任何处理
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    //CountDownLatch内部类Sync的方法
    protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    //判断计数器(State)是否为0.因为如果计数器为0就不做等待操作
                    //直接返回,且如果为0,说明之前必定有一个线程执行了
                    //tryReleaseShared方法对计数器减一成功,故返回false
                    if (c == 0)
                        return false;
                    //通过CAS把c的值减一,再判断计数器是否为0,如果为0则返回true,唤醒
                    //所以等待线程。负责返回false不做任何处理
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
    

    该方法的作用是对计数器进行减一,如果计数器的值变为0,则唤醒所有正在等待的线程。

    执行该方法后CountDownLatch中计数器的值最终只可能>=0。

    CountDownLatch类中其他方法没有分析的价值,故这里不做分析。

  • 相关阅读:
    java字节中的基本类型的职业的数目 (采访总是问)
    hdu 1814 Peaceful Commission (2-sat 输出字典序最小的路径)
    Ubuntu Server 14.04 LTS(64bit)已安装 weblogic Server 12c(12.1.3) Zip Distribution
    Tyvj P1015 公路骑 (DP)
    编程算法
    POJ 2502 Subway (Dijkstra 最短+建设规划)
    android_Activity生命周期功能
    ftk学习记录(脚本文章)
    2013年周二
    2013年第32周星期1
  • 原文地址:https://www.cnblogs.com/wf614/p/13168591.html
Copyright © 2011-2022 走看看