zoukankan      html  css  js  c++  java
  • Java Master-Worker 模式实现

    Master-Worker模式简介

    Master-Worker模式是非常经典的常用的一个并行计算模式,它的核心思想是2类进程协作工作:Master进程和Worker进程。Master负责接收客户端请求,分配任务;Worker负责具体处理任务。当各个Worker处理完任务后,统一将结果返回给Master,由Master进行整理和总结。其好处是能够将一个大JOB分解成若干小JOB,并行执行,从而提高系统的吞吐量。比如流行的Web Server,如Nginx,Apache HTTP都存在这种Master-Worker工作模式;离线分布式计算框架Hadoop的JobTracker和TaskTracker,实时流计算框架Strom的Nimbus和Supervisor都涉及到这种思想。那么下面我们来具体分析下Java Master-Worker模式的实现。

    Master-Worker模式分析

    我们重点分析下Master,Worker这2个角色。

    Master

    Master需要接受Client端提交过来的任务Task,而且还得将Task分配给Worker进行处理,因此Master需要一个存储来存放Task。那么采用哪种存储集合呢?首先来说,需要支持并发的集合类,因为多个Worker间可能存在任务竞争,因此我们需要考虑java.util.concurrent包下的集合。这里可以考虑采用非阻塞的ConcurrentLinkedQueue。

    Master需要清楚的知道各个Woker的基本信息,如是否各个Worker都运行完毕,因此Master端需要保存Worker的信息,可以采用Map存储。

    由于最后各个Worker都会上报运行结果,Master端需要有一个存储结果的Map,可以采用支持并发的ConcurrentHashMap。

     

    Worker

    Worker需要持有Master端的任务Task集合的引用,因为Worker需要从里面拿取Task。

    同上,Worker需要持有Master端的存储结果的引用。

    我们可以进一步细化,Master/Worker应该提供什么操作?

    Master:

    1. 通过构造方法以初始化workers

    2. 应该提供submit(Task)方法接受Client端提交过来的任务

    3. start()让workers开始处理任务

    4. 提供isComplete()判断各个worker的状态,是否都处理完毕

    5. 提供getResult()给客户端返回结果

    Worker:

    1. Worker本质上就是Runnable,提供run()

    2. 负责处理业务逻辑的handle()

    Java Master-Worker代码实现

    Task

    public class Task {
    
        private long id;
        private String name;
    
        public Task(long id, String name) {
            this.id = id;
            this.name = name;
        }
    
        public long getId() {
            return id;
        }
    
        public void setId(long id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
    
    }

    Worker

    public class Worker implements Runnable {
    
        private long id;
        private String name;
    
        private ConcurrentLinkedQueue<Task> workQueue;
    
        private ConcurrentHashMap<Long,Object> results;
    
        public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
            this.workQueue = workQueue;
        }
    
        public void setResults(ConcurrentHashMap<Long, Object> results) {
            this.results = results;
        }
    
        public Worker(long id, String name) {
            this.id = id;
            this.name = name;
        }
    
        @Override
        public void run() {
    
            while(true){
    
                Task task = workQueue.poll();
    
                if(task == null){
                    break;
                }
    
                long start = System.currentTimeMillis();
                long result = handle(task);
    
                this.results.put(task.getId(),result);
    
                System.out.println(this.name + " handle " + task.getName() + " success . result is " 
            + result + " cost time : " + (System.currentTimeMillis() - start)); } } /** * 负责处理具体业务逻辑 * @param task * @return */ private long handle(Task task) { //这里只是模拟下,在真实环境也许是查询数据库,也许是查缓存等 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return new Random().nextLong(); } }

    Master

    public class Master {
    
        private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
    
        private Map<Long,Thread> workers = new HashMap<Long, Thread>();
    
        private ConcurrentHashMap<Long,Object> results = new ConcurrentHashMap<Long, Object>();
    
        public Master(int num){
    
            for(int i = 0 ; i < num ; i++){
    
                Worker worker = new Worker(i,"worker-" + i);
                worker.setResults(results);
                worker.setWorkQueue(workQueue);
    
                workers.put(Long.valueOf(i),new Thread(worker));
            }
    
        }
    
        public void submit(Task task){
            workQueue.add(task);
        }
    
        public void start(){
    
            for (Map.Entry<Long,Thread> entry : workers.entrySet()){
    
                entry.getValue().start();
            }
    
        }
    
        public boolean isComlepte(){
    
            for(Map.Entry<Long,Thread> entry : workers.entrySet()){
    
                if(entry.getValue().getState() != Thread.State.TERMINATED){
                    return false;
                }
    
            }
    
            return true;
        }
    
        public long getSumResult(){
    
            long value = 0;
            for(Map.Entry<Long,Object> entry : results.entrySet()){
    
                value = value + (Long)entry.getValue();
    
            }
            return value;
        }
    }

    Main

    public class Main {
    
        public static void main(String[] args) {
    
            Master master = new Master(10);
    
            for(int i = 0 ; i < 10 ; i++){
    
                Task task = new Task(i,"task-" + i);
    
                master.submit(task);
            }
    
            long start = System.currentTimeMillis();
            master.start();
    
            while(true){
    
                if(master.isComlepte()){
    
                    System.out.println("sum result is " + master.getSumResult() + " . cost time : " + (System.currentTimeMillis() - start));
                    break;
                }
            }
    
    
        }
    
    }

    运行结果

    wKioL1hDxjzwX2K2AACBfC_nvdY147.png

    总结

    在单线程的时候,处理一个Task需要500ms,那么处理10个Task需要5S,如果采用Master-Worker这种并行模型,仅需要610ms, 可以大大缩短计算处理时间。

    **************************************************************************************
    当你的才华还撑不起你的野心的时候,你就应该静下心来学习;当你的能力还驾驭不了你的目标时,就应该沉下心来,历练;梦想,不是浮躁,而是沉淀和积累,只有拼出来的美丽,没有等出来的辉煌,机会永远是留给最渴望的那个人,学会与内心深处的你对话,问问自己,想 要怎样的人生,静心学习,耐心沉淀,送给自己,共勉。
    **************************************************************************************
  • 相关阅读:
    HDU 1525
    kmp模板
    hdu 4616 Game(树形DP)
    hdu 4619 Warm up 2(并查集活用)
    hdu 4614 Vases and Flowers(线段树加二分查找)
    Codeforces 400D Dima and Bacteria(并查集最短路)
    poj 2823 Sliding Window (单调队列)
    hdu 2196 Computer(树形dp)
    hdu 4604 Deque
    最短路径
  • 原文地址:https://www.cnblogs.com/macoffee/p/13446176.html
Copyright © 2011-2022 走看看