zoukankan      html  css  js  c++  java
  • 并发处理之master-worker 模式

    master-worker模式是一种将顺序执行的任务转为并发执行,顺序执行的任务之间相互之间没有关系

    如图:

    相关代码实现简易版:

    1)master 实现

    package com.lwd.worker_master;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    /**
     *  分配任务/合并结果集
     * @author liuwd
     */
    public class Master {
        /**
         * 任务队列
         */
        private ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
        /**
         * 工作进程
         */
        private Map<String,Thread> threadMap = new HashMap<>(16);
        /**
         * 子任务处理结果集
         */
        private Map<String,Object> resultMap = new HashMap<>(16);
    
        public Master(Worker worker,int count){
            worker.setWorkerQueue(queue);
            worker.setResultMap(resultMap);
            for (int i = 0; i < count; i++) {
                threadMap.put(Integer.toString(i),new Thread(worker,Integer.toString(i)));
            }
        }
    
    
        /**
         * 是否子任务都结束了
         * @return
         */
        public boolean isComplte(){
            Set<Map.Entry<String, Thread>> entries = threadMap.entrySet();
            for (Map.Entry<String, Thread> entry : entries) {
                Thread thread = entry.getValue();
                if(thread.getState()!=Thread.State.TERMINATED){
                    return false;
                }
            }
            return true;
        }
    
        /**
         * 提交任务
         * @param obj
         */
        public void submit(Object obj){
           queue.add(obj);
        }
    
        /**
         *  返回结果集
         * @return
         */
        public Map<String,Object> getResultMap(){
            return resultMap;
        }
    
        /**
         * 执行任务 开启进程
         */
        public void execute(){
            Set<Map.Entry<String, Thread>> entries = threadMap.entrySet();
            for (Map.Entry<String, Thread> entry : entries) {
                entry.getValue().start();
            }
        }
    
    }

    2)worker实现

    package com.lwd.worker_master;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    /**
     * 任务对象 用于处理相关任务
     * @author liuwd
     */
    public class Worker implements Runnable{
        /**
         * 任务队列
         */
        private ConcurrentLinkedQueue workerQueue;
        /**
         * 结果集
         */
        private Map<String,Object>  resultMap;
    
    
        public void setWorkerQueue(ConcurrentLinkedQueue workerQueue) {
            this.workerQueue = workerQueue;
        }
    
    
        public void setResultMap(Map<String, Object> resultMap) {
            this.resultMap = resultMap;
        }
    
        @Override
        public void run() {
            while (true){
                Object poll = workerQueue.poll();
                if(null == poll){
                    break;
                }
                Object handle = handle(poll);
                resultMap.put(Integer.toString(handle.hashCode()),handle);
            }
        }
    
        /**
         * 处理任务
         * @param obj
         * @return
         */
        public Object handle(Object obj){
            return obj;
        }
    }

    3)RealWork实现

    package com.lwd.worker_master;
    
    /**
     *  实际任务类
     * @author liuwd
     */
    public class RealWorker extends Worker {
        @Override
        public Object handle(Object obj) {
            Integer i = (Integer)obj;
            return i*i;
        }
    }

    4)WorkMasterMain.java 需求运行实现

    package com.lwd.worker_master;
    
    import java.util.Iterator;
    import java.util.Map;
    
    
    /**
     *  当前模式的使用主体类
     * @author liuwd
     */
    public class WokerMasterMain {
    
        public static void main(String[] args) {
            Master master = new Master(new RealWorker(), 5);
            Integer integer = squaresSum(master, 100);
            System.out.println(integer);
    
        }
    
        /**
         * 1-100平方和
         */
        public static Integer squaresSum(Master master,int num){
            for (int i = 0; i <num ; i++) {
                master.submit(i);
            }
            master.execute();
            int result = 0;
            Map<String, Object> resultMap = master.getResultMap();
            while (resultMap.size()>0&&!master.isComplte()){
                Iterator<String> iterator = resultMap.keySet().iterator();
                while (iterator.hasNext()){
                    String key = iterator.next();
                    Object o = resultMap.get(key);
                    if(null != o){
                        Integer i = (Integer)o;
                        result+=i;
                    }
                    iterator.remove();
                }
    
            }
            return result;
        }
    }
  • 相关阅读:
    带你进入异步Django+Vue的世界
    xps转换为pdf
    当对函数的返回值有多种需求时(执行是否成功,及业务数据的返回值),可采用的方法
    WPF 打印崩溃问题( 异常:Illegal characters in path/路径中有非法字符)
    集群、限流、缓存 BAT 大厂无非也就是这么做
    C#简单爬取数据(.NET使用HTML解析器ESoup和正则两种方式匹配数据)
    公共静态函数、属性 的 “关联成员” 的 生命周期
    python 之 Django框架(ORM常用字段和字段参数、关系字段和和字段参数)
    Django文档
    go micro 微服务框架温习
  • 原文地址:https://www.cnblogs.com/lwdmaib/p/13781296.html
Copyright © 2011-2022 走看看