zoukankan      html  css  js  c++  java
  • 并发模型(二)——Master-Worker模式

       Master-Worker模式是常用的并行模式之一,它的核心思想是,系统有两个进程协作工作:Master进程,负责接收和分配任务;Worker进程,负责处理子任务。当Worker进程将子任务处理完成后,结果返回给Master进程,由Master进程做归纳汇总,最后得到最终的结果。

    一、什么是Master-Worker模式:

    该模式的结构图:

    结构图:

    Worker:用于实际处理一个任务;

    Master:任务的分配和最终结果的合成;

    Main:启动程序,调度开启Master。

    二、代码实现:

        下面的是一个简易的Master-Worker框架实现。

    (1)Master部分:

    package MasterWorker;  
      
    import java.util.HashMap;  
    import java.util.Map;  
    import java.util.Queue;  
    import java.util.concurrent.ConcurrentHashMap;  
    import java.util.concurrent.ConcurrentLinkedQueue;  
      
    public class Master {  
      
        //任务队列  
        protected Queue<Object> workQueue= new ConcurrentLinkedQueue<Object>();  
        //Worker进程队列  
        protected Map<String ,Thread> threadMap= new HashMap<String ,Thread>();  
        //子任务处理结果集  
        protected Map<String ,Object> resultMap= new ConcurrentHashMap<String, Object>();  
        //是否所有的子任务都结束了  
        public boolean isComplete(){  
            for(Map.Entry<String , Thread> entry:threadMap.entrySet()){  
                if(entry.getValue().getState()!=Thread.State.TERMINATED){  
                    return false;  
                }  
                      
            }  
            return true ;  
        }  
          
        //Master的构造,需要一个Worker进程逻辑,和需要Worker进程数量  
        public Master(Worker worker,int countWorker){  
              
            worker.setWorkQueue(workQueue);  
            worker.setResultMap(resultMap);  
            for(int i=0;i<countWorker;i++){  
                threadMap.put(Integer.toString(i),  new Thread(worker, Integer.toString(i)));  
            }  
              
        }  
          
        //提交一个任务  
        public void submit(Object job){  
            workQueue.add(job);  
        }  
          
          
        //返回子任务结果集  
        public Map<String ,Object> getResultMap(){  
            return resultMap;  
        }  
          
          
        //开始运行所有的Worker进程,进行处理  
        public  void execute(){  
             for(Map.Entry<String , Thread> entry:threadMap.entrySet()){  
                 entry.getValue().start();  
                   
             }  
        }  
          
          
    }  

    Worker进程实现:

    package MasterWorker;  
      
    import java.util.Map;  
    import java.util.Queue;  
      
    public class Worker  implements Runnable{  
      
        //任务队列,用于取得子任务  
        protected Queue<Object> workQueue;  
        //子任务处理结果集  
        protected Map<String ,Object> resultMap;  
        public void setWorkQueue(Queue<Object> workQueue){  
            this.workQueue= workQueue;  
        }  
          
        public void setResultMap(Map<String ,Object> resultMap){  
            this.resultMap=resultMap;  
        }  
        //子任务处理的逻辑,在子类中实现具体逻辑  
        public Object handle(Object input){  
            return input;  
        }  
          
          
        @Override  
        public void run() {  
              
            while(true){  
                //获取子任务  
                Object input= workQueue.poll();  
                if(input==null){  
                    break;  
                }  
                //处理子任务  
                Object re = handle(input);  
                resultMap.put(Integer.toString(input.hashCode()), re);  
            }  
        }  
      
    }  

    运用这个小框架计算1——100的立方和,PlusWorker的实现:

    package MasterWorker;  
      
    public class PlusWorker extends Worker {  
      
        @Override  
        public Object handle(Object input) {  
              
            Integer i =(Integer)input;  
            return i*i*i;  
        }  
      
          
    }  

    进行计算的Main函数:

    package MasterWorker;  
      
    import java.util.Map;  
    import java.util.Set;  
      
    public class Main {  
      
          
        /** 
         * @param args 
         */  
        public static void main(String[] args) {  
            //固定使用5个Worker,并指定Worker  
            Master m = new Master(new PlusWorker(), 5);  
            //提交100个子任务  
            for(int i=0;i<100;i++){  
                m.submit(i);  
            }  
            //开始计算  
            m.execute();  
            int re= 0;  
            //保存最终结算结果  
            Map<String ,Object> resultMap =m.getResultMap();  
              
            //不需要等待所有Worker都执行完成,即可开始计算最终结果  
            while(resultMap.size()>0 || !m.isComplete()){  
                Set<String> keys = resultMap.keySet();  
                String key =null;  
                for(String k:keys){  
                    key=k;  
                    break;  
                }  
                Integer i =null;  
                if(key!=null){  
                    i=(Integer)resultMap.get(key);  
                }  
                if(i!=null){  
                    //最终结果  
                    re+=i;  
                }  
                if(key!=null){  
                    //移除已经被计算过的项  
                    resultMap.remove(key);  
                }  
                  
            }  
              
      
        }  
      
    }  

    三、总结:

        Master-Worker模式是一种将串行任务并行化的方案,被分解的子任务在系统中可以被并行处理,同时,如果有需要,Master进程不需要等待所有子任务都完成计算,就可以根据已有的部分结果集计算最终结果集。

     

  • 相关阅读:
    防抖节流函数
    富文本编辑器tinymce在vue中的使用
    vue脚手架中检测lodop安装情况
    打印插件LODOP Vue中的使用
    vue打包后刷新页面报错:Unexpected token <
    Bootstrap-table表格插件的使用方法
    jsTree树插件
    Vue监控器watch的全面解析
    Vue计算属性computed的全面解析
    基于JQuery可拖动列表格插件DataTables的踩坑记
  • 原文地址:https://www.cnblogs.com/duan2/p/7769133.html
Copyright © 2011-2022 走看看