zoukankan      html  css  js  c++  java
  • CountDownLatch原理分析

    本文承接上一篇文章AQS-共享模式分析

    概述

      CountDownLatch是一个同步计数器,他允许一个或者多个线程在另外一组线程执行完成之前一直等待,基于AQS共享模式实现的,下面就先举一个简单例子,从例子入手分析CountDownLatch的原理。

    例子

    public class CountDownLatchTest {
    
        public static void main(String[] args) {
            final CountDownLatch latch = new CountDownLatch(2);
            System.out.println("主线程开始执行…… ……");
            //第一个子线程执行
            ExecutorService es1 = Executors.newSingleThreadExecutor();
            es1.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                        System.out.println("子线程:"+Thread.currentThread().getName()+"执行");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    latch.countDown();
                }
            });
            es1.shutdown();
    
            //第二个子线程执行
            ExecutorService es2 = Executors.newSingleThreadExecutor();
            es2.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("子线程:"+Thread.currentThread().getName()+"执行");
                    latch.countDown();
                }
            });
            es2.shutdown();
            System.out.println("等待两个线程执行完毕…… ……");
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("两个子线程都执行完毕,继续执行主线程");
        }
    }

    上面例子中,首先new CountDownLatch,给了同步计数器一个计数值,然后新建了两个线程,每当一个线程执行完之前调用一下countDown()方法,将计数减1,在主线程中执行了await方法,该方法在计数器值大于0之前一直等待,直到计数器为0,结束等待,下面就分析一下CountDownLatch原理。

    CountDownLatch类结构

        

     从图中可以看出CountDownLatch是基于Sync抽象类实现的,而Sync继承AQS,使用的是AQS共享模式。

    构造方法如下:

    //需要传入计数器的大小   
     public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }

    这里需要注意,设置state的数量只有在初始化CountDownLatch的时候,如果该state被减成了0,就无法继续使用这个CountDownLatch了,需要重新new一个,这就是这个类不可重用的原因,有另一个类也实现了类似的功能,但是可以重用,就是CyclicBarrier,后面会介绍。

    Sync类分析

    private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
         //初始化,设置资源个数
            Sync(int count) {
                setState(count);
            }
             //获取共享资源个数
            int getCount() {
                return getState();
            } 
            //尝试获取共享锁,只有当共享资源个数为0的时候,才会返回1,否则为-1
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
            //释放共享资源,通过CAS每次对state减1
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }

    CountDownLatch主要方法分析

    await()方法

        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }

    进入AbstractQueuedSynchronizer #acquireSharedInterruptibly()方法.

     public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            //等待过程不可中断
            if (Thread.interrupted())
                throw new InterruptedException();
            //这里的tryAcquireShared在AbstractQueuedSynchronizer中没有实现,在上面介绍的Sync中实现的
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }

    在上面介绍Sync类的时候#tryAcquireShared(),当AQS的state = 0的时候才会返回1,否则一直返回-1,如果返回-1,要执行#doAcquireSharedInterruptibly(),进入该方法

    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            //这里就把主线程加入队列,队列中有两个节点,第一个是虚拟节点,第二个就是主线程节点
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    //总共只有两个节点,主线程前一个就是首节点
                    final Node p = node.predecessor();
                    if (p == head) {
                        //这里又执行到CountDownLatch中Sync类中实现的方法,判断state是否为0
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    //如果state不为0,这里会把主线程挂起阻塞
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    这里使用AQS很神奇,在阻塞队列中就只加入了一个主线程,但是呢,只要其他线程没有执行完,那state就不为0,那主线程就在这里阻塞着,那问题了,谁来唤醒这个主线程呢?就是下面要介绍的方法。

    countDown()方法

       public void countDown() {
            sync.releaseShared(1);
        }

    进入AbstractQueuedSynchronizer #releaseShared方法

        public final boolean releaseShared(int arg) {
            //该方法同样在AbstractQueuedSynchronizer中没有实现,在CountDownLatch中实现
            if (tryReleaseShared(arg)) {
                //唤醒主线程
                doReleaseShared();
                return true;
            }
            return false;
        }

    在分析Sync类的时候,介绍了tryReleaseShared(),该方法会把AQS的state减1,如果减1操作成功,执行唤醒主线程操作,进入AbstractQueuedSynchronizer#tryReleaseShared()方法

     private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    //首节点状态为SIGNAL = -1
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                   
                        //唤醒主线程,也就是队列中的第二个节点,如果线程没有执行完成,主线程被唤醒之后,发现state依然不为零,会再次阻塞
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }

    await(long timeout, TimeUnit unit)方法

    该方法就是指定等待时间,如果在规定的等待时间中没有完成,就直接返回false,在主线程中可以根据这个状态进行后续的处理。

    总结

    CountDownLatch这个类对AQS的使用很神奇,像之前介绍的ReentrantLock和Semaphore都会在阻塞队列中放入很多的线程,而CountDownLatch就只在队列中放入一个主线程,然后不停的唤醒,唤醒之后发现state还是不为0,就继续等待。每个子线程执行完都会对state进行减1操作,当所有子线程都执行完了,那state也就为0,这时候主线程被唤醒之后才可以继续执行。而这也正是CountDownLatch不可重用的原因,如果想要重用,需要重新new一个,因为只有在new的时候才可以设置资源的数量。

          

    参考文章:

    countDownLatch

  • 相关阅读:
    10.2 处理大集合
    观察者模式——出版者与订阅者
    phonegap(cordova) 自己定义插件代码篇(五)----android ,iOS 集成微信登陆
    另一鲜为人知的单例写法-ThreadLocal
    Spring MVC中Controller如何将数据返回给页面
    IntelliJ IDEA安装主题详细步骤
    oracle导出dmp文件的2种方法
    Oracle导出表(即DMP文件)的两种方法
    Spring MVC 实现文件的上传和下载
    压力测试 JMeter3.3
  • 原文地址:https://www.cnblogs.com/gunduzi/p/13616806.html
Copyright © 2011-2022 走看看