zoukankan      html  css  js  c++  java
  • 多线程设计模式(三):Master-Worker模式

    Master-Worker模式是常用的并行模式之一,它的核心思想是,系统有两个进程协作工作:Master进程,负责接收和分配任务;Worker进程,负责处理子任务。当Worker进程将子任务处理完成后,结果返回给Master进程,由Master进程做归纳汇总,最后得到最终的结果。

    一、什么是Master-Worker模式:

    该模式的结构图:

      结构图:

    Worker:用于实际处理一个任务;

    Master:任务的分配和最终结果的合成;

    Main:启动程序,调度开启Master。

    二、代码实现:

        下面的是一个简易的Master-Worker框架实现。

    (1)Master部分:

    [java] view plain copy
     
    1. package MasterWorker;  
    2.   
    3. import java.util.HashMap;  
    4. import java.util.Map;  
    5. import java.util.Queue;  
    6. import java.util.concurrent.ConcurrentHashMap;  
    7. import java.util.concurrent.ConcurrentLinkedQueue;  
    8.   
    9. public class Master {  
    10.   
    11.     //任务队列  
    12.     protected Queue<Object> workQueue= new ConcurrentLinkedQueue<Object>();  
    13.     //Worker进程队列  
    14.     protected Map<String ,Thread> threadMap= new HashMap<String ,Thread>();  
    15.     //子任务处理结果集  
    16.     protected Map<String ,Object> resultMap= new ConcurrentHashMap<String, Object>();  
    17.     //是否所有的子任务都结束了  
    18.     public boolean isComplete(){  
    19.         for(Map.Entry<String , Thread> entry:threadMap.entrySet()){  
    20.             if(entry.getValue().getState()!=Thread.State.TERMINATED){  
    21.                 return false;  
    22.             }  
    23.                   
    24.         }  
    25.         return true ;  
    26.     }  
    27.       
    28.     //Master的构造,需要一个Worker进程逻辑,和需要Worker进程数量  
    29.     public Master(Worker worker,int countWorker){  
    30.           
    31.         worker.setWorkQueue(workQueue);  
    32.         worker.setResultMap(resultMap);  
    33.         for(int i=0;i<countWorker;i++){  
    34.             threadMap.put(Integer.toString(i),  new Thread(worker, Integer.toString(i)));  
    35.         }  
    36.           
    37.     }  
    38.       
    39.     //提交一个任务  
    40.     public void submit(Object job){  
    41.         workQueue.add(job);  
    42.     }  
    43.       
    44.       
    45.     //返回子任务结果集  
    46.     public Map<String ,Object> getResultMap(){  
    47.         return resultMap;  
    48.     }  
    49.       
    50.       
    51.     //开始运行所有的Worker进程,进行处理  
    52.     public  void execute(){  
    53.          for(Map.Entry<String , Thread> entry:threadMap.entrySet()){  
    54.              entry.getValue().start();  
    55.                
    56.          }  
    57.     }  
    58.       
    59.       
    60. }  

    (2)Worker进程实现:

    [java] view plain copy
     
    1. package MasterWorker;  
    2.   
    3. import java.util.Map;  
    4. import java.util.Queue;  
    5.   
    6. public class Worker  implements Runnable{  
    7.   
    8.     //任务队列,用于取得子任务  
    9.     protected Queue<Object> workQueue;  
    10.     //子任务处理结果集  
    11.     protected Map<String ,Object> resultMap;  
    12.     public void setWorkQueue(Queue<Object> workQueue){  
    13.         this.workQueue= workQueue;  
    14.     }  
    15.       
    16.     public void setResultMap(Map<String ,Object> resultMap){  
    17.         this.resultMap=resultMap;  
    18.     }  
    19.     //子任务处理的逻辑,在子类中实现具体逻辑  
    20.     public Object handle(Object input){  
    21.         return input;  
    22.     }  
    23.       
    24.       
    25.     @Override  
    26.     public void run() {  
    27.           
    28.         while(true){  
    29.             //获取子任务  
    30.             Object input= workQueue.poll();  
    31.             if(input==null){  
    32.                 break;  
    33.             }  
    34.             //处理子任务  
    35.             Object re = handle(input);  
    36.             resultMap.put(Integer.toString(input.hashCode()), re);  
    37.         }  
    38.     }  
    39.   
    40. }  

    (3)运用这个小框架计算1——100的立方和,PlusWorker的实现:

    [java] view plain copy
     
    1. package MasterWorker;  
    2.   
    3. public class PlusWorker extends Worker {  
    4.   
    5.     @Override  
    6.     public Object handle(Object input) {  
    7.           
    8.         Integer i =(Integer)input;  
    9.         return i*i*i;  
    10.     }  
    11.   
    12.       
    13. }  

    (4)进行计算的Main函数:

    [java] view plain copy
     
    1. package MasterWorker;  
    2.   
    3. import java.util.Map;  
    4. import java.util.Set;  
    5.   
    6. public class Main {  
    7.   
    8.       
    9.     /** 
    10.      * @param args 
    11.      */  
    12.     public static void main(String[] args) {  
    13.         //固定使用5个Worker,并指定Worker  
    14.         Master m = new Master(new PlusWorker(), 5);  
    15.         //提交100个子任务  
    16.         for(int i=0;i<100;i++){  
    17.             m.submit(i);  
    18.         }  
    19.         //开始计算  
    20.         m.execute();  
    21.         int re= 0;  
    22.         //保存最终结算结果  
    23.         Map<String ,Object> resultMap =m.getResultMap();  
    24.           
    25.         //不需要等待所有Worker都执行完成,即可开始计算最终结果  
    26.         while(resultMap.size()>0 || !m.isComplete()){  
    27.             Set<String> keys = resultMap.keySet();  
    28.             String key =null;  
    29.             for(String k:keys){  
    30.                 key=k;  
    31.                 break;  
    32.             }  
    33.             Integer i =null;  
    34.             if(key!=null){  
    35.                 i=(Integer)resultMap.get(key);  
    36.             }  
    37.             if(i!=null){  
    38.                 //最终结果  
    39.                 re+=i;  
    40.             }  
    41.             if(key!=null){  
    42.                 //移除已经被计算过的项  
    43.                 resultMap.remove(key);  
    44.             }  
    45.               
    46.         }  
    47.           
    48.   
    49.     }  
    50.   
    51. }  

    三、总结:

        Master-Worker模式是一种将串行任务并行化的方案,被分解的子任务在系统中可以被并行处理,同时,如果有需要,Master进程不需要等待所有子任务都完成计算,就可以根据已有的部分结果集计算最终结果集。

    转:http://blog.csdn.net/lmdcszh/article/details/39698189

  • 相关阅读:
    ubuntu 安装 less
    Django orm增删改字段、建表 ,单表增删改查,Django请求生命周期
    python RabbitMQ队列使用
    80个Python练手项目列表
    celery异步任务体系笔记
    为什么要选择RabbitMQ ,RabbitMQ简介,各种MQ选型对比
    吞吐量(TPS)、QPS、并发数、响应时间(RT)
    Supervisor使用详解
    supervisor 使 celery后台运行
    celery Django 简单示例
  • 原文地址:https://www.cnblogs.com/duanxz/p/5143147.html
Copyright © 2011-2022 走看看