zoukankan      html  css  js  c++  java
  • CountDownLatch,CyclicBarrier,Semaphore

    CountDownLatch

    在开发过程中我们常常遇到需要对多个任务进行汇总,比如报表,或者大屏显示,需要将所有接口的数据都 获取到后再进行汇总,如果使用同步的方式,那么会比较耗时,体验不好,所以我们使用多线程,但是使用多线程 只能异步的执行,有些接口响应比较快,有些比较慢,而返回结果之间又有依赖,这样就无法汇总了, 所以我们引入了CountDownLatch,它能让所有子线程全部执行完毕后主线程才会往下执行,如果子线程没有执行完毕 ,那么主线程将无法继续向下执行。

    例子:我们需要对三个接口的返回结果进行求和。

        模拟三个接口
    
     public static Integer getOne(){
            return 1;
        }
        public static Integer getTwo(){
            return 2;
        }
        public static Integer getThree(){
            return 3;
        }

    我们创建一个线程池和CountDownLatch,CountDownLatch构造函数参数我们传3,表示计数器为3

       static ExecutorService executorService = Executors.newCachedThreadPool();
        /**
         * CountDownLatch(3) , 构造函数参数为3,
         */
        static volatile CountDownLatch countDownLatch = new CountDownLatch(3);

    main函数,我们将三个任务加入线程池中,并且调用了countDownLatch.countDown(),调用此方法后计数器减1,因为一开始我们设置的 计数器为3,而三个线程执行后,每个-1,此时计算器变为0,这时候主线程的await才会返回,主线程才会向下执行,如果我们将计算器设置为 10,三个线程-3,此时计算器为7,那么await将会一直阻塞,主线程则无法向下执行,所以一定要让计算器为0后才会向下执行,

     public static void main(String[] args) throws InterruptedException, ExecutionException {
            Future<Integer> futureOne = executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName()+"  is over");
                Thread.sleep(2000);
                countDownLatch.countDown();
                return getOne();
            });
            Future<Integer> futureTwo = executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName()+"  is over");
                Thread.sleep(2000);
                //计数器-1
                countDownLatch.countDown();
                return getTwo();
            });
            Future<Integer> futureThree = executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName()+"  is over");
                Thread.sleep(2000);
                countDownLatch.countDown();
                System.out.println("count3 "+countDownLatch.getCount());
                return getThree();
            }); 
            //阻塞,等到计数器为0蔡往下执行
            countDownLatch.await();
            System.out.println("count  "+countDownLatch.getCount());
            System.out.println("child thread over , main thread start");
            Integer value1 = futureOne.get();
            Integer value2 = futureTwo.get();
            Integer value3 = futureThree.get();
            int total = value1 + value2 + value3;
            System.out.println("total  "+total);
        }

    CyclicBarrier

    CyclicBarrier和CountDownLatch很像,下面我们用它来实现CountDownLatch计数器功能, 代码如下,我们创建了一个CyclicBarrier,它的构造函数为parties和Runnable接口,parties表示计数器,Runnable表示parties 为0时执行的任务,我们再main函数中两个线程任务中执行后都进行了cyclicBarrier.await()操作,每进行一次,计数器parties值-1,两次 后parties为0,此时出发Runnable任务,

    package thread;
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * TODO
     *
     * @author 刘牌
     * @version 1.0
     * @date 2021/9/4 0004 23:28
     */
    public class CyclicBarrierTest2 {
        static ExecutorService executorService = Executors.newCachedThreadPool();
        static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
            @Override
            public void run() {
                System.out.println("task1 and task2 over , It's my turn");
            }
        });
    
        public static void main(String[] args) {
            executorService.submit(() -> {
                System.out.println("do task1");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
    
            executorService.submit(() -> {
                System.out.println("do task2");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
    }

    再举一个CountDownLatch无法实现的功能,执行有三个任务,每个任务都有三个阶段,需要每个任务的阶段同时执行,再到下一个阶段,阶段一执行后 到阶段2,阶段2完成后再到阶段3,code如下。

    package thread;
    
    import java.util.concurrent.*;
    
    /**
     * TODO
     *  来自《并发编程之美》例子
     * @author 刘牌
     * @version 1.0
     * @date 2021/9/4 0004 22:09
     */
    public class CyclicBarrierTest {
        static ExecutorService executorService = Executors.newCachedThreadPool();
        static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        public static void main(String[] args) {
            executorService.submit(() -> {
                System.out.println("task1");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("task2");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("task3");
            });
    
            executorService.submit(() -> {
                System.out.println("task1");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("task2");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("task3");
            });
    
            executorService.submit(() -> {
                System.out.println("task1");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("task2");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("task3");
            });
        }
    }

    Semaphore

    Semaphore即信号量机制,它和CountDownLatch,CyclicBarrier类似,都是计数器的思想, 构造函数permits表示计数器,在每个线程任务执行完毕时我们调用了semaphore.release(),那么permits的值将会+1 因为我们执行了连个线程任务,所以此时permits为2,semaphore.acquire(2)处则满足条件,主线程将往下执行,如果改为 semaphore.acquire(3),那么主线程将会一直阻塞,因为计数器为2.

    package thread;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    /**
     * TODO
     *信号量机制
     * @author 刘牌
     * @version 1.0
     * @date 2021/9/4 0004 21:36
     */
    public class SemaphoreTest {
    
        static Semaphore semaphore = new Semaphore(0);
        static ExecutorService executorService = Executors.newCachedThreadPool();
    
        public static void main(String[] args) throws InterruptedException {
            executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName()+"  is over");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                semaphore.release();
            });
    
            executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName()+"  is over");
                try {
                    Thread.sleep(7000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                semaphore.release();
            });
            semaphore.acquire(2);
            System.out.println("all child thread is over,main thread is start");
            executorService.shutdown();
        }
    }
     
    生命不止,折腾不息
  • 相关阅读:
    linux内存和swap
    Linux awk sort
    redis aof和rdb区别
    STL中的map、unordered_map、hash_map
    mysql 冷热备份
    redis
    linux 几个命令
    linux erase
    group by
    现在很多技术知识点缺乏来龙去脉的介绍
  • 原文地址:https://www.cnblogs.com/steakliu/p/15228422.html
Copyright © 2011-2022 走看看