zoukankan      html  css  js  c++  java
  • CountDownLatch、CyclicBarrier、Semaphore共同之处与区别以及各自使用场景

    区别

    • CountDownLatch 使一个线程A或是组线程A等待其它线程执行完毕后,一个线程A或是组线程A才继续执行。CyclicBarrier:一组线程使用await()指定barrier,所有线程都到达各自的barrier后,再同时执行各自barrier下面的代码。Semaphore:是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源
    • CountDownLatch是减计数方式,计数==0时释放所有等待的线程;CyclicBarrier是加计数方式,计数达到构造方法中参数指定的值时释放所有等待的线程。Semaphore,每次semaphore.acquire(),获取一个资源,每次semaphore.acquire(n),获取n个资源,当达到semaphore 指定资源数量时就不能再访问线程处于阻塞,必须等其它线程释放资源,semaphore.relase()每次资源一个资源,semaphore.relase(n)每次资源n个资源。
    • CountDownLatch当计数到0时,计数无法被重置;CyclicBarrier计数达到指定值时,计数置为0重新开始。
    • CountDownLatch每次调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何影响;CyclicBarrier只有一个await()方法,调用await()方法计数加1,若加1后的值不等于构造方法的值,则线程阻塞。
    • CountDownLatch、CyclikBarrier、Semaphore 都有一个int类型参数的构造方法。CountDownLatch、CyclikBarrier这个值作为计数用,达到该次数即释放等待的线程,而Semaphore 中所有acquire获取到的资源达到这个数,会使得其它线程阻塞。

    共同

    • CountDownLatch与CyclikBarrier两者的共同点是都具有await()方法,并且执行此方法会引起线程的阻塞,达到某种条件才能继续执行(这种条件也是两者的不同)。Semaphore,acquire方获取的资源达到最大数量时,线程再次acquire获取资源时,也会使线程处于阻塞状态。CountDownLatch与CyclikBarrier两者的共同点是都具有await()方法,并且执行此方法会引起线程的阻塞,达到某种条件才能继续执行(这种条件也是两者的不同)。Semaphore,acquire方获取的资源达到最大数量时,线程再次acquire获取资源时,也会使线程处于阻塞状态。CountDownLatch、CyclikBarrier、Semaphore 都有一个int类型参数的构造方法。
    • CountDownLatch、CyclikBarrier、Semaphore 都有一个int类型参数的构造方法。
     

    CountDownLatch、CyclicBarrier和Semaphore使用场景

    CountDownLatch

    由于CountDownLatch有个countDown()方法并且countDown()不会引起阻塞,所以CountDownLatch可以应用于主线程等待所有子线程结束后再继续执行的情况。具体使用方式为new一个构造参数为subThread数目的CountDownLatch,启动所有子线程后主线程await(),在每个子线程的最后执行countDown(),这样当所有子线程执行完后计数减为0,主线程释放等待继续执行。比如赛跑,每个运动员看做一个子线程,裁判就是主线程,裁判发令(设置一个值为1的计数器,发令之前所有子线程await等待命令,裁判员发令让计数置为0,所有子线程同时开跑)所有运动员开跑后,需要等待所有人跑完再统计成绩(设置一个值为运动员数目的计数器,所有运动员开跑后裁判await被阻塞,每个运动员跑完的时候countDown()一下,所有运动员跑完计数达到0,裁判释放阻塞开始计分)。
    public class CountDownLatchCase {  
          
        public static void main(String args[]){  
            //主线程为裁判,子线程为运动员  
            final CountDownLatch operatorNum=new CountDownLatch(6);  
            for(int i=0;i<6;i++){  
                new Thread(new Runnable() {  
                    @Override  
                    public void run() {  
                        Long s=System.currentTimeMillis();  
                        Random random=new Random();  
                        try {  
                            Thread.sleep(random.nextInt(10000));  
                        } catch (InterruptedException e) {  
                            e.printStackTrace();  
                        }  
                        Long time=System.currentTimeMillis()-s;  
                        System.out.println(String.format(”运动员%s跑完,花了%s”, Thread.currentThread().getName(),time));  
                        //一个运动员跑完了  
                        operatorNum.countDown();  
                    }  
                }).start();  
            }  
            try {  
                operatorNum.await();  
                System.out.println(String.format(”所有运行员都跑完了,裁判%s可宣布结果啦!”, Thread.currentThread().getName()));  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
              
        }  
    }  

    CyclicBarrier

    由于CyclicBarrier计数达到指定后会重新循环使用,所以CyclicBarrier可以用在所有子线程之间互相等待多次的情形。比如在某种需求中,比如一个大型的任务,常常需要分配好多子任务去执行,只有当所有子任务都执行完成时候,才能执行主任务,这时候,就可以选择CyclicBarrier了。 比如团队旅游,一个团队通常分为几组,每组人走的路线可能不同,但都需要到达某一地点等待团队其它成员到达后才能进行下一站。
    /**   
     * 各省数据独立,分库存偖。为了提高计算性能,统计时采用每个省开一个线程先计算单省结果,最后汇总。   
     *    
     * @author guangbo email:weigbo@163.com   
     *    
     */    
    public class Total {     
        
        // private ConcurrentHashMap result = new ConcurrentHashMap();     
        
        public static void main(String[] args) {     
            TotalService totalService = new TotalServiceImpl();     
            CyclicBarrier barrier = new CyclicBarrier(5,     
                    new TotalTask(totalService));     
        
            // 实际系统是查出所有省编码code的列表,然后循环,每个code生成一个线程。     
            new BillTask(new BillServiceImpl(), barrier, “北京”).start();     
            new BillTask(new BillServiceImpl(), barrier, “上海”).start();     
            new BillTask(new BillServiceImpl(), barrier, “广西”).start();     
            new BillTask(new BillServiceImpl(), barrier, “四川”).start();     
            new BillTask(new BillServiceImpl(), barrier, “黑龙江”).start();     
        
        }     
    }     
        
    /**   
     * 主任务:汇总任务   
     */    
    class TotalTask implements Runnable {     
        private TotalService totalService;     
        
        TotalTask(TotalService totalService) {     
            this.totalService = totalService;     
        }     
        
        public void run() {     
            // 读取内存中各省的数据汇总,过程略。     
            totalService.count();     
            System.out.println(”=======================================”);     
            System.out.println(”开始全国汇总”);     
        }     
    }     
        
    /**   
     * 子任务:计费任务   
     */    
    class BillTask extends Thread {     
        // 计费服务     
        private BillService billService;     
        private CyclicBarrier barrier;     
        // 代码,按省代码分类,各省数据库独立。     
        private String code;     
        
        BillTask(BillService billService, CyclicBarrier barrier, String code) {     
            this.billService = billService;     
            this.barrier = barrier;     
            this.code = code;     
        }     
        
        public void run() {     
            System.out.println(”开始计算–” + code + “省–数据!”);     
            billService.bill(code);     
            // 把bill方法结果存入内存,如ConcurrentHashMap,vector等,代码略     
            System.out.println(code + ”省已经计算完成,并通知汇总Service!”);     
            try {     
                // 通知barrier已经完成     
                barrier.await();     
            } catch (InterruptedException e) {     
                e.printStackTrace();     
            } catch (BrokenBarrierException e) {     
                e.printStackTrace();     
            }     
        }     
        
    }   

    Semaphore

    Semaphore可以用于做流量控制,特别公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控,代码如下:
    public class SemaphoreCase {  
      
        private static final int THREAD_COUNT = 30;  
        private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);  
        private static Semaphore s = new Semaphore(10);  
        public static void main(String[] args) {  
            for (int i = 0; i < THREAD_COUNT; i++) {  
                threadPool.execute(new Runnable() {  
                    @Override  
                    public void run() {  
                        try {  
                            s.acquire();  
                            System.out.println(”save data”);  
                            s.release();  
                        } catch (InterruptedException e) {  
                        }  
                    }  
                });  
            }  
            threadPool.shutdown();  
        }  
      
    }  
    注意:
    • release函数和acquire并没有要求一定是同一个线程都调用,可以A线程申请资源,B线程释放资源;
    • 调用release函数之前并没有要求一定要先调用acquire函数。
  • 相关阅读:
    [Javascript] Broadcaster + Operator + Listener pattern -- 3 Stop with condition
    分布式事务科普(初识篇)
    分布式事务不理解?一次给你讲清楚!
    分布式事务,有解吗?
    分布式事务精华总结篇,实打实的干货!
    常用的分布式事务解决方案介绍有多少种?
    5种分布式事务解决方案优缺点对比
    Leaf——美团点评分布式ID生成系统
    MySQL分区总结
    互联网公司为啥基本不使用mysql分区表
  • 原文地址:https://www.cnblogs.com/zhaoyan001/p/10775676.html
Copyright © 2011-2022 走看看