zoukankan      html  css  js  c++  java
  • 并行模式之Master-Worker模式

    并行模式之Master-Worker模式

    一)、Master-Worker模式

    作用: 将一个大任务分解成若干个小任务,分发给多个子线程执行。

    注: 将大任务分解成小任务,小任务的实现逻辑要相同。

    二)、Master-Worker模式的结构

    Master-Worker的核心思想:由Master进程和Worker进程实现。

    1)、Master进程:

    接收和分配任务,整合最终的处理结果。

    内部结构组成:

    1.Worker进程队列

    作用:执行Master分配的任务,开启多线程处理数据。
    

    2.子任务队列

     作用:接收任务。
    

    3.子结果集

     作用:整合多个线程的最终处理结果。
    

    注:Master进程分配任务后会立即返回,不会等待系统全部处理完返回,处理过

         程是异步的,client不会出现等待状态。         
    

    2)、Worker进程

    实现Runable接口,定义一个handle()方法,在方法里实现小任务的业务逻辑,在run()方法中调用handle()方法

    内部结构组成:

    1.子任务队列

    作用:接收Master分配的任务。
    

    2.结果集队列

     作用:将处理的结果返给Master
    

    3.handle()方法

    作用:定义小任务的业务逻辑。

    注:所有的Wroker对象共享Master的Worker队列,和结果集.

    三)、Master-Worker的代码实现

    使用Master-Worker模式实现 :

    计算 1~100的立方和( 1^3 + 2^3 + ... + 100^3 )

    思路:将计算任务分解为100个小任务,每个子任务计算单独的立方和,最后将计

             算结果返回给master。
    

    Master类:

    /**
     * Master类:
     *   内部维护了一个Worker进程队列、任务队列、结果集
     */
    public class Master {
        /**
         * 任务队列, ConcurrentLinkedDeque: 基于链表的并发队列
         */
        protected Queue workerQueue = new ConcurrentLinkedDeque();
    
        /**
         * Worker进程队列
         */
        Map<String, Thread> threadMap = new HashMap<>();
    
        /**
         * 结果集
         */
        Map<String, Object> resultMap = new HashMap<>();
    
        /**
         * Master的构造,需要一个进程对象和进程数量
         */
        public Master(Worker worker, int countWorker){
            //将master的workerQueue和resultMap和Worker关联
            worker.setWorkerQueue(workerQueue);
            worker.setResultMap(resultMap);
            //根据需要的进程数量,创建进程
            for(int i = 0; i < countWorker; i++){
                //Integer.toString(i)为线程的名字
                threadMap.put(Integer.toString(i), new Thread(worker,Integer.toString(i)));
            }
        }
    
        /**
         * master进程提交任务
         */
        public void submit(Object input){
            workerQueue.add(input);
        }
    
        /**
         * master分配任务,执行任务,开启多线程
         */
        public void execute(){
            for(Map.Entry<String,Thread> entry : threadMap.entrySet()){
                entry.getValue().start();
    
            }
        }
    
        /**
         * 判断是否所有的子任务都结束了
         */
        public boolean isComplete(){
            //判断worker进程的状态是否都等于终止状态
            for(Map.Entry<String,Thread> entry : threadMap.entrySet()){
                if(entry.getValue().getState() != Thread.State.TERMINATED){
                    return false;
                }
            }
            return true;
        }
    
        public Queue getWorkerQueue() {
            return workerQueue;
        }
    
        public void setWorkerQueue(Queue workerQueue) {
            this.workerQueue = workerQueue;
        }
    
        public Map<String, Thread> getThreadMap() {
            return threadMap;
        }
    
        public void setThreadMap(Map<String, Thread> threadMap) {
            this.threadMap = threadMap;
        }
    
        public Map<String, Object> getResultMap() {
            return resultMap;
        }
    
        public void setResultMap(Map<String, Object> resultMap) {
            this.resultMap = resultMap;
        }
    }
    

    Worker类:实现Runable接口,定义handle()方法,在run()中调用handle()

    /**
     * Worker类:
     *   小任务的逻辑实现类
     */
    public class Worker implements Runnable{
        /**
         * 接收master的workerQueue,用于开启线程处理多个小任务
         */
        protected Queue workerQueue;
    
        /**
         * 接收master的resultMap,将各个小任务的处理结果返回给Master
         */
        protected Map<String, Object> resultMap;
    
        /**
         * 小线程的具体逻辑,接收任务并处理任务
         * 参数: input,分解的小任务
         */
        public Object handle(Object input){
            return input;
        }
    
        /**
         * 从任务列表中获取任务,调用handle()方法,执行任务
         */
        @Override
        public void run() {
            while(true){
                //获取任务列表的任务
                Object input = workerQueue.poll();
                //判断任务列表是否还有任务,若没有,则终止线程。
                if(input == null){
                    break;
                }
                //有任务,调用handle()方法,执行任务
                Object result = handle(input);
                //将结果放在master的resultMap中
                resultMap.put(Integer.toString(input.hashCode()),result);
            }
        }
    
        public Queue getWorkerQueue() {
            return workerQueue;
        }
    
        public void setWorkerQueue(Queue workerQueue) {
            this.workerQueue = workerQueue;
        }
    
        public Map<String, Object> getResultMap() {
            return resultMap;
        }
    
        public void setResultMap(Map<String, Object> resultMap) {
            this.resultMap = resultMap;
        }
    
    }
    

    PlushWorker类:继承了Worker类,重写handle()方法,定义任务的处理逻辑

    **
     * 继承Worker类,定义handle()的业务逻辑
     */
    public class PlushWorker extends Worker{
        /**
         * 对任务进行处理,在run()方法中调用该方法
         * @param input
         * @return
         */
        @Override
        public Object handle(Object input){
            int i = (Integer)input;
            return i * i * i;
        }
    }
    

    main类:使用master计算 1~100的立方和( 1^3 + 2^3 + ... + 100^3 )

    /**
     * 使用Master-Worker模式实现 :
     *    计算 1~100的立方和( 1^3 + 2^3 + ... + 100^3 )
     *    1.将计算任务分解为100个小任务,每个子任务计算单独的立方和,最后将计算结果返回给master
     */
    public class CubicAndCalculate {
        public static void main(String[] args) {
            //创建Master对象,开启五个线程对任务进行处理
            Master master = new Master(new PlushWorker(), 5);
            //将大任务分解成100个小任务
            for(int i = 1; i <= 100; i++){
                master.submit(i);
            }
            //开启多线程,并发的处理多个小任务
            master.execute();
            //master可以不用等全部的任务执行完便可以获取任务的执行结果
            Map<String, Object> resultMap = master.getResultMap();
            Integer re = 0;
            //加入两个判断是因为,每取出一个resultMap中的数据,都要对该数据删除,若消费resultMap的速度比生成的快,可通过判断任务线程是否执行完毕来控制数据的获取
            while(resultMap.size() > 0 || !master.isComplete()){
                //获取map的所有键
                Set<String> set = resultMap.keySet();
                String key = null;
                for(String k : set){
                    key = k;
                    break;
                }
                Integer i = null;
                //判断map中是否有数据,若key值为null,就不进行获取值操作
                if(key != null){
                    i = (Integer) resultMap.get(key);
                }
                //如果key存在,i !=null,则对返回的部分的任务的立方和进行累加,并移除resultMap中对应的项
                if(i != null){
                    re += i;
                    resultMap.remove(key);
                }
            }
            System.out.println(re);
        }
    }
    

    结果:

    25502500
    金麟岂能忍一世平凡 飞上了青天 天下还依然
  • 相关阅读:
    手机电池mAh和Wh概念
    运行中的iOS应用创建的文件们
    iOS分辨率的那些事儿
    为什么判断UITextField判断为空不能用isEqualToString:@""
    iOS开发里的Bundle是个啥玩意?!
    XCode里的模拟器到底在哪里?我的App被放到哪里了?
    iOS推送消息报错误“Domain=NSCocoaErrorDomain Code=3000”的可能问题
    让你的WordPress支持嵌入ObjectiveC代码
    升级 Mountain Lion 后,svn服务无法使用的解决方法
    对于javascript的词法作用域的思考
  • 原文地址:https://www.cnblogs.com/Auge/p/11692725.html
Copyright © 2011-2022 走看看