zoukankan      html  css  js  c++  java
  • 并发模型之Master-Worker设计模式

    一、Master-Worker设计模式

    Master-Worker模式是常用的并行设计模式。它的核心思想是,系统有两个进程协议工作:Master进程和Worker进程。Master进程负责接收和分配任务,Worker进程负责处理子任务。当各个Worker进程将子任务处理完后,将结果返回给Master进程,由Master进行归纳和汇总,从而得到系统结果。

    Master-Worker模式的好处是,它能将大任务分解成若干个小任务,并发执行,从而提高系统性能。而对于系统请求者Client来说,任务一旦提交,Master进程就会立刻分配任务并立即返回,并不会等系统处理完全部任务再返回,其处理过程是异步的。

    二、Master-Worker设计模式代码实现

    1、创建Task任务对象

     1 package com.ietree.basicskill.mutilthread.designpattern.masterworker;
     2 
     3 /**
     4  * Created by Root on 5/12/2017.
     5  */
     6 public class Task {
     7 
     8     private int id;
     9 
    10     private String name;
    11 
    12     private int price;
    13 
    14     public int getId() {
    15         return id;
    16     }
    17 
    18     public void setId(int id) {
    19         this.id = id;
    20     }
    21 
    22     public String getName() {
    23         return name;
    24     }
    25 
    26     public void setName(String name) {
    27         this.name = name;
    28     }
    29 
    30     public int getPrice() {
    31         return price;
    32     }
    33 
    34     public void setPrice(int price) {
    35         this.price = price;
    36     }
    37 }

    2、实现Worker对象

     1 package com.ietree.basicskill.mutilthread.designpattern.masterworker;
     2 
     3 import java.util.concurrent.ConcurrentHashMap;
     4 import java.util.concurrent.ConcurrentLinkedQueue;
     5 
     6 /**
     7  * Created by Root on 5/12/2017.
     8  */
     9 public class Worker implements Runnable {
    10 
    11     private ConcurrentLinkedQueue<Task> workQueue;
    12     private ConcurrentHashMap<String, Object> resultMap;
    13 
    14     public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
    15         this.workQueue = workQueue;
    16     }
    17 
    18     public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
    19         this.resultMap = resultMap;
    20     }
    21 
    22     @Override
    23     public void run() {
    24         while (true) {
    25             Task input = this.workQueue.poll();
    26             if (input == null) {
    27                 break;
    28             }
    29             // 真正的去做业务处理
    30             //Object output = handle(input);
    31             // 改造
    32             Object output = MyWorker.handle(input);
    33             // 返回处理结果集
    34             this.resultMap.put(Integer.toString(input.getId()), output);
    35         }
    36     }
    37 
    38 //    private Object handle(Task input) {
    39 //        Object output = null;
    40 //        try {
    41 //            // 表示处理task任务的耗时,可能是数据的加工,也可能是操作数据库......
    42 //            Thread.sleep(500);
    43 //            output = input.getPrice();
    44 //        } catch (InterruptedException e) {
    45 //            e.printStackTrace();
    46 //        }
    47 //        return output;
    48 //    }
    49 
    50     // 优化,考虑让继承类去自己实现具体的业务处理
    51     public static Object handle(Task input) {
    52         return null;
    53     }
    54 
    55 }

    3、为了使程序更灵活,将具体的业务执行逻辑抽离,在具体的Worker对象去实现,如这里的MyWorker对象

     1 package com.ietree.basicskill.mutilthread.designpattern.masterworker;
     2 
     3 /**
     4  * Created by Root on 5/13/2017.
     5  */
     6 public class MyWorker extends Worker {
     7 
     8     public static Object handle(Task input) {
     9         Object output = null;
    10         try {
    11             // 表示处理task任务的耗时,可能是数据的加工,也可能是操作数据库......
    12             Thread.sleep(500);
    13             output = input.getPrice();
    14         } catch (InterruptedException e) {
    15             e.printStackTrace();
    16         }
    17         return output;
    18     }
    19 
    20 }

    4、Master类

     1 package com.ietree.basicskill.mutilthread.designpattern.masterworker;
     2 
     3 import java.util.HashMap;
     4 import java.util.Map;
     5 import java.util.concurrent.ConcurrentHashMap;
     6 import java.util.concurrent.ConcurrentLinkedQueue;
     7 
     8 /**
     9  * Created by Root on 5/12/2017.
    10  */
    11 public class Master {
    12 
    13     // 1、使用一个ConcurrentLinkedQueue集合来装载所有需要执行的任务
    14     private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
    15 
    16     // 2、使用HashMap来装载所有的worker对象
    17     private HashMap<String, Thread> workers = new HashMap<String, Thread>();
    18 
    19     // 3、使用一个容器承装每一个worker并发执行任务的结果集
    20     private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
    21 
    22     // 4、构造方法
    23     public Master(Worker worker, int workerCount) {
    24         // 每一个worker对象都需要有Master的引用,workQueue用于任务的领取,resultMap用于任务的提交
    25         worker.setWorkerQueue(this.workQueue);
    26         worker.setResultMap(this.resultMap);
    27 
    28         for (int i = 0; i < workerCount; i++) {
    29             workers.put("子节点" + Integer.toString(i), new Thread(worker));
    30         }
    31     }
    32 
    33     // 5、提交方法
    34     public void submit(Task task) {
    35         this.workQueue.add(task);
    36     }
    37 
    38     // 6、需要有一个执行的方法(启动应用程序,让所有的worker工作)
    39     public void execute() {
    40         for (Map.Entry<String, Thread> me : workers.entrySet()) {
    41             me.getValue().start();
    42         }
    43     }
    44 
    45     // 7、判断线程是否执行完毕
    46     public boolean isComplete() {
    47         for (Map.Entry<String, Thread> me : workers.entrySet()) {
    48             // 判断所有的线程状态是否属于已停止状态
    49             if (me.getValue().getState() != Thread.State.TERMINATED) {
    50                 return false;
    51             }
    52         }
    53         return true;
    54     }
    55 
    56     // 8、返回结果集数据
    57     public int getResult() {
    58         int ret = 0;
    59         for (Map.Entry<String, Object> me : resultMap.entrySet()) {
    60             // 汇总逻辑
    61             ret += (Integer) me.getValue();
    62         }
    63         return ret;
    64     }
    65 
    66 }

    5、测试,具体调用实现

     1 package com.ietree.basicskill.mutilthread.designpattern.masterworker;
     2 
     3 import java.util.Random;
     4 
     5 /**
     6  * Created by Root on 5/13/2017.
     7  */
     8 public class MasterWorkerTest {
     9 
    10     public static void main(String[] args) {
    11 
    12 //        Master master = new Master(new Worker(), 10);
    13         // 改造
    14 //        Master master = new Master(new MyWorker(), 10);
    15         // 改造(获取当前机器可用线程数)
    16         System.out.println("我的机器可用Processors数量:" + Runtime.getRuntime().availableProcessors());
    17         Master master = new Master(new MyWorker(), Runtime.getRuntime().availableProcessors());
    18 
    19         Random r = new Random();
    20         for (int i = 1; i <= 10; i++) {
    21             Task t = new Task();
    22             t.setId(i);
    23             t.setName("任务" + i);
    24             t.setPrice(r.nextInt(1000));
    25             master.submit(t);
    26         }
    27         master.execute();
    28 
    29         long start = System.currentTimeMillis();
    30 
    31         while (true) {
    32             if (master.isComplete()) {
    33                 long end = System.currentTimeMillis() - start;
    34                 int ret = master.getResult();
    35                 System.out.println("最终的结果:" + ret + ",执行耗时:" + end);
    36                 break;
    37             }
    38         }
    39     }
    40 
    41 }

    程序输出:

    我的机器可用Processors数量:20
    最终的结果:4473,执行耗时:500

    从上面的运行结果来看,程序最终执行时间几乎就等于一个线程单独运行的时间,在此注意的是,同时执行的线程数是根据你执行此程序的机器配置决定的。

  • 相关阅读:
    centos7 rabbitmq系统部署
    socket粘包、断包、校验
    C#对象、文件与二进制串(byte数组)之间的转换
    Windows Error Code
    C#之Socket断线和重连
    BitConverter 整数和十六进制互转
    DateTime还是DateTimeOffset?Now还是UtcNow?
    WebAPI 跨域
    Console Owin 跨域解决
    2019.12.17 Arcgis10.1许可到期解决方法
  • 原文地址:https://www.cnblogs.com/Dylansuns/p/6847853.html
Copyright © 2011-2022 走看看