zoukankan      html  css  js  c++  java
  • 并发模型(二)——Master-Worker模式

       Master-Worker模式是常用的并行模式之一,它的核心思想是,系统有两个进程协作工作:Master进程,负责接收和分配任务;Worker进程,负责处理子任务。当Worker进程将子任务处理完成后,结果返回给Master进程,由Master进程做归纳汇总,最后得到最终的结果。

    一、什么是Master-Worker模式:

    该模式的结构图:

    结构图:

    Worker:用于实际处理一个任务;

    Master:任务的分配和最终结果的合成;

    Main:启动程序,调度开启Master。

    二、代码实现:

        下面的是一个简易的Master-Worker框架实现。

    (1)Master部分:

    package MasterWorker;  
      
    import java.util.HashMap;  
    import java.util.Map;  
    import java.util.Queue;  
    import java.util.concurrent.ConcurrentHashMap;  
    import java.util.concurrent.ConcurrentLinkedQueue;  
      
    public class Master {  
      
        //任务队列  
        protected Queue<Object> workQueue= new ConcurrentLinkedQueue<Object>();  
        //Worker进程队列  
        protected Map<String ,Thread> threadMap= new HashMap<String ,Thread>();  
        //子任务处理结果集  
        protected Map<String ,Object> resultMap= new ConcurrentHashMap<String, Object>();  
        //是否所有的子任务都结束了  
        public boolean isComplete(){  
            for(Map.Entry<String , Thread> entry:threadMap.entrySet()){  
                if(entry.getValue().getState()!=Thread.State.TERMINATED){  
                    return false;  
                }  
                      
            }  
            return true ;  
        }  
          
        //Master的构造,需要一个Worker进程逻辑,和需要Worker进程数量  
        public Master(Worker worker,int countWorker){  
              
            worker.setWorkQueue(workQueue);  
            worker.setResultMap(resultMap);  
            for(int i=0;i<countWorker;i++){  
                threadMap.put(Integer.toString(i),  new Thread(worker, Integer.toString(i)));  
            }  
              
        }  
          
        //提交一个任务  
        public void submit(Object job){  
            workQueue.add(job);  
        }  
          
          
        //返回子任务结果集  
        public Map<String ,Object> getResultMap(){  
            return resultMap;  
        }  
          
          
        //开始运行所有的Worker进程,进行处理  
        public  void execute(){  
             for(Map.Entry<String , Thread> entry:threadMap.entrySet()){  
                 entry.getValue().start();  
                   
             }  
        }  
          
          
    }  

    Worker进程实现:

    package MasterWorker;  
      
    import java.util.Map;  
    import java.util.Queue;  
      
    public class Worker  implements Runnable{  
      
        //任务队列,用于取得子任务  
        protected Queue<Object> workQueue;  
        //子任务处理结果集  
        protected Map<String ,Object> resultMap;  
        public void setWorkQueue(Queue<Object> workQueue){  
            this.workQueue= workQueue;  
        }  
          
        public void setResultMap(Map<String ,Object> resultMap){  
            this.resultMap=resultMap;  
        }  
        //子任务处理的逻辑,在子类中实现具体逻辑  
        public Object handle(Object input){  
            return input;  
        }  
          
          
        @Override  
        public void run() {  
              
            while(true){  
                //获取子任务  
                Object input= workQueue.poll();  
                if(input==null){  
                    break;  
                }  
                //处理子任务  
                Object re = handle(input);  
                resultMap.put(Integer.toString(input.hashCode()), re);  
            }  
        }  
      
    }  

    运用这个小框架计算1——100的立方和,PlusWorker的实现:

    package MasterWorker;  
      
    public class PlusWorker extends Worker {  
      
        @Override  
        public Object handle(Object input) {  
              
            Integer i =(Integer)input;  
            return i*i*i;  
        }  
      
          
    }  

    进行计算的Main函数:

    package MasterWorker;  
      
    import java.util.Map;  
    import java.util.Set;  
      
    public class Main {  
      
          
        /** 
         * @param args 
         */  
        public static void main(String[] args) {  
            //固定使用5个Worker,并指定Worker  
            Master m = new Master(new PlusWorker(), 5);  
            //提交100个子任务  
            for(int i=0;i<100;i++){  
                m.submit(i);  
            }  
            //开始计算  
            m.execute();  
            int re= 0;  
            //保存最终结算结果  
            Map<String ,Object> resultMap =m.getResultMap();  
              
            //不需要等待所有Worker都执行完成,即可开始计算最终结果  
            while(resultMap.size()>0 || !m.isComplete()){  
                Set<String> keys = resultMap.keySet();  
                String key =null;  
                for(String k:keys){  
                    key=k;  
                    break;  
                }  
                Integer i =null;  
                if(key!=null){  
                    i=(Integer)resultMap.get(key);  
                }  
                if(i!=null){  
                    //最终结果  
                    re+=i;  
                }  
                if(key!=null){  
                    //移除已经被计算过的项  
                    resultMap.remove(key);  
                }  
                  
            }  
              
      
        }  
      
    }  

    三、总结:

        Master-Worker模式是一种将串行任务并行化的方案,被分解的子任务在系统中可以被并行处理,同时,如果有需要,Master进程不需要等待所有子任务都完成计算,就可以根据已有的部分结果集计算最终结果集。

     

  • 相关阅读:
    关于js计算非等宽字体宽度的方法
    [NodeJs系列]聊一聊BOM
    Vue.js路由管理器 Vue Router
    vue 实践技巧合集
    微任务、宏任务与Event-Loop
    事件循环(EventLoop)的学习总结
    Cookie、Session和LocalStorage
    MySQL 树形结构 根据指定节点 获取其所在全路径节点序列
    MySQL 树形结构 根据指定节点 获取其所有父节点序列
    MySQL 创建函数报错 This function has none of DETERMINISTIC, NO SQL, or READS SQL DATA in its declaration and binary logging is enabled (you *might* want to use the less safe log_bin_trust_function_creators
  • 原文地址:https://www.cnblogs.com/duan2/p/7769133.html
Copyright © 2011-2022 走看看