zoukankan      html  css  js  c++  java
  • Master-worker设计模式

    Master-worker模式是常用的并行计算模式。它的核心思想是系统是由两类进程协助工作:Master进行和worker进程。Master负责接收和分配任务,worker负责处理子任务。当各个worker子进程处理完成后,会返回结果给master,由master做归纳和总结。其好处是能将一个大任务分解成若干个小任务,并行执行,从而提高系统的吞吐量。

    代码实现:

    Master:

     1 package com.java.day04_mode_masterworker;
     2 
     3 import java.util.HashMap;
     4 import java.util.Map;
     5 import java.util.concurrent.ConcurrentHashMap;
     6 import java.util.concurrent.ConcurrentLinkedQueue;
     7 
     8 public class Master {
     9 
    10     //1.首先有承装任务的容器
    11     private ConcurrentLinkedQueue<Task> workQueue= new ConcurrentLinkedQueue<>();
    12     
    13     //2.有承装worker的容器
    14     private HashMap<String, Thread> workers = new HashMap<>();
    15     
    16     //3.承装worker结果集的容器
    17     private ConcurrentHashMap<String , Object> results = new ConcurrentHashMap<>();
    18     
    19     //4.构造方法
    20     public Master(Worker worker,int workerCount){
    21         worker.setWorkQueue(workQueue);
    22         worker.setResults(results);
    23         
    24         for (int i = 0; i < workerCount; i++) {
    25             workers.put("节点"+Integer.toString(i), new Thread(worker));
    26         }
    27     }
    28     
    29     //5.往任务队列里提交任务的方法
    30     public void submit(Task task){
    31         this.workQueue.add(task);
    32     }
    33     
    34     //6.启动应用程序,让所有的worker工作
    35     public void excute(){
    36         //map的循环方式
    37         for(Map.Entry<String, Thread> me : workers.entrySet()){
    38             me.getValue().start();
    39         }
    40     }
    41 
    42     //判断线程是否执行完毕:当所有的任务执行完毕,线程也停止
    43     public boolean isComplete() {
    44         
    45         for(Map.Entry<String, Thread> me : workers.entrySet()){
    46             if(me.getValue().getState() != Thread.State.TERMINATED){
    47                 return false;
    48             }
    49         }
    50         
    51         return true;
    52     }
    53 
    54     //返回结果集合
    55     public long getResult() {
    56         long result = 0;
    57         
    58         for(Map.Entry<String, Object> me : results.entrySet()){
    59             result+=(Integer)me.getValue();
    60         }
    61         return result;
    62     }
    63     
    64     
    65     
    66 }

    Worker:

     1 package com.java.day04_mode_masterworker;
     2 
     3 import java.util.HashMap;
     4 import java.util.concurrent.ConcurrentHashMap;
     5 import java.util.concurrent.ConcurrentLinkedQueue;
     6 
     7 public class Worker implements Runnable{
     8     
     9     private ConcurrentLinkedQueue<Task> workQueue;
    10     private ConcurrentHashMap<String, Object> results;
    11     
    12 
    13     @Override
    14     public void run() {
    15         while(true){
    16             Task input = workQueue.poll();
    17             //任务执行完之后,break,线程停止
    18             if(input == null) break;
    19             Object output = handle(input);
    20             
    21             results.put(Integer.toString( input.getId()), output);
    22         }
    23         
    24     }
    25 
    26     //可以把下面的方法提取出来,在worker里面返回空,然后写一个子类继承worker,在worker里面重写此方法,解耦,灵活度也更高,Object output = 子类.handle(input);
    27     //对数据的具体操作
    28     private Object handle(Task input) {
    29         Object result = null;
    30         
    31         try {
    32             Thread.sleep(500);
    33         } catch (InterruptedException e) {
    34             e.printStackTrace();
    35         }
    36         
    37         result=input.getPrice();
    38         
    39         
    40         return result;
    41     }
    42 
    43     public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
    44         this.workQueue=workQueue;
    45     }
    46 
    47     public void setResults(ConcurrentHashMap<String, Object> results) {
    48         this.results=results;
    49     }
    50 
    51     
    52     
    53 }

    Task:

     1 package com.java.day04_mode_masterworker;
     2 
     3 public class Task {
     4     private int id;
     5     private String name;
     6     private int price;
     7     public int getId() {
     8         return id;
     9     }
    10     public void setId(int id) {
    11         this.id = id;
    12     }
    13     public String getName() {
    14         return name;
    15     }
    16     public void setName(String name) {
    17         this.name = name;
    18     }
    19     public int getPrice() {
    20         return price;
    21     }
    22     public void setPrice(int price) {
    23         this.price = price;
    24     }
    25     
    26     
    27 }

    Main:

     1 package com.java.day04_mode_masterworker;
     2 
     3 public class Main {
     4     
     5     public static void main(String[] args) {
     6         
     7         //这一块的10(线程数)要根据机器具体的性能来
     8         //Runtime.getRuntime().availableProcessors():我机器可以用的process数量
     9         
    10         //创建master,和所需要的worker
    11         Master master = new Master(new Worker(),10);
    12         
    13         //提交任务
    14         for (int i = 1; i < 101; i++) {
    15             Task t = new Task();
    16             t.setId(i);
    17             t.setName("任务"+i);
    18             t.setPrice(i+1000);
    19             
    20             master.submit(t);
    21         }
    22         
    23         //执行任务
    24         master.excute();
    25         
    26         long start  = System.currentTimeMillis();
    27         
    28         //判断任务是否执行完毕
    29         while(true){
    30             if(master.isComplete()){
    31                 long end = System.currentTimeMillis()-start;
    32                 //任务执行完成,返回结果集
    33                 long result = master.getResult();
    34                 System.out.println("执行了"+end+"秒,最终结果为:"+result);
    35                 break;
    36             }
    37         }
    38         
    39         
    40         
    41         
    42         
    43         
    44     }
    45     
    46     
    47 }

    其中上面的代码还可以进行优化

  • 相关阅读:
    (转)MyEclipse +Servlet
    Android: MediaRecorder start failed
    Android: 帮助找出内存泄漏的工具
    Node & Express: some tips
    MySQL: Create Master
    scp: useful commands
    MySQL: 打开binlog选项后无法重启MySQL
    IIS: 配置web.config解决Maximum request length exceeded错误
    MySQL: 让MySQL支持颜文字emoji
    Linux: 通过命令行上传文件到ftp服务器
  • 原文地址:https://www.cnblogs.com/syousetu/p/6756200.html
Copyright © 2011-2022 走看看