zoukankan      html  css  js  c++  java
  • Java并发编程的艺术笔记(七)——CountDownLatch、CyclicBarrier详解

    一.等待多线程完成的CountDownLatch

    CountDownLatch允许一个或多个线程等待其他线程完成操作,像加强版的join。(t.join()是等待t线程完成)

    例:

    1)开启多个线程分块下载一个大文件,每个线程只下载固定的一截,最后由另外一个线程来拼接所有的分段;解析一个Excel里多个sheet的数据,此时可以考虑使用多线程,每个          线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。

    2)应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。

    3)确保一个计算不会执行,直到所需要的资源被初始化。

    CountDownLatch典型用法1:某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为n new CountDownLatch(n) ,每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown(),当计数器的值变为0时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

     例:

    package concurrent;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     *@author 梦里南轲
     *
     *类说明:演示CountDownLatch,有5个初始化的线程,6个扣除点,
     *初始化线程4个,每个1步
     *单独的初始化线程1个,2步
     *扣除完毕以后,主线程和业务线程才能继续自己的工作
     */
    public class UseCountDownLatch {
        
        static CountDownLatch latch = new CountDownLatch(6);
    
        //初始化线程(只有一步,有4个)
        private static class InitThread implements Runnable{
    
            @Override
            public void run() {
                System.out.println("Thread_"+Thread.currentThread().getId()
                        +" ready init work......");
                latch.countDown();//初始化线程完成工作了,countDown方法只扣减一次;
                for(int i =0;i<2;i++) {
                    System.out.println("Thread_"+Thread.currentThread().getId()
                            +" ........continue do its work");
                }
            }
        }
        
        //业务线程
        private static class BusiThread implements Runnable{
    
            @Override
            public void run() {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for(int i =0;i<3;i++) {
                    System.out.println("BusiThread_"+Thread.currentThread().getId()
                            +" do business-----");
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            // 单独的初始化线程,初始化分为2步,需要扣减两次
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        System.out.println("Thread_" + Thread.currentThread().getId() + " ready init work step 1st......");
                        latch.countDown();// 每完成一步初始化工作,扣减一次
                        System.out.println("begin step 2nd.......");
                        TimeUnit.SECONDS.sleep(1);
                        System.out.println("Thread_" + Thread.currentThread().getId() + " ready init work step 2nd......");
                        latch.countDown();// 每完成一步初始化工作,扣减一次
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }).start();
            new Thread(new BusiThread()).start();
            for (int i = 0; i <= 3; i++) {
                Thread thread = new Thread(new InitThread());
                thread.start();
            }
    
            latch.await();
            System.out.println("Main do ites work........");
        }
    }

    运行结果:

    Thread_12 ready init work......
    Thread_12 ........continue do its work
    Thread_12 ........continue do its work
    Thread_14 ready init work......
    Thread_14 ........continue do its work
    Thread_14 ........continue do its work
    Thread_13 ready init work......
    Thread_15 ready init work......
    Thread_15 ........continue do its work
    Thread_15 ........continue do its work
    Thread_13 ........continue do its work
    Thread_13 ........continue do its work
    Thread_10 ready init work step 1st......
    begin step 2nd.......
    Thread_10 ready init work step 2nd......
    Main do ites work........
    BusiThread_11 do business-----
    BusiThread_11 do business-----
    BusiThread_11 do business-----

    可以看出,初始化线程执行完了之后,主线程和业务线程才开始工作。

    CountDownLatch典型用法2:实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计数器初始化为1,多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。

    例:

    package concurrent;
    
    import java.util.concurrent.CountDownLatch;
    
    public class CountDownLatchDemo {
        public static void main(String[] args) throws InterruptedException {
            //裁判
            CountDownLatch countDown = new CountDownLatch(1);
            //5个选手
            CountDownLatch await = new CountDownLatch(5);
    
            // 依次创建并启动处于等待状态的5个MyRunnable线程
            for (int i = 0; i < 5; ++i) {
                new Thread(new MyRunnable(countDown, await)).start();
            }
    
            System.out.println("准备起跑......");
            countDown.countDown();
            System.out.println("选手开始赛跑");
            await.await();
            System.out.println("Bingo!");
        }
    }
    package concurrent;
    
    import java.util.concurrent.CountDownLatch;
    
    public class MyRunnable implements Runnable {
    
        private final CountDownLatch countDown;
        private final CountDownLatch await;
    
        public MyRunnable(CountDownLatch countDown, CountDownLatch await) {
            this.countDown = countDown;
            this.await = await;
        }
    
        public void run() {
            try {
                countDown.await();//等待主线程执行完毕,获得开始执行信号...
                System.out.println("跑完了");
                await.countDown();//完成预期工作,发出完成信号...
                
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    运行结果:

    准备起跑......
    选手开始赛跑
    跑完了
    跑完了
    跑完了
    跑完了
    跑完了
    Bingo!

    CountDownLatch的不足

    CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。

    特有方法:

    public CountDownLatch(int count); //指定计数的次数,只能被设置1次
    public void countDown();          //调用此方法则计数减1
    public void await() throws InterruptedException   //调用此方法会一直阻塞当前线程,直到计时器的值为0,除非线程被中断。
    Public Long getCount();           //得到当前的计数
    Public boolean await(long timeout, TimeUnit unit) //调用此方法会一直阻塞当前线程,直到计时器的值为0,除非线程被中断或者计数器超时,返回false代表计数器超时

    From Object Inherited:

    Clone、equals、hashCode、notify、notifyALL、wait等。

    二.同步屏障CyclicBarrier

    CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。

    当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

    例:

    package concurrent;
    
    import java.util.Map;
    import java.util.Random;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CyclicBarrier;
    
    /**
     *@author 梦里南柯
     *
     *类说明:CyclicBarrier的使用,演示多个线程互相等待
     */
    public class UseCyclicBarrier {
    
        private static CyclicBarrier barrier = new CyclicBarrier(5, new CollectThread());
    
        private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();// 存放子线程工作结果的容器
    
        public static void main(String[] args) {
            System.out.println("屏障开启");
            // 启动5个线程
            for (int i = 0; i <= 4; i++) {
                Thread thread = new Thread(new SubThread());
                thread.start();
            }
    
        }
    
        // 负责屏障开放以后的工作
        private static class CollectThread implements Runnable {
    
            @Override
            public void run() {
                StringBuilder result = new StringBuilder();
                for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
                    result.append("[" + workResult.getValue() + "]");
                }
                System.out.println("屏障开放, the result = " + result);
                System.out.println("do other business........");
            }
        }
    
        // 工作线程
        private static class SubThread implements Runnable {
    
            @Override
            public void run() {
                long id = Thread.currentThread().getId();// 线程本身的处理结果
                resultMap.put(Thread.currentThread().getId() + "", id);
                Random r = new Random();// 随机决定工作线程的是否睡眠
                try {
                    if (r.nextBoolean()) {
                        Thread.sleep(2000 + id);
                        System.out.println("Thread_" + id + " ....do something ");
                    }
                    System.out.println(id + "到达,开始等待其他线程");
                    // 等待其他的工作线程
                    barrier.await();
                    Thread.sleep(1000 + id);
                    System.out.println("Thread_" + id + " ....do its business ");
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
            }
        }
    }

    运行结果:

    屏障开启
    12到达,开始等待其他线程
    13到达,开始等待其他线程
    Thread_10 ....do something 
    10到达,开始等待其他线程
    Thread_11 ....do something 
    11到达,开始等待其他线程
    Thread_14 ....do something 
    14到达,开始等待其他线程
    屏障开放, the result = [11][12][13][14][10]
    do other business........
    Thread_10 ....do its business 
    Thread_11 ....do its business 
    Thread_12 ....do its business 
    Thread_13 ....do its business 
    Thread_14 ....do its business 

    CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。例如,用一个Excel保存了用户所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水

    package concurrent;
    
    import java.util.Map.Entry;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    /**
    * 银行流水处理服务类
    *使用线程池创建4个线程,分别计算每个sheet里的数据,每个sheet计算结果是1,再由
    BankWaterService线程汇总4个sheet计算出的结果
    * @authorftf
    *
    */
    public class BankWaterService implements Runnable {
        /**
         * 创建4个屏障,处理完之后执行当前类的run方法
         */
        private CyclicBarrier c = new CyclicBarrier(4, this);
        /**
         * 假设只有4个sheet,所以只启动4个线程
         */
        private Executor executor = Executors.newFixedThreadPool(4);
        /**
         * 保存每个sheet计算出的银流结果
         */
        private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>();
    
        private void count() {
            for (int i = 0; i < 4; i++) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 计算当前sheet的银流数据,计算代码省略
                        sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
                        System.out.println(Thread.currentThread().getName() + "计算完成");
                        // 银流计算完成,插入一个屏障
                        try {
                            c.await();
                        } catch (InterruptedException | BrokenBarrierException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    
        @Override
        public void run() {
            int result = 0;
            // 汇总每个sheet计算出的结果
            for (Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
                result += sheet.getValue();
            }
            // 将结果输出
            sheetBankWaterCount.put("result", result);
            System.out.println("计算结果:" + result);
        }
    
        public static void main(String[] args) {
            BankWaterService bankWaterCount = new BankWaterService();
            bankWaterCount.count();
        }
    }

    运行结果:

    pool-1-thread-2计算完成
    pool-1-thread-4计算完成
    pool-1-thread-3计算完成
    pool-1-thread-1计算完成
    计算结果:4

    三.CyclicBarrier和CountDownLatch的区别

    CountDownLatch是一个或多个线程等待一组线程,需要外部线程(一个或多个线程)进行扣减,CyclicBarrier是线程间的相互等待,他们之间自己决定,是多个线程本身到达同一个地点。

    CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;

    CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断;CountDownLatch允许一个或多个线程等待一组事件的产生,而CyclicBarrier用于等待其他线程运行到栅栏位置。

    四.控制并发线程数的Semaphore

    适合做流量控制,尤其是资源有限的情况。如数据库连接。假如读几万个文件的数据,然后写到数据库,这时需要开启几十个线程,而数据库的连接数只有10个,这时就要做流量控制了。

    package concurrent;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import org.omg.CORBA.PRIVATE_MEMBER;
    
    public class SemaphoreTest {
        private Semaphore smp = new Semaphore(5); 
        
        class TaskDemo implements Runnable{
            private String id;
            TaskDemo(String id){
                this.id = id;
            }
            @Override
            public void run(){
                try {
                    smp.acquire();
                    System.out.println("Thread " + id + " is working");
                    Thread.sleep(1000);
                    smp.release();
                    System.out.println("Thread " + id + " is over");
                } catch (InterruptedException e) {
                }
            }
        }
        
        public static void main(String[] args){
            SemaphoreTest semaphoreDemo = new SemaphoreTest();
            final int THREAD_COUNT = 10;
            ExecutorService se = Executors.newCachedThreadPool();
            for (int i = 0; i < THREAD_COUNT; i++) {
                se.submit(semaphoreDemo.new TaskDemo(String.valueOf(i)));
            }
            
            se.shutdown();
        }
    }

    运行结果:

    Thread 0 is working
    Thread 1 is working
    Thread 2 is working
    Thread 3 is working
    Thread 4 is working
    Thread 5 is working
    Thread 4 is over
    Thread 3 is over
    Thread 6 is working
    Thread 7 is working
    Thread 8 is working
    Thread 0 is over
    Thread 2 is over
    Thread 1 is over
    Thread 9 is working
    Thread 9 is over
    Thread 5 is over
    Thread 7 is over
    Thread 6 is over
    Thread 8 is over
    
  • 相关阅读:
    SpringBoot配置文件(2)
    SpringBoot配置文件(1)
    java8新特性之stream流
    java8新特性之Lambda表达式
    zoj 1709 Oil Deposits
    zoj 2110 Tempter of the Bone
    poj 2823 Sliding Window
    fzu 1894 志愿者选拔
    hdoj 1754 I Hate It
    poj2404中国邮递员
  • 原文地址:https://www.cnblogs.com/lingluo2017/p/10249299.html
Copyright © 2011-2022 走看看