zoukankan      html  css  js  c++  java
  • Master-Worker集群计算demo

    Task为要执行的任务实体类:

    package com.viewhigh.mdop.bi.test;
    
    /**
     * Created by zzq on 2017/5/11.
     */
    public class Task {
        private int id;
        private String name;
    
        public int getId() {
            return id;
        }
    
        public void setId(int id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public Task setName(String name) {
            this.name = name;
            return this;
        }
    }

    Master为分布式计算代理类,负责创建多个工作线程来处理任务,并将结果汇总,内部维护任务队列,结果map集合和线程map集合:

    package com.viewhigh.mdop.bi.test;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.LinkedBlockingQueue;
    
    /**
     * Created by zzq on 2017/5/11.
     */
    public class Master {
        //任务队列
        private Queue<Task> workerQueue = new LinkedBlockingQueue();
    
        //工作线程集合
        private Map<Integer, Thread> workerMap = new HashMap();
    
        //工作线程返回结果
        private Map<Integer, Object> workerResult = new ConcurrentHashMap();
    
        public Map<Integer, Object> getWorkerResult() {
            return workerResult;
        }
    
        public Master(Worker worker) {
            int currProcessors = Runtime.getRuntime().availableProcessors();
            System.out.println("当前机器最大线程数:" + currProcessors);
    
            worker.setWorkerQueue(workerQueue);
            worker.setWorkerResultMap(workerResult);
    
            for (int i = 0; i < currProcessors; i++) {
                workerMap.put(i, new Thread(worker));
            }
        }
    
        public void submitTask(Task task) {
            workerQueue.add(task);
        }
    
        public void submitTask(List<Task> taskList) {
            for (Task task : taskList)
                workerQueue.add(task);
        }
    
        public void execute() {
            for (Thread thread : workerMap.values()) {
                thread.start();
            }
        }
    
        public boolean finish() {
            for (Thread thread : workerMap.values()) {
                if (thread.getState() != Thread.State.TERMINATED)//线程未终止
                    return false;
            }
            return true;
        }
    }

    Worker承担计算和计算结果汇总,处理队列中的Task:

    package com.viewhigh.mdop.bi.test;
    
    import java.util.Map;
    import java.util.Queue;
    
    /**
     * Created by zzq on 2017/5/11.
     */
    public class Worker implements Runnable {
    
        private Queue<Task> workerQueue;
        private Map<Integer, Object> workerResult;
    
        @Override
        public void run() {
            while (true) {
                Task task = workerQueue.poll();
    
                if (task == null) break;
    
                Object ret = handle(task);
                workerResult.put(task.getId(), ret);
            }
        }
    
        public Object handle(Task task) {
            try {
                Thread.sleep(1000);
                return task.getName();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        public void setWorkerQueue(Queue<Task> workerQueue) {
            this.workerQueue = workerQueue;
        }
    
        public void setWorkerResultMap(Map<Integer, Object> workerResult) {
            this.workerResult = workerResult;
        }
    }

    测试类:

    package com.viewhigh.mdop.bi.test;
    
    import org.junit.Test;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import static org.junit.Assert.*;
    
    /**
     * Created by zzq on 2017/5/11.
     */
    public class MasterTest {
        @Test
        public void submitTask() throws Exception {
            List<Task> list = new ArrayList();
    
            for (int i = 0; i < 10; i++) {
                Task task1 = new Task();
                task1.setId(i);
                task1.setName("_序号_:" + Integer.toString(i));
                list.add(task1);
            }
    
            Master master = new Master(new Worker());
            master.submitTask(list);
    
            master.execute();
            long currTime = System.currentTimeMillis();
    
            while (!master.finish()) {
    //            System.out.println("执行中。。。");
            }
    
    //        for (Object a : master.getWorkerResult().values())
    //            System.out.println((String) a);
            System.out.println("消耗的时间:" + Long.toString(System.currentTimeMillis() - currTime));
        }
    
    
    }
  • 相关阅读:
    我是5型
    现在的我,有两个状态。我要去找第三个
    什么是BNF范式,什么又是EBNF范式?
    又是好久不写日志
    语料库资源汇总
    原生js与css3结合的电风扇
    JavaScript基础2
    JavaScript基础1
    JavaScript基础4
    JavaScript基础3
  • 原文地址:https://www.cnblogs.com/zzq-include/p/6842787.html
Copyright © 2011-2022 走看看