Master-Worker模式是常用的并行模式之一,它的核心思想是,系统有两个进程协作工作:Master进程,负责接收和分配任务;Worker进程,负责处理子任务。当Worker进程将子任务处理完成后,结果返回给Master进程,由Master进程做归纳汇总,最后得到最终的结果。
一、什么是Master-Worker模式:
该模式的结构图:
Worker:用于实际处理一个任务;
Master:任务的分配和最终结果的合成;
Main:启动程序,调度开启Master。
注意点:
Master必须存在一个队列来存储客户端发送过来的任务,必须有一个队列,我们选择无阻塞无界队列
Worker实际处理任务的操作者,所以应该是线程,应该实现runnable接口
master应该有一个容器装所有的worker对象,不涉及到高并发,使用hashmap ,value就是worker对象
worker需要拥有master进程中的无阻塞无界队列的引用,用来获得任务进行处理
master是对worker处理的结果做归纳总结,所以需要有一个队列要保存worker处理的结果,要实现并发安全的ConcurrentHashMap
worker需要拥有ConcurrentHashMap的引用,把处理的结果存储在ConcurrentHashMap中,需要线程安全的
Master-Worker模式是一种将串行任务并行化的方案。
更重要的是提供了一种变化的思想。
我们来看下面程序的代码:
package com.bjsxt.height.design015; public class Task { private int id; private int price ; public int getId() { return id; } public void setId(int id) { this.id = id; } public int getPrice() { return price; } public void setPrice(int price) { this.price = price; } }
package com.bjsxt.height.design015; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Worker implements Runnable { private ConcurrentLinkedQueue<Task> workQueue; private ConcurrentHashMap<String, Object> resultMap; public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) { this.workQueue = workQueue; } public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { this.resultMap = resultMap; } @Override public void run() { while(true){ Task input = this.workQueue.poll(); if(input == null) break; Object output = handle(input); this.resultMap.put(Integer.toString(input.getId()), output); } } private Object handle(Task input) { Object output = null; try { //处理任务的耗时。。 比如说进行操作数据库。。。 Thread.sleep(500); output = input.getPrice(); } catch (InterruptedException e) { e.printStackTrace(); } return output; } }
package com.bjsxt.height.design015; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Master { //1 有一个盛放任务的容器 private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); //2 需要有一个盛放worker的集合 private HashMap<String, Thread> workers = new HashMap<String, Thread>(); //3 需要有一个盛放每一个worker执行任务的结果集合 private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); //4 构造方法 public Master(Worker worker , int workerCount){ worker.setWorkQueue(this.workQueue); worker.setResultMap(this.resultMap); /** * 初始化woker对象的个数,管理worker对象,开启100个worker对象,装到hashmap中 * */ for(int i = 0; i < workerCount; i ++){ this.workers.put(Integer.toString(i), new Thread(worker)); } } //5 需要一个提交任务的方法 public void submit(Task task){ /** * 添加到任务队列中 * */ this.workQueue.add(task); } //6 需要有一个执行的方法,启动所有的worker方法去执行任务 public void execute(){ for(Map.Entry<String, Thread> me : workers.entrySet()){ me.getValue().start(); } } //7 判断是否运行结束的方法 public boolean isComplete() { for(Map.Entry<String, Thread> me : workers.entrySet()){ if(me.getValue().getState() != Thread.State.TERMINATED){ return false; } } return true; } //8 计算结果方法 public int getResult() { int priceResult = 0; for(Map.Entry<String, Object> me : resultMap.entrySet()){ priceResult += (Integer)me.getValue(); } return priceResult; } }
package com.bjsxt.height.design015; import java.util.Random; public class Main { public static void main(String[] args) { Master master = new Master(new Worker(), 20); Random r = new Random(); for(int i = 1; i <= 100; i++){ Task t = new Task(); t.setId(i); t.setPrice(r.nextInt(1000)); master.submit(t); } master.execute(); long start = System.currentTimeMillis(); while(true){ if(master.isComplete()){ long end = System.currentTimeMillis() - start; int priceResult = master.getResult(); System.out.println("最终结果:" + priceResult + ", 执行时间:" + end); break; } } } }
程序运行的结果是:
最终结果:54386, 执行时间:2500
将每个worker计算得到的price结果进行相加
相当的经典,这种设计模式就是将串行化的思想转换成并行化进行处理