Master-worker模式是常用的并行计算模式。它的核心思想是系统是由两类进程协助工作:Master进行和worker进程。Master负责接收和分配任务,worker负责处理子任务。当各个worker子进程处理完成后,会返回结果给master,由master做归纳和总结。其好处是能将一个大任务分解成若干个小任务,并行执行,从而提高系统的吞吐量。
代码实现:
Master:
1 package com.java.day04_mode_masterworker; 2 3 import java.util.HashMap; 4 import java.util.Map; 5 import java.util.concurrent.ConcurrentHashMap; 6 import java.util.concurrent.ConcurrentLinkedQueue; 7 8 public class Master { 9 10 //1.首先有承装任务的容器 11 private ConcurrentLinkedQueue<Task> workQueue= new ConcurrentLinkedQueue<>(); 12 13 //2.有承装worker的容器 14 private HashMap<String, Thread> workers = new HashMap<>(); 15 16 //3.承装worker结果集的容器 17 private ConcurrentHashMap<String , Object> results = new ConcurrentHashMap<>(); 18 19 //4.构造方法 20 public Master(Worker worker,int workerCount){ 21 worker.setWorkQueue(workQueue); 22 worker.setResults(results); 23 24 for (int i = 0; i < workerCount; i++) { 25 workers.put("节点"+Integer.toString(i), new Thread(worker)); 26 } 27 } 28 29 //5.往任务队列里提交任务的方法 30 public void submit(Task task){ 31 this.workQueue.add(task); 32 } 33 34 //6.启动应用程序,让所有的worker工作 35 public void excute(){ 36 //map的循环方式 37 for(Map.Entry<String, Thread> me : workers.entrySet()){ 38 me.getValue().start(); 39 } 40 } 41 42 //判断线程是否执行完毕:当所有的任务执行完毕,线程也停止 43 public boolean isComplete() { 44 45 for(Map.Entry<String, Thread> me : workers.entrySet()){ 46 if(me.getValue().getState() != Thread.State.TERMINATED){ 47 return false; 48 } 49 } 50 51 return true; 52 } 53 54 //返回结果集合 55 public long getResult() { 56 long result = 0; 57 58 for(Map.Entry<String, Object> me : results.entrySet()){ 59 result+=(Integer)me.getValue(); 60 } 61 return result; 62 } 63 64 65 66 }
Worker:
1 package com.java.day04_mode_masterworker; 2 3 import java.util.HashMap; 4 import java.util.concurrent.ConcurrentHashMap; 5 import java.util.concurrent.ConcurrentLinkedQueue; 6 7 public class Worker implements Runnable{ 8 9 private ConcurrentLinkedQueue<Task> workQueue; 10 private ConcurrentHashMap<String, Object> results; 11 12 13 @Override 14 public void run() { 15 while(true){ 16 Task input = workQueue.poll(); 17 //任务执行完之后,break,线程停止 18 if(input == null) break; 19 Object output = handle(input); 20 21 results.put(Integer.toString( input.getId()), output); 22 } 23 24 } 25 26 //可以把下面的方法提取出来,在worker里面返回空,然后写一个子类继承worker,在worker里面重写此方法,解耦,灵活度也更高,Object output = 子类.handle(input); 27 //对数据的具体操作 28 private Object handle(Task input) { 29 Object result = null; 30 31 try { 32 Thread.sleep(500); 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 } 36 37 result=input.getPrice(); 38 39 40 return result; 41 } 42 43 public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) { 44 this.workQueue=workQueue; 45 } 46 47 public void setResults(ConcurrentHashMap<String, Object> results) { 48 this.results=results; 49 } 50 51 52 53 }
Task:
1 package com.java.day04_mode_masterworker; 2 3 public class Task { 4 private int id; 5 private String name; 6 private int price; 7 public int getId() { 8 return id; 9 } 10 public void setId(int id) { 11 this.id = id; 12 } 13 public String getName() { 14 return name; 15 } 16 public void setName(String name) { 17 this.name = name; 18 } 19 public int getPrice() { 20 return price; 21 } 22 public void setPrice(int price) { 23 this.price = price; 24 } 25 26 27 }
Main:
1 package com.java.day04_mode_masterworker; 2 3 public class Main { 4 5 public static void main(String[] args) { 6 7 //这一块的10(线程数)要根据机器具体的性能来 8 //Runtime.getRuntime().availableProcessors():我机器可以用的process数量 9 10 //创建master,和所需要的worker 11 Master master = new Master(new Worker(),10); 12 13 //提交任务 14 for (int i = 1; i < 101; i++) { 15 Task t = new Task(); 16 t.setId(i); 17 t.setName("任务"+i); 18 t.setPrice(i+1000); 19 20 master.submit(t); 21 } 22 23 //执行任务 24 master.excute(); 25 26 long start = System.currentTimeMillis(); 27 28 //判断任务是否执行完毕 29 while(true){ 30 if(master.isComplete()){ 31 long end = System.currentTimeMillis()-start; 32 //任务执行完成,返回结果集 33 long result = master.getResult(); 34 System.out.println("执行了"+end+"秒,最终结果为:"+result); 35 break; 36 } 37 } 38 39 40 41 42 43 44 } 45 46 47 }
其中上面的代码还可以进行优化