zoukankan      html  css  js  c++  java
  • CountDownLatch与CyclicBarrier的基本使用

    1 概述

    CountDownLatch以及CyclicBarrier都是Java里面的同步工具之一,本文介绍了两者的基本原理以及基本使用方法。

    2 CountDownLatch

    CountDownLatch是一个同步工具类,常见的使用场景包括:

    • 允许一个或多个线程等待一系列的其他线程结束
    • 在串行化任务中需要进行并行化处理,并等待所有并行化任务结束,串行化任务才能继续进行

    比如考虑这样一个场景,在一个电商网站中,用户点击了首页,需要一部分的商品,同时显示它们的价格,那么,调用的流程应该是:

    • 获取商品
    • 计算售价
    • 返回所有商品的最终售价

    解决这样的问题可以使用串行化或并行化操作,串行化就是逐一计算商品的售价,并返回,并行化就是获取商品后,并行计算每一个商品的售价,最后返回,显然后一种方案要比前一种要好,那么这时候就可以用上CountDownLatch了。

    一份简单的模拟代码如下:

    import java.util.List;
    import java.util.concurrent.*;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    import static java.util.concurrent.ThreadLocalRandom.current;
    
    public class CountDownLatchExample {
        public static void main(String[] args) throws InterruptedException{
            List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList());
            //计数器大小为商品列表的长度
            final CountDownLatch latch = new CountDownLatch(list.size());
            //线程池
            ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
            list.forEach(p-> executor.execute(()->{
                System.out.println("Product "+p.id+" start calculate price ");
                try{
                	//随机休眠模拟业务操作耗时
                    TimeUnit.SECONDS.sleep(current().nextInt(10));
                    p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
                    System.out.println("Product "+p.id+" calculate price completed");
                }catch (InterruptedException e){
                    e.printStackTrace();
                }finally {
                	//每完成计算一个商品,将计数器减1,注意需要放在finally中
                    latch.countDown();
                }
            }));
            //主线程阻塞直到所有的计数器为0,也就是等待所有的子任务计算价格完毕
            latch.await();
            System.out.println("All of prices calculate finished");
            //手动终止,不然不会结束运行
            executor.shutdown();
        }
    
        private static class Price{
            private final int id;
            private double price;
    
            public Price(int id) {
                this.id = id;
            }
    
            public int getId() {
                return id;
            }
    
            public double getPrice() {
                return price;
            }
    
            public void setPrice(double price) {
                this.price = price;
            }
        }
    }
    

    输出:

    在这里插入图片描述

    代码比较简单,关键地方用上了注释,可以看到代码执行顺序如下:

    • 创建多个任务计算商品的价格
    • 主线程阻塞
    • 计算完成后,将计数器减1
    • 当计数器为0时,主线程退出阻塞状态

    值得注意的是计数器减1的操作需要放在finally中,因为有可能会出现异常,如果出现异常导致计数器不能减少,那么主线程会一直阻塞。

    另外,CountDownLatch还有一个await(long timeout,TimeUnit unit)方法,是带有超时参数的,也就是说,如果在超时时间内,计数器的值还是大于0(还有任务没执行完成),会使得当前线程退出阻塞状态。

    3 CyclicBarrier

    CyclicBarrierCountDownLatch有很多类似的地方,也是一个同步工具类,允许多个线程在执行完相应的操作之后彼此等待到达同一个barrier point(屏障点)。CyclicBarrier也适合某个串行化的任务被拆分为多个并行化任务,这点与CountDownLatch类似,但是CyclicBarrier具备的一个更强大的功能是,CyclicBarrier可以被重复使用。

    3.1 等待完成

    先简单说一下CyclicBarrier的实现原理:

    • 初始化CyclicBarrier,传入一个int参数,表示分片(parites),通常意义上来说分片数就是任务的数量
    • 同时串行化执行多个任务
    • 任务执行完成后,调用await(),等待其他线程也到达barrier point
    • 当所有线程到达后,继续以串行化方式运行任务

    常见的使用方法是设置分片数为任务数+1,这样,可以在主线程中执行await(),等待所有子任务完成。比如下面是使用CyclicBarrier实现同样功能的模拟代码:

    import java.util.List;
    import java.util.concurrent.*;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    import static java.util.concurrent.ThreadLocalRandom.current;
    
    public class CountDownLatchExample {
        public static void main(String[] args) throws InterruptedException,BrokenBarrierException{
            List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList());
            final CyclicBarrier barrier = new CyclicBarrier(11);
            ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
            list.forEach(p-> executor.execute(()->{
                System.out.println("Product "+p.id+" start calculate price ");
                try{
                    TimeUnit.SECONDS.sleep(current().nextInt(10));
                    p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
                    System.out.println("Product "+p.id+" calculate price completed");
                }catch (InterruptedException e){
                    e.printStackTrace();
                }finally {
                    try{
                        barrier.await();
                    }catch (InterruptedException | BrokenBarrierException e){
                        e.printStackTrace();
                    }
                }
            }));
            barrier.await();
            System.out.println("All of prices calculate finished");
            executor.shutdown();
        }
    
        private static class Price{
            private final int id;
            private double price;
    
            public Price(int id) {
                this.id = id;
            }
    
            public int getId() {
                return id;
            }
    
            public double getPrice() {
                return price;
            }
    
            public void setPrice(double price) {
                this.price = price;
            }
        }
    }
    

    输出相同,代码大部分相似,不同的地方有:

    • latch.countDown()替换成了barrier.await()
    • latch.await()替换成了barrier.await()
    • 线程池的核心线程数替换成了10

    await()方法会等待所有的线程到达barrier point,上面代码执行流程简述如下:

    • 初始化CyclicBarrier,分片数为11(子线程数+1)
    • 主线程调用await(),等待子线程执行完成
    • 子线程各自进行商品价格的计算,计算完成后,调用await(),等待其他线程也到达barrier point
    • 当所有子线程计算完成后,由于没有后续操作,所以子线程运行结束,同时由于主线程还有后续操作,会先输出提示信息再终止线程池

    注意一个很大的不同就是这里的线程池核心线程数目改成了 10,那么,为什么需要10?

    因为如果是设置一个小于10的核心线程个数,由于线程池是会先创建核心线程来执行任务,核心线程满了之后,放进任务队列中,而假设只有5个核心线程,那么:

    • 5个线程进行计算价格
    • 另外5个任务放在任务队列中

    这样的话,会出现死锁,因为计算中的线程需要队列中的任务到达barrier point才能结束,而队列中的任务需要核心线程计算完毕后,才能调度出来计算,这样死锁就出现了。

    3.2 重复使用

    CyclicBarrierCountDownLatch的一个最大不同是,CyclicBarrier可以被重复使用,原理上来说,await()会将内部计数器减1,当计数器减为0时,会自动进行计数器(分片数)重置。比如,在上面的代码中,由于遇上促销活动,需要对商品的价格再次进行计算:

    import java.util.List;
    import java.util.concurrent.*;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    import static java.util.concurrent.ThreadLocalRandom.current;
    
    public class CountDownLatchExample {
        public static void main(String[] args) throws InterruptedException,BrokenBarrierException{
            List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList());
            final CyclicBarrier barrier = new CyclicBarrier(11);
            ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
            list.forEach(p-> executor.execute(()->{
                System.out.println("Product "+p.id+" start calculate price.");
                try{
                    TimeUnit.SECONDS.sleep(current().nextInt(10));
                    p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
                    System.out.println("Product "+p.id+" calculate price completed.");
                }catch (InterruptedException e){
                    e.printStackTrace();
                }finally {
                    try{
                        barrier.await();
                    }catch (InterruptedException | BrokenBarrierException e){
                        e.printStackTrace();
                    }
                }
            }));
            barrier.await();
            System.out.println("All of prices calculate finished.");
    		
    		//复制的一段相同代码
            list.forEach(p-> executor.execute(()->{
                System.out.println("Product "+p.id+" start calculate price again.");
                try{
                    TimeUnit.SECONDS.sleep(current().nextInt(10));
                    p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
                    System.out.println("Product "+p.id+" calculate price completed.");
                }catch (InterruptedException e){
                    e.printStackTrace();
                }finally {
                    try{
                        barrier.await();
                    }catch (InterruptedException | BrokenBarrierException e){
                        e.printStackTrace();
                    }
                }
            }));
            barrier.await();
            System.out.println("All of prices calculate finished again.");
            executor.shutdown();
        }
    
        private static class Price{
            private final int id;
            private double price;
    
            public Price(int id) {
                this.id = id;
            }
    
            public int getId() {
                return id;
            }
    
            public double getPrice() {
                return price;
            }
    
            public void setPrice(double price) {
                this.price = price;
            }
        }
    }
    

    将计算价格的代码复制一遍,其中没有手动修改计数器,只是调用await(),输出如下:

    在这里插入图片描述

    可以看到,并没有对CycliBarrier进行类似reset之类的操作,但是依然能按正常逻辑运行,这是因为await()内部会维护一个计数器,当计数器为0的时候,会自动进行重置,下面是await()OpenJDK 11下的源码:

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return this.dowait(false, 0L);
        } catch (TimeoutException var2) {
            throw new Error(var2);
        }
    }
        
    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
        ReentrantLock lock = this.lock;
        lock.lock();
    
        byte var9;
        try {
            //...
            int index = --this.count;
            if (index != 0) {
                //计数器不为0的情况
                //....
            }
    
            boolean ranAction = false;
    
            try {
                Runnable command = this.barrierCommand;
                if (command != null) {
                    command.run();
                }
    
                ranAction = true;
                
                this.nextGeneration();
                var9 = 0;
            } finally {
                if (!ranAction) {
                    this.breakBarrier();
                }
    
            }
        } finally {
            lock.unlock();
        }
    
        return var9;
    }
    
    private void nextGeneration() {
        this.trip.signalAll();
        this.count = this.parties;
        this.generation = new CyclicBarrier.Generation();
    }
    

    当计数器为0时,会生成新的Generation,并将var9置为0,最后返回var9(在这个方法中var9只有一处赋值,就是代码中的var9=0,可以理解成直接返回0)。

    3.3 CyclicBarrier其他的一些常用方法

    • CyclicBarrier(int parties,Runnable barrierAction):构造的时候传入一个Runnable,表示所有线程到达barrier point时,会调用该Runnable
    • await(long timeout,TimeUnit unit):与无参的await()类似,底层调用的是相同的doWait(),不过增加了超时功能
    • isBroken():返回broken状态,某个线程由于执行await而进入阻塞,此时如果执行了中断操作(比如interrupt),那么isBroken()会返回true。需要注意,处于broken状态的CyclicBarrier不能被直接使用,需要调用reset()进行重置

    4 总结

    下面是CountDownLatchCyclicBarrier的一些简单比较,相同点如下:

    • 都是java.util.concurrent包下的线程同步工具类
    • 都可以用于“主线程阻塞一直等待,直到子任务完成,主线程才继续执行”的情况

    不同点:

    • CountDownLatchawait()方法会等待计数器归0,而CyclicBarrierawait()会等待其他线程到达barrier point
    • CyclicBarrier内部的计数器是可以被重置的,但是CountDownLatch不可以
    • CyclicBarrier是由LockCondition实现的,而CountDownLatch是由同步控制器AQS实现的
    • 构造时CyclicBarrier不允许parties为0,而CountDownLatch允许count为0

    如果觉得文章好看,欢迎点赞。

    同时欢迎关注微信公众号:氷泠之路。

  • 相关阅读:
    element ui el-date-picker 判断所选时间是否交叉
    MDN中的箭头函数!!!
    es6 解构
    element ui 实现可编辑表格
    节流 防抖。。。。深入理解
    element ui 表格对齐方式,表头对齐方式
    纯html + css 导航栏
    PHP 1
    apache 建立虚拟主机
    Can't connect to local MySQL server through socket '/tmp/mysql.sock'
  • 原文地址:https://www.cnblogs.com/6b7b5fc3/p/14684697.html
Copyright © 2011-2022 走看看