一、Master-Worker设计模式
Master-Worker模式是常用的并行设计模式。它的核心思想是,系统有两个进程协议工作:Master进程和Worker进程。Master进程负责接收和分配任务,Worker进程负责处理子任务。当各个Worker进程将子任务处理完后,将结果返回给Master进程,由Master进行归纳和汇总,从而得到系统结果。
Master-Worker模式的好处是,它能将大任务分解成若干个小任务,并发执行,从而提高系统性能。而对于系统请求者Client来说,任务一旦提交,Master进程就会立刻分配任务并立即返回,并不会等系统处理完全部任务再返回,其处理过程是异步的。
二、Master-Worker设计模式代码实现
1、创建Task任务对象
1 package com.ietree.basicskill.mutilthread.designpattern.masterworker; 2 3 /** 4 * Created by Root on 5/12/2017. 5 */ 6 public class Task { 7 8 private int id; 9 10 private String name; 11 12 private int price; 13 14 public int getId() { 15 return id; 16 } 17 18 public void setId(int id) { 19 this.id = id; 20 } 21 22 public String getName() { 23 return name; 24 } 25 26 public void setName(String name) { 27 this.name = name; 28 } 29 30 public int getPrice() { 31 return price; 32 } 33 34 public void setPrice(int price) { 35 this.price = price; 36 } 37 }
2、实现Worker对象
1 package com.ietree.basicskill.mutilthread.designpattern.masterworker; 2 3 import java.util.concurrent.ConcurrentHashMap; 4 import java.util.concurrent.ConcurrentLinkedQueue; 5 6 /** 7 * Created by Root on 5/12/2017. 8 */ 9 public class Worker implements Runnable { 10 11 private ConcurrentLinkedQueue<Task> workQueue; 12 private ConcurrentHashMap<String, Object> resultMap; 13 14 public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) { 15 this.workQueue = workQueue; 16 } 17 18 public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { 19 this.resultMap = resultMap; 20 } 21 22 @Override 23 public void run() { 24 while (true) { 25 Task input = this.workQueue.poll(); 26 if (input == null) { 27 break; 28 } 29 // 真正的去做业务处理 30 //Object output = handle(input); 31 // 改造 32 Object output = MyWorker.handle(input); 33 // 返回处理结果集 34 this.resultMap.put(Integer.toString(input.getId()), output); 35 } 36 } 37 38 // private Object handle(Task input) { 39 // Object output = null; 40 // try { 41 // // 表示处理task任务的耗时,可能是数据的加工,也可能是操作数据库...... 42 // Thread.sleep(500); 43 // output = input.getPrice(); 44 // } catch (InterruptedException e) { 45 // e.printStackTrace(); 46 // } 47 // return output; 48 // } 49 50 // 优化,考虑让继承类去自己实现具体的业务处理 51 public static Object handle(Task input) { 52 return null; 53 } 54 55 }
3、为了使程序更灵活,将具体的业务执行逻辑抽离,在具体的Worker对象去实现,如这里的MyWorker对象
1 package com.ietree.basicskill.mutilthread.designpattern.masterworker; 2 3 /** 4 * Created by Root on 5/13/2017. 5 */ 6 public class MyWorker extends Worker { 7 8 public static Object handle(Task input) { 9 Object output = null; 10 try { 11 // 表示处理task任务的耗时,可能是数据的加工,也可能是操作数据库...... 12 Thread.sleep(500); 13 output = input.getPrice(); 14 } catch (InterruptedException e) { 15 e.printStackTrace(); 16 } 17 return output; 18 } 19 20 }
4、Master类
1 package com.ietree.basicskill.mutilthread.designpattern.masterworker; 2 3 import java.util.HashMap; 4 import java.util.Map; 5 import java.util.concurrent.ConcurrentHashMap; 6 import java.util.concurrent.ConcurrentLinkedQueue; 7 8 /** 9 * Created by Root on 5/12/2017. 10 */ 11 public class Master { 12 13 // 1、使用一个ConcurrentLinkedQueue集合来装载所有需要执行的任务 14 private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); 15 16 // 2、使用HashMap来装载所有的worker对象 17 private HashMap<String, Thread> workers = new HashMap<String, Thread>(); 18 19 // 3、使用一个容器承装每一个worker并发执行任务的结果集 20 private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); 21 22 // 4、构造方法 23 public Master(Worker worker, int workerCount) { 24 // 每一个worker对象都需要有Master的引用,workQueue用于任务的领取,resultMap用于任务的提交 25 worker.setWorkerQueue(this.workQueue); 26 worker.setResultMap(this.resultMap); 27 28 for (int i = 0; i < workerCount; i++) { 29 workers.put("子节点" + Integer.toString(i), new Thread(worker)); 30 } 31 } 32 33 // 5、提交方法 34 public void submit(Task task) { 35 this.workQueue.add(task); 36 } 37 38 // 6、需要有一个执行的方法(启动应用程序,让所有的worker工作) 39 public void execute() { 40 for (Map.Entry<String, Thread> me : workers.entrySet()) { 41 me.getValue().start(); 42 } 43 } 44 45 // 7、判断线程是否执行完毕 46 public boolean isComplete() { 47 for (Map.Entry<String, Thread> me : workers.entrySet()) { 48 // 判断所有的线程状态是否属于已停止状态 49 if (me.getValue().getState() != Thread.State.TERMINATED) { 50 return false; 51 } 52 } 53 return true; 54 } 55 56 // 8、返回结果集数据 57 public int getResult() { 58 int ret = 0; 59 for (Map.Entry<String, Object> me : resultMap.entrySet()) { 60 // 汇总逻辑 61 ret += (Integer) me.getValue(); 62 } 63 return ret; 64 } 65 66 }
5、测试,具体调用实现
1 package com.ietree.basicskill.mutilthread.designpattern.masterworker; 2 3 import java.util.Random; 4 5 /** 6 * Created by Root on 5/13/2017. 7 */ 8 public class MasterWorkerTest { 9 10 public static void main(String[] args) { 11 12 // Master master = new Master(new Worker(), 10); 13 // 改造 14 // Master master = new Master(new MyWorker(), 10); 15 // 改造(获取当前机器可用线程数) 16 System.out.println("我的机器可用Processors数量:" + Runtime.getRuntime().availableProcessors()); 17 Master master = new Master(new MyWorker(), Runtime.getRuntime().availableProcessors()); 18 19 Random r = new Random(); 20 for (int i = 1; i <= 10; i++) { 21 Task t = new Task(); 22 t.setId(i); 23 t.setName("任务" + i); 24 t.setPrice(r.nextInt(1000)); 25 master.submit(t); 26 } 27 master.execute(); 28 29 long start = System.currentTimeMillis(); 30 31 while (true) { 32 if (master.isComplete()) { 33 long end = System.currentTimeMillis() - start; 34 int ret = master.getResult(); 35 System.out.println("最终的结果:" + ret + ",执行耗时:" + end); 36 break; 37 } 38 } 39 } 40 41 }
程序输出:
我的机器可用Processors数量:20
最终的结果:4473,执行耗时:500
从上面的运行结果来看,程序最终执行时间几乎就等于一个线程单独运行的时间,在此注意的是,同时执行的线程数是根据你执行此程序的机器配置决定的。