Task为要执行的任务实体类:
package com.viewhigh.mdop.bi.test; /** * Created by zzq on 2017/5/11. */ public class Task { private int id; private String name; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public Task setName(String name) { this.name = name; return this; } }
Master为分布式计算代理类,负责创建多个工作线程来处理任务并汇总结果;内部维护任务队列、结果Map集合和线程Map集合:
package com.viewhigh.mdop.bi.test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Created by zzq on 2017/5/11.
 *
 * <p>Coordinator of the Master-Worker example. It owns the task queue, the
 * result map and the worker threads, hands the queue and the result map to
 * the supplied {@link Worker}, and lets callers submit tasks, start the
 * threads and poll for completion.
 */
public class Master {

    // Task queue drained by the worker threads.
    private final Queue<Task> workerQueue = new LinkedBlockingQueue<Task>();

    // Worker threads, keyed by their creation index. Only touched by the caller's thread.
    private final Map<Integer, Thread> workerMap = new HashMap<Integer, Thread>();

    // Results written by the worker threads, keyed by task id.
    private final Map<Integer, Object> workerResult = new ConcurrentHashMap<Integer, Object>();

    /**
     * @return the live result map (not a copy); workers keep writing into it
     *         while they run
     */
    public Map<Integer, Object> getWorkerResult() {
        return workerResult;
    }

    /**
     * Wires the worker to this master's queue and result map and creates one
     * (not yet started) thread per available processor. Every thread runs the
     * same {@code worker} instance.
     *
     * @param worker the runnable that processes tasks; must not be {@code null}
     * @throws NullPointerException if {@code worker} is {@code null}
     */
    public Master(Worker worker) {
        if (worker == null) {
            // Fail with a named cause instead of an anonymous NPE further down.
            throw new NullPointerException("worker");
        }
        int currProcessors = Runtime.getRuntime().availableProcessors();
        System.out.println("当前机器最大线程数:" + currProcessors);
        worker.setWorkerQueue(workerQueue);
        worker.setWorkerResultMap(workerResult);
        for (int i = 0; i < currProcessors; i++) {
            workerMap.put(i, new Thread(worker));
        }
    }

    /**
     * Appends a single task to the queue.
     *
     * @param task the task to enqueue; the queue rejects {@code null}
     */
    public void submitTask(Task task) {
        workerQueue.add(task);
    }

    /**
     * Appends every task of the list to the queue, in list order.
     *
     * @param taskList the tasks to enqueue; must not be {@code null} nor contain {@code null}
     */
    public void submitTask(List<Task> taskList) {
        workerQueue.addAll(taskList);
    }

    /**
     * Starts every worker thread. A thread can only be started once, so a
     * second call fails with {@link IllegalThreadStateException}.
     */
    public void execute() {
        for (Thread thread : workerMap.values()) {
            thread.start();
        }
    }

    /**
     * Reports whether all worker threads have terminated.
     *
     * @return {@code true} only when every thread is in state
     *         {@link Thread.State#TERMINATED}; {@code false} otherwise,
     *         including before {@link #execute()} has been called
     */
    public boolean finish() {
        for (Thread thread : workerMap.values()) {
            if (thread.getState() != Thread.State.TERMINATED) {
                // At least one thread has not terminated yet.
                return false;
            }
        }
        return true;
    }
}
Worker承担计算和计算结果汇总,处理队列中的Task:
package com.viewhigh.mdop.bi.test;

import java.util.Map;
import java.util.Queue;

/**
 * Created by zzq on 2017/5/11.
 *
 * <p>Runnable that repeatedly takes a {@link Task} from the configured queue,
 * processes it with {@link #handle(Task)} and stores the outcome in the
 * configured result map under the task's id. The queue and the result map
 * must be set before the runnable is started.
 */
public class Worker implements Runnable {

    private Queue<Task> workerQueue;
    private Map<Integer, Object> workerResult;

    /**
     * Drains the queue until it is empty or the current thread is interrupted.
     * A {@code null} outcome from {@link #handle(Task)} is not stored: result
     * maps such as {@code ConcurrentHashMap} reject {@code null} values and
     * would otherwise kill the thread with a {@link NullPointerException}.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            Task task = workerQueue.poll();
            if (task == null) {
                // Queue is empty: nothing left for this worker to do.
                break;
            }
            Object ret = handle(task);
            if (ret != null) {
                workerResult.put(task.getId(), ret);
            }
        }
    }

    /**
     * Processes one task. This implementation sleeps for one second to
     * simulate work and then yields the task's name.
     *
     * @param task the task to process
     * @return the task's name, or {@code null} if the thread was interrupted
     *         while sleeping (the interrupt flag is restored in that case)
     */
    public Object handle(Task task) {
        try {
            Thread.sleep(1000);
            return task.getName();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the flag so run() can observe the interruption and stop.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * @param workerQueue the queue this worker polls tasks from
     */
    public void setWorkerQueue(Queue<Task> workerQueue) {
        this.workerQueue = workerQueue;
    }

    /**
     * @param workerResult the map this worker writes results into, keyed by task id
     */
    public void setWorkerResultMap(Map<Integer, Object> workerResult) {
        this.workerResult = workerResult;
    }
}
测试类:
package com.viewhigh.mdop.bi.test;

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.*;

/**
 * Created by zzq on 2017/5/11.
 *
 * <p>Runs ten tasks through a {@link Master} and checks that every task
 * produced its expected result.
 */
public class MasterTest {

    /**
     * Submits ten named tasks, waits for all worker threads to terminate,
     * prints the elapsed time and verifies the collected results.
     */
    @Test
    public void submitTask() throws Exception {
        final int taskCount = 10;
        List<Task> list = new ArrayList<Task>();
        for (int i = 0; i < taskCount; i++) {
            Task task1 = new Task();
            task1.setId(i);
            task1.setName("_序号_:" + Integer.toString(i));
            list.add(task1);
        }

        Master master = new Master(new Worker());
        master.submitTask(list);
        master.execute();

        long currTime = System.currentTimeMillis();
        while (!master.finish()) {
            // Poll with a short pause instead of spinning a core at 100%.
            Thread.sleep(10);
        }
        System.out.println("消耗的时间:" + Long.toString(System.currentTimeMillis() - currTime));

        // Every task must have produced exactly its own name as the result.
        assertEquals(taskCount, master.getWorkerResult().size());
        for (int i = 0; i < taskCount; i++) {
            assertEquals("_序号_:" + Integer.toString(i), master.getWorkerResult().get(i));
        }
    }
}