zoukankan      html  css  js  c++  java
  • Java CountDownLatch

    概述


    1.CountDownLath介绍

    2.CountDownLatch源码分析

    3.CountDownLatch示例

    CountDownLath介绍

    CountDownLatch是闭锁的一种实现,它允许一个或多个线程等待某一事件发生。CountDownLatch有一个正数计数器,countdown方法对计数器做减法操作,await方法等待计数器到底0则开始执行。所有await()方法都会等待计数器变为0;

    CountDownLatch的计数器是不可重置的,也就是说一次性的;

    应用场景,比如多个线程在跑数据前需要做一些准备工作,准备工作做好后,再开始执行。所有线程执行完毕后,再继续执行主线程,统计一下使用了多长时间。

    与CyclicBarrier的比较放在CyclicBarrier章节介绍。

    CountDownLatch源码分析

        public CountDownLatch(int count) {  //构造函数,初始化count值
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    Sync(int count) {
                setState(count);
            }

    await()方法其实调用的是acquireSharedInterruptibly

    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)  //尝试获取共享锁,失败则执行下面方法
                doAcquireSharedInterruptibly(arg); 
        }
    protected int tryAcquireShared(int acquires) {  //获取共享锁,失败则返回-1
                return (getState() == 0) ? 1 : -1;
            }
    private void doAcquireSharedInterruptibly(int arg)  //这里面的很多方法在AQS锁章节详细介绍过
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);  //共享锁添加到AQS队列
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();  //自旋获取锁,如果非队头,则一直等待
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&  //判断是否需要阻塞线程
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)  
                    cancelAcquire(node);
            }
        }

    countdown()方法

        public void countDown() {
            sync.releaseShared(1);  //这次的参数是1,也就是说每次减1
        }
    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {  //尝试释放共享锁,如果失败则调用doReleaseShared释放共享锁
                doReleaseShared();
                return true;
            }
            return false;
        }

    protected boolean tryReleaseShared(int releases) {  //每次减1
    // Decrement count; signal when transition to zero
    for (;;) {
    int c = getState(); 获取当前状态值
    if (c == 0) //如果为0则返回false
    return false;
    int nextc = c-1; //否则通过CAS将count减1
    if (compareAndSetState(c, nextc))
    return nextc == 0;
    }
    }
     
     private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {  //判断status为SIGNAL
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);  //如果CAS设置status成功,则释放共享锁
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }

    CountDownLatch总结:

    CountDownLatch通过共享锁来实现。使用时会初始化一个int类型的计数器,当某个线程调用CountDownLatch的await()方法时,会等待共享锁可用时,才会获取共享锁继续运行,而共享锁的可用的条件是count值为0. 每个线程调用countdown()方法时,会将count减1,直到count为0时,await()等待的线程才会继续运行。

    CountDownLatch示例

    示例1,主线程等待5个子线程sleep 1s后,再继续执行主线程后面的代码。

    public class CountDownLatchTest1 extends Thread{
    
        public static CountDownLatch countDownLatch=new CountDownLatch(5);
        public static void main(String[] args){
    
            for(int i=0;i<5;i++){
    
                new CountDownLatchTest1().start();
            }
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Finished");
    
        }
    
        public void run(){
    
            try {
    
                System.out.println(Thread.currentThread().getName()+" waiting");
                Thread.sleep(2000);
    
            } catch (Exception e) {
    
                e.printStackTrace();
            }finally {
    
                countDownLatch.countDown();
    
            }
    
        }
    }

    输出结果为:五个子线程均执行完后,再继续支持主线程的print语句。

    Thread-0 waiting
    Thread-2 waiting
    Thread-1 waiting
    Thread-3 waiting
    Thread-4 waiting
    Finished

    示例2,子线程执行前先等待准备工作,再同时执行,执行完毕后,主线程再继续执行

    public class CountDownLatchTest1 extends Thread{
    
        public static CountDownLatch countDownLatchStart=new CountDownLatch(1);  //增加了一个start latch
        public static CountDownLatch countDownLatchEnd=new CountDownLatch(5);
    
        public static void main(String[] args){
    
            for(int i=0;i<5;i++){
    
                new CountDownLatchTest1().start();
            }
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Ready go...");
            countDownLatchStart.countDown();
            try {
                countDownLatchEnd.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Finished");
    
        }
    
        public void run(){
    
            try {
    
                countDownLatchStart.await();
                System.out.println(Thread.currentThread().getName()+" waiting");
                Thread.sleep(1000);
    
            } catch (Exception e) {
    
                e.printStackTrace();
            }finally {
    
                countDownLatchEnd.countDown();
    
            }
    
        }
    }

    输出结果为:

    Ready go...
    Thread-1 waiting
    Thread-3 waiting
    Thread-2 waiting
    Thread-0 waiting
    Thread-4 waiting
    Finished

  • 相关阅读:
    求子数组最大和
    layout_weight layout_width = 0dp
    一些日历的实现
    只显示年月日的日历
    每日学习之0512
    git 出现The current branch is not configured for pull No value for key branch.master.merge found in configuration错误的解决办法
    git的配置
    使用Spring security框架实现登陆页面时跳转到favicon.ico问题
    播放视频(c#)
    太阳沉落了
  • 原文地址:https://www.cnblogs.com/dpains/p/7521992.html
Copyright © 2011-2022 走看看