zoukankan      html  css  js  c++  java
  • 简易的Master-Worker框架

    Master-Worker模式是一种使用多线程进行数据处理的结构,多个worker进程协作处理用户请求,master进程负责维护worker进程,并整合最终处理结果

    主要参与者

    • Worker:用于实际处理一个任务
    • Master:用于任务的分配和最终结果的合成
    • Main:启动系统,调度开启Master
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedDeque;
    
    /**
     * Master进程负责接收和分配任务
     */
    public class Master {
        //子任务队列   ConcurrentLinkedDeque:双向链表结构的无界并发队列
        protected Queue<Object> workQueue = new ConcurrentLinkedDeque<>();
        //worker进程队列
        protected Map<String,Thread> threadMap = new HashMap<>();
        //子任务处理结果集
        protected Map<String,Object> resultMap = new ConcurrentHashMap<>();
        //是否所以的子任务都结束了
        public boolean isComplete(){
            for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
                //停止状态
                if (entry.getValue().getState()!= Thread.State.TERMINATED){
                    return false;
                }
            }
            return true;
        }
    
        // Worker进程,Worker进程数量
        public Master(Worker worker, int countWorker) {
            worker.setWorkQueue(workQueue);
            worker.setResultMap(resultMap);
            for (int i = 0; i < countWorker; i++) {
                threadMap.put(Integer.toString(i),new Thread(worker,Integer.toString(i)));
            }
        }
        //提交一个任务
        public void submit(Object job){
            workQueue.add(job);
        }
    
        //返回子任务结果集
        public Map<String, Object> getResultMap() {
            return resultMap;
        }
        //运行所以Worker进程,进行处理
        public void execute(){
            for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
                entry.getValue().start();
            }
        }
    }
    
    
    import java.util.Map;
    import java.util.Queue;
    
    /**
     * Worker进程负责处理子任务
     */
    public class Worker implements Runnable {
        //子任务队列
        protected Queue<Object> workQueue;
        //子任务处理结果集
        protected Map<String,Object> resultMap;
        public void setWorkQueue(Queue<Object> workQueue) {
            this.workQueue = workQueue;
        }
        public void setResultMap(Map<String, Object> resultMap) {
            this.resultMap = resultMap;
        }
        //子任务处理的逻辑,在子类中实现具体逻辑
        public Object handle(Object input){
            return input;
        }
        @Override
        public void run() {
            while (true){
                //获取子任务
                Object input = workQueue.poll();
                if (input==null) break;
                //处理子任务
                Object re = handle(input);
                //将处理结果写入结果集
                resultMap.put(Integer.toString(input.hashCode()),re);
            }
        }
    }
    
    
    /**
     * 任务 求 i^2
     */
    public class PlusWorker extends Worker {
        public Object handle(Object input){
            Integer i = (Integer) input;
            return i*i;
        }
    }
    
    
    import java.util.Map;
    import java.util.Set;
    
    /**
     * 求 1^2 + 2^2 + 3^2 + 4^2 + 5^2
     * 1 + 4 + 9 + 16 + 25 = 55
     */
    public class Main {
        public static void main(String[] args){
            Master m = new Master(new PlusWorker(),5);
            for (int i = 1; i <= 5; i++) {
                m.submit(i);
            }
            m.execute();
            int re = 0;
            Map<String, Object> resultMap = m.getResultMap();
            // 任务结果相加
            while (resultMap.size()>0||!m.isComplete()){
                Set<String> keys = resultMap.keySet();
                String key = null;
                for (String k : keys) { //每次只取一次
                    key = k;
                    break;
                }
                Integer i = null;
                if (key!=null){
                    i = (Integer) resultMap.get(key);
                }
                if (i!=null){
                    re+=i; //任务结果相加
                }
                if (key!=null){
                    resultMap.remove(key); //移除已经被计算的结果项
                }
            }
            System.out.println(re); //55
        }
    }
    
  • 相关阅读:
    ant build打包
    在JAVA中如何获取当前源文件名以及代码的行号
    react以组件为中心的代码分割和懒加载
    java中针对 try和finally一些总结
    JS强制关闭浏览器页签并且不提示关闭信息
    由[].slice.call()引发的思考
    JS类型判断
    nginx的location配置
    DBCP连接池
    java/Servlet
  • 原文地址:https://www.cnblogs.com/fly-book/p/11437919.html
Copyright © 2011-2022 走看看