zoukankan      html  css  js  c++  java
  • 并发处理之master-worker 模式

    master-worker模式是一种将顺序执行的任务转为并发执行,顺序执行的任务之间相互之间没有关系

    如图:

    相关代码实现简易版:

    1)master 实现

    package com.lwd.worker_master;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    /**
     *  分配任务/合并结果集
     * @author liuwd
     */
    public class Master {
        /**
         * 任务队列
         */
        private ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
        /**
         * 工作进程
         */
        private Map<String,Thread> threadMap = new HashMap<>(16);
        /**
         * 子任务处理结果集
         */
        private Map<String,Object> resultMap = new HashMap<>(16);
    
        public Master(Worker worker,int count){
            worker.setWorkerQueue(queue);
            worker.setResultMap(resultMap);
            for (int i = 0; i < count; i++) {
                threadMap.put(Integer.toString(i),new Thread(worker,Integer.toString(i)));
            }
        }
    
    
        /**
         * 是否子任务都结束了
         * @return
         */
        public boolean isComplte(){
            Set<Map.Entry<String, Thread>> entries = threadMap.entrySet();
            for (Map.Entry<String, Thread> entry : entries) {
                Thread thread = entry.getValue();
                if(thread.getState()!=Thread.State.TERMINATED){
                    return false;
                }
            }
            return true;
        }
    
        /**
         * 提交任务
         * @param obj
         */
        public void submit(Object obj){
           queue.add(obj);
        }
    
        /**
         *  返回结果集
         * @return
         */
        public Map<String,Object> getResultMap(){
            return resultMap;
        }
    
        /**
         * 执行任务 开启进程
         */
        public void execute(){
            Set<Map.Entry<String, Thread>> entries = threadMap.entrySet();
            for (Map.Entry<String, Thread> entry : entries) {
                entry.getValue().start();
            }
        }
    
    }

    2)worker实现

    package com.lwd.worker_master;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    /**
     * 任务对象 用于处理相关任务
     * @author liuwd
     */
    public class Worker implements Runnable{
        /**
         * 任务队列
         */
        private ConcurrentLinkedQueue workerQueue;
        /**
         * 结果集
         */
        private Map<String,Object>  resultMap;
    
    
        public void setWorkerQueue(ConcurrentLinkedQueue workerQueue) {
            this.workerQueue = workerQueue;
        }
    
    
        public void setResultMap(Map<String, Object> resultMap) {
            this.resultMap = resultMap;
        }
    
        @Override
        public void run() {
            while (true){
                Object poll = workerQueue.poll();
                if(null == poll){
                    break;
                }
                Object handle = handle(poll);
                resultMap.put(Integer.toString(handle.hashCode()),handle);
            }
        }
    
        /**
         * 处理任务
         * @param obj
         * @return
         */
        public Object handle(Object obj){
            return obj;
        }
    }

    3)RealWork实现

    package com.lwd.worker_master;
    
    /**
     *  实际任务类
     * @author liuwd
     */
    public class RealWorker extends Worker {
        @Override
        public Object handle(Object obj) {
            Integer i = (Integer)obj;
            return i*i;
        }
    }

    4)WorkMasterMain.java 需求运行实现

    package com.lwd.worker_master;
    
    import java.util.Iterator;
    import java.util.Map;
    
    
    /**
     *  当前模式的使用主体类
     * @author liuwd
     */
    public class WokerMasterMain {
    
        public static void main(String[] args) {
            Master master = new Master(new RealWorker(), 5);
            Integer integer = squaresSum(master, 100);
            System.out.println(integer);
    
        }
    
        /**
         * 1-100平方和
         */
        public static Integer squaresSum(Master master,int num){
            for (int i = 0; i <num ; i++) {
                master.submit(i);
            }
            master.execute();
            int result = 0;
            Map<String, Object> resultMap = master.getResultMap();
            while (resultMap.size()>0&&!master.isComplte()){
                Iterator<String> iterator = resultMap.keySet().iterator();
                while (iterator.hasNext()){
                    String key = iterator.next();
                    Object o = resultMap.get(key);
                    if(null != o){
                        Integer i = (Integer)o;
                        result+=i;
                    }
                    iterator.remove();
                }
    
            }
            return result;
        }
    }
  • 相关阅读:
    spring mvc 返回json
    spring mvc 解决后台传递值乱码问题
    sring mvc 返回值至jsp界面的几种方式
    spring mvc 注解访问控制器以及接收form数据的方式,包括直接接收日期类型及对象的方法
    spring mvc 通过配置xml访问控制器的三种方式
    spring mvc 入门配置
    HBase 学习笔记
    Centos搭建Linux测试环境,几个基本的设置项
    kafka 集群安装与安装测试
    Kafka 分布式消息队列介绍
  • 原文地址:https://www.cnblogs.com/lwdmaib/p/13781296.html
Copyright © 2011-2022 走看看