zoukankan      html  css  js  c++  java
  • Master和worker模式

    让和hadoop的设计思想是一样的,Master负责分配任务和获取任务的结果,worker是真正处理业务逻辑的。

    使用ConcurrentLikedQueue去承载所有的任务,因为会有多个worker会并发修改这个队列。

    public class Task {
    
        private int id;
        private int price ;
        public int getId() {
            return id;
        }
        public void setId(int id) {
            this.id = id;
        }
        public int getPrice() {
            return price;
        }
        public void setPrice(int price) {
            this.price = price;
        } 
        
    }
    public class Master {
    
        //1 有一个盛放任务的容器
        private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
        
        //2 需要有一个盛放worker的集合
        private HashMap<String, Thread> workers = new HashMap<String, Thread>();
        
        //3 需要有一个盛放每一个worker执行任务的结果集合
        private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
        
        //4 构造方法
        public Master(Worker worker , int workerCount){
            worker.setWorkQueue(this.workQueue);
            worker.setResultMap(this.resultMap);
            
            for(int i = 0; i < workerCount; i ++){
                this.workers.put(Integer.toString(i), new Thread(worker));
            }
            
        }
        
        //5 需要一个提交任务的方法
        public void submit(Task task){
            this.workQueue.add(task);
        }
        
        //6 需要有一个执行的方法,启动所有的worker方法去执行任务
        public void execute(){
            for(Map.Entry<String, Thread> me : workers.entrySet()){
                me.getValue().start();
            }
        }
    
        //7 判断是否运行结束的方法
        public boolean isComplete() {
            for(Map.Entry<String, Thread> me : workers.entrySet()){
                if(me.getValue().getState() != Thread.State.TERMINATED){
                    return false;
                }
            }        
            return true;
        }
    
        //8 计算结果方法
        public int getResult() {
            int priceResult = 0;
            for(Map.Entry<String, Object> me : resultMap.entrySet()){
                priceResult += (Integer)me.getValue();
            }
            return priceResult;
        }
    }
    public class Worker implements Runnable {
    
        private ConcurrentLinkedQueue<Task> workQueue;
        private ConcurrentHashMap<String, Object> resultMap;
        
        public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
            this.workQueue = workQueue;
        }
    
        public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
            this.resultMap = resultMap;
        }
        
        @Override
        public void run() {
            while(true){
                Task input = this.workQueue.poll();
                if(input == null) break;
                Object output = handle(input);
                this.resultMap.put(Integer.toString(input.getId()), output);
            }
        }
    
        private Object handle(Task input) {
            Object output = null;
            try {
                //处理任务的耗时。。 比如说进行操作数据库。。。
                Thread.sleep(500);
                output = input.getPrice();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return output;
        }
    
    
    
    }
    public class Main {
    
        public static void main(String[] args) {
            
            Master master = new Master(new Worker(), 20);
            
            Random r = new Random();
            for(int i = 1; i <= 100; i++){
                Task t = new Task();
                t.setId(i);
                t.setPrice(r.nextInt(1000));
                master.submit(t);
            }
            master.execute();
            long start = System.currentTimeMillis();
            
            while(true){
                if(master.isComplete()){
                    long end = System.currentTimeMillis() - start;
                    int priceResult = master.getResult();
                    System.out.println("最终结果:" + priceResult + ", 执行时间:" + end);
                    break;
                }
            }
            
        }
    }
  • 相关阅读:
    C++ 从文件中读取数据的代码及优化
    EOF在C++中的含义
    利用"中值滤波原理"过滤异常数据(转载)
    Hive差集运算详解
    H5 玩出新花样(一) -- 森林领导术不倒流
    C++中怎么暂停几秒
    c语言中 srand(time(NULL)); 这句话是什么意思(尤其是 NULL)
    C++ 码代码的风格(推荐)
    c/c++编译器的安装
    jquery的$(selector).each,$.each的区别
  • 原文地址:https://www.cnblogs.com/dongdone/p/5748772.html
Copyright © 2011-2022 走看看