zoukankan      html  css  js  c++  java
  • java多线程系列14 设计模式 Master-Worker

    Master-Worker模式是常用的并行设计模式可以将大任务划分为小任务,是一种分而治之的设计理念

    系统由两个角色组成,Master和Worker,Master负责接收和分配任务,Worker负责处理任务  。子进程处理完成以后,会把结果返回给Master

    原理图如下

     

    下面代码演示 

    public abstract class Worker implements Runnable {
    	// 任务队列,用于取得子任务
    	protected Queue<Object> workQueue;
    	// 子任务处理结果集
    	protected Map<String, Object> resultMap;
    
    	public void setWorkQueue(Queue<Object> workQueue) {
    		this.workQueue = workQueue;
    	}
    
    	public void setResultMap(Map<String, Object> resultMap) {
    		this.resultMap = resultMap;
    	}
    
    	@Override
    	public void run() {
    		while (true) {
    			Object object = workQueue.poll();  //因为当空的时候 会返回空 
    		    if(object==null){  
                    break;  
                }  
    			Object rtn = handle(object);
    			resultMap.put(rtn.hashCode() + "", rtn);
    		}
    
    	}
        //具体的执行逻辑 交给子类
    	public abstract Object handle(Object input) ;
    
    }
    

      

    public class PlusWorker extends Worker {
    	   @Override  
    	    public Object handle(Object input) {  
    	        Integer i =(Integer)input;  
    	        return i;  
    	    }  
    }
    

      

    public class Master {
    
    	// 存放提交的任务
    	Queue<Object> planQueue = new ConcurrentLinkedQueue<>();
    	// 处理任务的线程
    	Map<String, Thread> threads = new HashMap<String, Thread>();
    	// 存储每个任务的完成结果
    	private Map<String, Object> result = new ConcurrentHashMap<String, Object>();
    
    	public Master(Worker w, int workerNum) {
    		if (workerNum <= 0) {
    			throw new RuntimeException("workerNum 不能《=0");
    		}
    		w.setResultMap(result);
    		w.setWorkQueue(planQueue);
    		for (int i = 0; i < workerNum; i++) {
    			threads.put(i + "", new Thread(w, i + ""));
    		}
    	}
    
    	// 添加任务的方法
    	public void submit(Object obj) {
    		planQueue.add(obj);
    	}
    
    	public void execute() {
    
    		for (Map.Entry<String, Thread> entry : threads.entrySet()) {
    			entry.getValue().start();
    		}
    	}
    
    	public Map<String, Object> getResMap() {
    		return result;
    	}
    
    	// 判断所有工作线程是否执行完毕
    	public boolean isComplete() {
    		for (Map.Entry<String, Thread> entry : threads.entrySet()) {
    			if (entry.getValue().getState() != Thread.State.TERMINATED) {
    				return false;
    			}
    		}
    		return true;
    	}
    
    	// 返回结果集
    	public Object getRes() {
    		Integer re = 0;
    		Map<String, Object> resultMap = result;
    		for (String string : resultMap.keySet()) {
    			Integer ob = (Integer) resultMap.get(string);
    			re = ob + re;
    		}
    		return re;
    	}
    }
    

      测试类

         

    public class Main {
    public static void main(String[] args) {
    //	long now = System.currentTimeMillis();
    	Master master = new Master(new PlusWorker(), 1);
    	for (int i = 1; i <= 1000; i++) {
    		master.submit(i);
    	}
    
    	master.execute();
         //保存最终结算结果 
         while(true)
         {
        	 if(master.isComplete()){
        		 System.out.println(master.getRes());
        		 break;
        	 }
        		 
         }
    	
    }
    }
    

      

    注意::::重量级的工作配合多核cpu效果不错,轻量级的程序单线程工作效果不一定比并发的差.

  • 相关阅读:
    移动端适配方案
    js基础知识复习
    js单线程的本质-------Event Loop
    第13课 字典
    第12课 习题讲解
    第11课 循环嵌套和算法
    第10课 文件的读写
    第9课 循环语句与注释
    第8课 对象的方法
    第7课 初识函数
  • 原文地址:https://www.cnblogs.com/javabigdata/p/7010278.html
Copyright © 2011-2022 走看看