zoukankan      html  css  js  c++  java
  • 最常用的CountDownLatch, CyclicBarrier你知道多少? (Java工程师必会)

    CountdownLatch,CyclicBarrier是非常常用并发工具类,可以说是Java工程师必会技能了。不但在项目实战中经常涉及,而且在编写压测程序,多线程demo也是必不可少,所以掌握它们的用法和实现原理非常有必要。

    念念不忘,必有回响!
    点赞走一走,找到女朋友~

    等待多线程完成的CountDownLatch

    CountDownLatch允许一个或多个线程等待其他线程完成操作。也就是说通过使用CountDownLatch工具类,可以让一组线程等待彼此执行完毕后在共同执行下一个操作。具体流程如下图所示,箭头表示任务,矩形表示栅栏,当三个任务都到达栅栏时,栅栏后wait的任务才开始执行。

    CountDownLatch维护有个int型的状态码,每次调用countDown时状态值就会减1;调用wait方法的线程会阻塞,直到状态码为0时才会继续执行。

    在多线程协同工作时,可能需要等待其他线程执行完毕之后,主线程才接着往下执行。首先我们可能会想到使用线程的join方法(调用join方法的线程优先执行,该线程执行完毕后才会执行其他线程),显然这是可以完成的。

    使用Thread.join()方法实现

    public class RunningRaceTest {
        public static void main(String[] args) throws InterruptedException {
            Thread runner1 = new Thread(new Runner(), "1号");
            Thread runner2 = new Thread(new Runner(), "2号");
            Thread runner3 = new Thread(new Runner(), "3号");
            Thread runner4 = new Thread(new Runner(), "4号");
            Thread runner5 = new Thread(new Runner(), "5号");
            runner1.start();
            runner2.start();
            runner3.start();
            runner4.start();
            runner5.start();
    
            runner1.join();
            runner2.join();
            runner3.join();
            runner4.join();
            runner5.join();
    
            // 裁判等待5名选手准备完毕
            System.out.println("裁判:比赛开始~~");
        }
    }
    
    class Runner implements Runnable {
        @Override
        public void run() {
            try {
                int sleepMills = ThreadLocalRandom.current().nextInt(1000);
                Thread.sleep(sleepMills);
                System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    Thread.join()完全可以实现这个需求,不过存在一个问题,如果调用join的线程一直存活,则当前线程则需要一直等待。这显然不够灵活,并且当前线程可能会出现死等的情况。

    更加灵活的CountDownLatch

    jdk1.5之后的并发包中提供了CountDownLatch并发工具了,也可以实现join的功能,并且功能更加强大。

    // 参赛选手线程
    class Runner implements Runnable {
        private CountDownLatch countdownLatch;
        
        public Runner(CountDownLatch countdownLatch) {
            this.countdownLatch = countdownLatch;
        }
    
        @Override
        public void run() {
            try {
                int sleepMills = ThreadLocalRandom.current().nextInt(1000);
                Thread.sleep(sleepMills);
                System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 准备完毕,举手示意
                countdownLatch.countDown();
            }
        }
    }
    
    public class RunningRaceTest {
        public static void main(String[] args) throws InterruptedException {
            // 使用线程池的正确姿势
            int size = 5;
            AtomicInteger counter = new AtomicInteger();
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(size, size, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), (r) -> new Thread(r, counter.addAndGet(1) + " 号 "), new ThreadPoolExecutor.AbortPolicy());
            
            CountDownLatch countDownLatch = new CountDownLatch(5);
            for (int i = 0; i < size; i++) {
                threadPoolExecutor.submit(new Runner(countDownLatch));
            }
    
            // 裁判等待5名选手准备完毕
            countDownLatch.await(); // 为了避免死等,也可以添加超时时间
            System.out.println("裁判:比赛开始~~");
    
            threadPoolExecutor.shutdownNow();
        }
    }
    

    输出结果:

    5 号  选手已就位, 准备共用时: 20ms
    4 号  选手已就位, 准备共用时: 156ms
    1 号  选手已就位, 准备共用时: 288ms
    2 号  选手已就位, 准备共用时: 519ms
    3 号  选手已就位, 准备共用时: 945ms
    比赛开始~~
    

    同步屏障CyclicBarrier

    CyclicBarrier可以实现CountDownLatch一样的功能,不同的是CountDownLatch属于一次性对象,声明后只能使用一次,而CyclicBarrier可以循环使用

    从字面意义上来看,CyclicBarrier表示循环的屏障,当一组线程全部都到达屏障时,屏障才会被移除,否则只能阻塞在屏障处。

    public class RunningRace {
        public static void main(String[] args) {
            // 使用线程池的正确姿势
            int size = 5;
            AtomicInteger counter = new AtomicInteger();
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(size, size, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), (r) -> new Thread(r, counter.addAndGet(1) + " 号 "), new ThreadPoolExecutor.AbortPolicy());
    
            CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("裁判:比赛开始~~"));
            for (int i = 0; i < 10; i++) {
                threadPoolExecutor.submit(new Runner(cyclicBarrier));
            }
        }
    }
    
    class Runner implements Runnable {
        private CyclicBarrier cyclicBarrier;
    
        public Runner(CyclicBarrier countdownLatch) {
            this.cyclicBarrier = countdownLatch;
        }
    
        @Override
        public void run() {
            try {
                int sleepMills = ThreadLocalRandom.current().nextInt(1000);
                Thread.sleep(sleepMills);
                System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
    

    由于CyclicBarrier可以循环使用,所以CyclicBarrier的构造方法中可以传入一个Runnable参数,在每一轮执行完毕之后就会立刻执行这个Runnable任务

    CountDownLatch设计与实现

    CountDownLath是基于AQS框架的一种简单实现,有两个核心的方法,即await()和countDown(),通过构造方法传入一个状态值,调用await()方法时线程会阻塞,直到状态码被修改成0时才会返回,每次调用countDown()时会将状态值减1。

    wait方法:执行wait方法后,会尝试获取同步状态,如果为状态为0则方法继续执行,否择当前线程会被加入到同步队列中,详情可见笔者关于AQS的两篇文章。

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 如果状态码不为0,尝试获取同步状态,如果失败则被加入到同步队列中
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    // 当状态码为0时返回1,否择返回-1,这个方法中参数没有任何用处
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    
    

    countDown方法:每次执行countDown方法时,会将状态码的值减1.

    public void countDown() {
        sync.releaseShared(1);
    }
    

    CyclicBarrier的设计与实现

    CyclicBarrier与CountDownLatch实现思想相同,也是基于AQS框架实现。不同的是CyclicBarrier内部维护一个状态值借助基于AQS实现的锁ReentrantLock来实现状态值的同步更新,以及AQS除了同步状态之外的另一个核心概念条件队列来完成线程的阻塞。

    parties: 和CountdownLatch中的状态值一样,用来记录每次要相互等待的线程数量,只有parties个线程同时到达屏障时,才会唤醒阻塞的线程。

    count临时计数器: 由于CyclicBarrier是可以循环使用的,count可以理解为是一个临时变量,每一轮执行完毕或者被打断都会重置count为parties值。

    Generation内部类: 只有一个属性 broken表示当前这一轮执行是否被中断,如果被中断后其他线程再执行await方法会抛出异常(目的是停止本轮线程未执行线程的继续执行)。

    await方法: 当执行await方法时,会同步得对内部的count执行--count操作, 如果count = 0,则执行barrierCommand任务(通过构造方法传来的Runnable参数)。

    reset方法:中断本轮执行,重置count值,唤醒等待的线程然后开始下一轮,此时本轮正在执行的线程调用await方法会抛出异常。

    // await方法实际执行的代码
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
    TimeoutException {
        final ReentrantLock lock = this.lock;
        // 加锁,保证并发操作的一致性
        lock.lock();
        try {
            // 如果当前这一轮操作被中断,抛出中断异常(该异常只是起警示作用,没有任何其他信息)
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
    	    // 本轮执行的计数器 数值-1
            int index = --count;
            if (index == 0) {  // 计数器值=1, 本轮线程全部到达屏障,执行barrierCommand任务
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();// 唤醒所有等待在条件队列上的任务
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
    
            // 如果状态不等于0,循环等待直到计数器值为0,本轮执行被打破,线程被中断,或者等待超时
            for (;;) {
                try {
                    if (!timed)
                        // 状态码不为0,将当前线程加入到条件队列中,进入阻塞状态
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (g != generation)
                    return index;
    
                if (timed && nanos <= 0L) {
                    breakBarrier();// 唤醒所有条件队列中的线程,重置count的值
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    

    重置栅栏的状态

    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
    

    当一轮执行完毕之后,既count=0后,CyclicBarrier的临时状态会重置为parties

    /**
     * 进入下一轮
     * 唤醒所有等待线程,充值count
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }
    

    总结

    1. CountDownLatch创建后只能使用一次,而CyclicBarrier可以循环使用,并且CyclicBarrier功能更完善。
    2. CountDownLatch内部的状态是基于AQS中的状态信息,而CyclicBarrier中的状态值是单独维护的,使用ReentrantLock加锁保证并发修改状态值的数据一致性。
    3. 它们的使用场景:允许一个或多个线程等待其他线程完成操作, 即当指定数量线程执行完某个操作再继续执行下一个操作。

  • 相关阅读:
    洛谷P3953 逛公园
    洛谷P1247 取火柴游戏
    洛谷P2024 食物链
    洛谷P2680 运输计划
    hdu 1495 非常可乐(bfs)
    poj3984 迷宫问题(简单的输出路径的bfs)
    Codeforces 729C Road to Cinema(二分)
    Codeforces Technocup 2017
    Codeforces Technocup 2017
    poj 2251 Dungeon Master(bfs)
  • 原文地址:https://www.cnblogs.com/liqiangchn/p/12105295.html
Copyright © 2011-2022 走看看