zoukankan      html  css  js  c++  java
  • Java--CyclicBarrier使用简介

    CyclicBarrier介绍 (一)
    一 个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。

      主要使用方法:

    //设置parties、count及barrierCommand属性。   
    CyclicBarrier(int):   
      
    //当await的数量到达了设定的数量后,首先执行该Runnable对象。   
    CyclicBarrier(int,Runnable):   
      
    //通知barrier已完成线程   
    await():

    应用场景
      在某种需求中,比如一个大型的任务,常常需要分配好多子任务去执行,只有当所有子任务都执行完成时候,才能执行  主任务,这时候,就可以选择CyclicBarrier了。

    实例分析
      我们需要统计全国的业务数据。其中各省的数据库是独立的,也就是说按省分库。并且统计的数据量很大,统计过程也比较慢。为了提高性能,快速计算。我们采取并发的方式,多个线程同时计算各省数据,最后再汇总统计。在这里CyclicBarrier就非常有用。看代码:
    主要类:

    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    import utils.Tools.BillService;
    import utils.Tools.TotalService;
    
    
        /**  
         * 各省数据独立,分库存偖。为了提高计算性能,统计时采用每个省开一个线程先计算单省结果,最后汇总。  
         *   
         */  
        public class Total {   
          
            // private ConcurrentHashMap result = new ConcurrentHashMap();   
          
            public static void main(String[] args) {   
                TotalService totalService = new TotalServiceImpl();   
                CyclicBarrier barrier = new CyclicBarrier(5,   
                        new TotalTask(totalService));   
          
                // 实际系统是查出所有省编码code的列表,然后循环,每个code生成一个线程。   
                new BillTask(new BillServiceImpl(), barrier, "北京").start();   
                new BillTask(new BillServiceImpl(), barrier, "上海").start();   
                new BillTask(new BillServiceImpl(), barrier, "广西").start();   
                new BillTask(new BillServiceImpl(), barrier, "四川").start();   
                new BillTask(new BillServiceImpl(), barrier, "黑龙江").start();   
          
            }   
        }   
          
        /**  
         * 主任务:汇总任务  
         */  
        class TotalTask implements Runnable {   
            private TotalService totalService;   
          
            TotalTask(TotalService totalService) {   
                this.totalService = totalService;   
            }   
          
            public void run() {   
                // 读取内存中各省的数据汇总,过程略。   
                totalService.count();   
                System.out.println("=======================================");   
                System.out.println("开始全国汇总");   
            }   
        }   
          
        /**  
         * 子任务:计费任务  
         */  
        class BillTask extends Thread {   
            // 计费服务   
            private BillService billService;   
            private CyclicBarrier barrier;   
            // 代码,按省代码分类,各省数据库独立。   
            private String code;   
          
            BillTask(BillService billService, CyclicBarrier barrier, String code) {   
                this.billService = billService;   
                this.barrier = barrier;   
                this.code = code;   
            }   
          
            public void run() {   
                System.out.println("开始计算--" + code + "省--数据!");   
                billService.bill(code);   
                // 把bill方法结果存入内存,如ConcurrentHashMap,vector等,代码略   
                System.out.println(code + "省已经计算完成,并通知汇总Service!");   
                try {   
                    // 通知barrier已经完成   
                    barrier.await();   
                } catch (InterruptedException e) {   
                    e.printStackTrace();   
                } catch (BrokenBarrierException e) {   
                    e.printStackTrace();   
                }   
            }   
          
        }  
    import utils.Tools.BillService;
    
    
    public class BillServiceImpl implements BillService {
    
        @Override
        public void bill(String code) {
            // TODO Auto-generated method stub
    
        }
    
    }
    import utils.Tools.TotalService;
    
    public class TotalServiceImpl implements TotalService{
    
            @Override
            public void count() {
                // TODO Auto-generated method stub
                
            }
            
        }
    package utils;
    
    public class Tools {
        /**
         * 
         */
        public interface BillService {
    
            /**
             * 各省计费
             * 
             * @param code
             *            省编码
             */
            public void bill(String code);
    
        }
    
        /**
         * 
         */
        public interface TotalService {
    
            /**
             * 汇总各省数据
             */
            public void count();
    
        }
        
    }

    结果:

    开始计算--北京省--数据!
    开始计算--四川省--数据!
    开始计算--黑龙江省--数据!
    开始计算--上海省--数据!
    开始计算--广西省--数据!
    上海省已经计算完成,并通知汇总Service!
    黑龙江省已经计算完成,并通知汇总Service!
    四川省已经计算完成,并通知汇总Service!
    北京省已经计算完成,并通知汇总Service!
    广西省已经计算完成,并通知汇总Service!
    =======================================
    开始全国汇总

    CyclicBarrier介绍 (二)

    张孝祥视频学习笔记:

    CyclicBarrier 表示大家彼此等待,大家集合好后才开始出发,分散活动后又在i指定地点集合碰面,这就好比整个公司的人员利用周末时间集体郊游一样,先各自从家出发到公司集合后,再同时出发到公园游玩,在指定地点集合后再同时开始就餐……

    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class CyclicBarrierTest {
        public static void main(String[] args) {
            ExecutorService service = Executors.newCachedThreadPool();
            final CyclicBarrier cb = new CyclicBarrier(3); // 三个线程同时到达
            for (int i = 0; i < 3; i++) {
                Runnable runnable = new Runnable() {
                    public void run() {
                        try {
                            Thread.sleep((long) (Math.random() * 10000));
                            System.out.println("线程"
                                    + Thread.currentThread().getName()
                                    + "即将到达集合地点1,当前已有"
                                    + (cb.getNumberWaiting() + 1)
                                    + "个已到达"
                                    + (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊"
                                            : "正在等候"));
                            try {
                                cb.await();
                            } catch (BrokenBarrierException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                            Thread.sleep((long) (Math.random() * 10000));
                            System.out.println("线程"
                                    + Thread.currentThread().getName()
                                    + "即将到达集合地点2,当前已有"
                                    + (cb.getNumberWaiting() + 1)
                                    + "个已到达"
                                    + (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊"
                                            : "正在等候"));
                            try {
                                cb.await();
                            } catch (BrokenBarrierException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                            Thread.sleep((long) (Math.random() * 10000));
                            System.out.println("线程"
                                    + Thread.currentThread().getName()
                                    + "即将到达集合地点3,当前已有"
                                    + (cb.getNumberWaiting() + 1)
                                    + "个已到达"
                                    + (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊"
                                            : "正在等候"));
                            try {
                                cb.await();
                            } catch (BrokenBarrierException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                };
                service.execute(runnable);
            }
            service.shutdown();
        }
    }

    输出结果

    线程pool-1-thread-3即将到达集合地点1,当前已有1个已到达正在等候
    线程pool-1-thread-2即将到达集合地点1,当前已有2个已到达正在等候
    线程pool-1-thread-1即将到达集合地点1,当前已有3个已到达都到齐了,继续走啊
    线程pool-1-thread-1即将到达集合地点2,当前已有1个已到达正在等候
    线程pool-1-thread-2即将到达集合地点2,当前已有2个已到达正在等候
    线程pool-1-thread-3即将到达集合地点2,当前已有3个已到达都到齐了,继续走啊
    线程pool-1-thread-3即将到达集合地点3,当前已有1个已到达正在等候
    线程pool-1-thread-1即将到达集合地点3,当前已有2个已到达正在等候
    线程pool-1-thread-2即将到达集合地点3,当前已有3个已到达都到齐了,继续走啊

    CyclicBarrier介绍 (三)

      旅游,导游带队,只有在全部成员到齐的时候,导游才会下达去下一个景点的通知。

    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Travel {
        private static final int THREAD_COUNT = 3;
    //  当THREAD_COUNT 逐个减到0的时候,就开是执行runnable
        private final static CyclicBarrier CYSLIC_BARRIER = new CyclicBarrier(
                THREAD_COUNT, new Runnable() {
    
                    @Override
                    public void run() {
                        // TODO Auto-generated method stub
                        System.out.println("我是导游,本次点名结束,准备下一个景点。");
                    }
                });
    
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            ExecutorService executor = Executors.newCachedThreadPool();
            for (int i = 0; i < THREAD_COUNT; i++) {
                new Thread(String.valueOf(i)){
                    public void run() {
                        try {
                            System.out.println("我是线程"+ this.getName()+",我们到达旅游景点。");
                            CYSLIC_BARRIER.await();
                            System.out.println("我是线程"+ this.getName()+",我们开始骑车。");
                            CYSLIC_BARRIER.await();
                            System.out.println("我是线程"+ this.getName()+",我们开始爬山。");
                            CYSLIC_BARRIER.await();
                            System.out.println("我是线程"+ this.getName()+",我们回宾馆休息。");
                            CYSLIC_BARRIER.await();
                            System.out.println("我是线程"+ this.getName()+",我们骑车回家。");
                            CYSLIC_BARRIER.await();
                            System.out.println("我是线程"+ this.getName()+",我们到家了。");
                            CYSLIC_BARRIER.await();
                        } catch (Exception e) {
                            // TODO: handle exception
                        }
                    };
                }.start();
            }
        }
    
    }

    结果:

    我是线程2,我们到达旅游景点。
    我是线程0,我们到达旅游景点。
    我是线程1,我们到达旅游景点。
    我是导游,本次点名结束,准备下一个景点。
    我是线程1,我们开始骑车。
    我是线程0,我们开始骑车。
    我是线程2,我们开始骑车。
    我是导游,本次点名结束,准备下一个景点。
    我是线程2,我们开始爬山。
    我是线程0,我们开始爬山。
    我是线程1,我们开始爬山。
    我是导游,本次点名结束,准备下一个景点。
    我是线程1,我们回宾馆休息。
    我是线程0,我们回宾馆休息。
    我是线程2,我们回宾馆休息。
    我是导游,本次点名结束,准备下一个景点。
    我是线程2,我们骑车回家。
    我是线程0,我们骑车回家。
    我是线程1,我们骑车回家。
    我是导游,本次点名结束,准备下一个景点。
    我是线程1,我们到家了。
    我是线程0,我们到家了。
    我是线程2,我们到家了。
    我是导游,本次点名结束,准备下一个景点。
  • 相关阅读:
    Oracle中常见的33个等待事件小结
    DATAGUARD中手工处理日志v$archive_GAP的方法
    ORACLE 如何定位消耗资源的SQL
    ORACLE 全局索引和本地索引
    Oracle中获取执行计划的几种方法分析
    BUFFER CACHE之主要的等待事件
    查看tablespace实际使用量和剩余空间
    Linux环境配置文件的理解
    Shell 传递参数
    Linux开局配置注意事项
  • 原文地址:https://www.cnblogs.com/plxx/p/4339091.html
Copyright © 2011-2022 走看看