zoukankan      html  css  js  c++  java
  • Java并发(十四):并发工具类——CountDownLatch

    先做总结:

    1、CountDownLatch 是什么?

    CountDownLatch 允许一个或多个线程等待其他线程(不一定是线程,某个操作)完成之后再执行。

    CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。

    当我们调用一次CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await会阻塞当前线程,直到N变成零。

    由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。

    2、实现原理:

    (1)CountDownLatch 的sync属性,继承自AQS

    (2)CountDownLatch countDownLatch = new CountDownLatch(5);时将countDownLatch.sync.state设置为5

    (3)countDownLatch.await(),检查sync.state!=0时,当前线程park(),放入sync的阻塞队列

    (4)countDownLatch.countDown(),sync.state - 1,如果发现sync.state==0了,唤醒sync阻塞队列中的线程

    3、CountDownLatch 与 CyclicBarrier区别:

    (1)CountDownLatch的计数器只能使用一次,而CyclicBarrier可以重复使用(可以重置)。

    (2)CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。

    (3)CyclicBarrier针对的是线程,而CountDownLatch针对的是操作(只要调用countDownLatch.countDown()可以)。

    一、应用举例

    // 老板进入会议室等待5个人全部到达会议室才会开会。所以这里有两个线程老板等待开会线程、员工到达会议室:
    class CountDownLatchTest {
        private static CountDownLatch countDownLatch = new CountDownLatch(5);
    
        // Boss线程,等待员工到达开会
        static class BossThread extends Thread {
            @Override
            public void run() {
                System.out.println("Boss在会议室等待,总共有" + countDownLatch.getCount() + "个人开会...");
                try {
                    countDownLatch.await(); // Boss等待
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                System.out.println("所有人都已经到齐了,开会吧...");
            }
        }
    
        // 员工到达会议室
        static class EmpleoyeeThread extends Thread {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + ",到达会议室...."); // 员工到达会议室 count - 1
                countDownLatch.countDown();
            }
        }
    
        public static void main(String[] args) {
            // Boss线程启动
            new BossThread().start();
    
            // 员工到达会议室
            for (int i = 0; i < countDownLatch.getCount(); i++) {
                new EmpleoyeeThread().start();
            }
        }
    }

    二、类结构

    public class CountDownLatch {
        private final Sync sync; //
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
            
            Sync(int count) {
                setState(count);
            }
            
            int getCount() {
                return getState();
            }
            
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
            
            protected boolean tryReleaseShared(int releases) {
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    }

    三、原理解析

    CountDownLatch countDownLatch = new CountDownLatch(5);

        public CountDownLatch(int count) {
            if (count < 0)
                throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    
        /**
         * CountDownLatch.Sync.Sync(int)
         * AQS的state用作count计数
         */
        Sync(int count) {
            setState(count);
        }

    countDownLatch.await();

        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
        
        /**
         * AbstractQueuedSynchronizer.acquireSharedInterruptibly(int)
         * 尝试获取锁,获取不到锁,当前进入同步队列并挂起
         */
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0) // 尝试获取锁
                doAcquireSharedInterruptibly(arg); // 获取不到锁,当前进入同步队列并挂起
        }
        
        /**
         * CountDownLatch.Sync.tryAcquireShared(int)
         * state/count没有减到0之前不予许拿锁
         */
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

    countDownLatch.countDown();

        public void countDown() {
            sync.releaseShared(1);
        }
        
        /**
         * AbstractQueuedSynchronizer.releaseShared(int)
         */
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) { // 尝试释放锁
                doReleaseShared(); // 释放掉锁之后,唤醒同步队列的线程(调用await()的线程)
                return true;
            }
            return false;
        }
        
        /**
         * CountDownLatch.Sync.tryReleaseShared(int)
         * countDown一次count/state减一
         * 直到count/state减到0,return true,允许释放同步队列里的线程
         */
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

    四、CyclicBarrier和CountDownLatch的区别

    • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
    • CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。

    并发工具类(一)等待多线程完成的CountDownLatch

    【死磕Java并发】—–J.U.C之并发工具类:CountDownLatch

  • 相关阅读:
    【Elasticsearch 技术分享】—— ES 常用名词及结构
    【Elasticsearch 技术分享】—— Elasticsearch ?倒排索引?这都是什么?
    除了读写锁,JUC 下面还有个 StampedLock!还不过来了解一下么?
    小伙伴想写个 IDEA 插件么?这些 API 了解一下!
    部署Microsoft.ReportViewe
    关于TFS强制undo他人check out
    几段查看数据库表占用硬盘空间的tsql
    How to perform validation on sumbit only
    TFS 2012 Disable Multiple Check-out
    在Chrome Console中加载jQuery
  • 原文地址:https://www.cnblogs.com/hexinwei1/p/9983569.html
Copyright © 2011-2022 走看看