zoukankan      html  css  js  c++  java
  • Java Master-Worker 模式实现

    Master-Worker模式简介

    Master-Worker模式是非常经典的常用的一个并行计算模式,它的核心思想是2类进程协作工作:Master进程和Worker进程。Master负责接收客户端请求,分配任务;Worker负责具体处理任务。当各个Worker处理完任务后,统一将结果返回给Master,由Master进行整理和总结。其好处是能够将一个大JOB分解成若干小JOB,并行执行,从而提高系统的吞吐量。比如流行的Web Server,如Nginx,Apache HTTP都存在这种Master-Worker工作模式;离线分布式计算框架Hadoop的JobTracker和TaskTracker,实时流计算框架Strom的Nimbus和Supervisor都涉及到这种思想。那么下面我们来具体分析下Java Master-Worker模式的实现。

    Master-Worker模式分析

    我们重点分析下Master,Worker这2个角色。

    Master

    Master需要接受Client端提交过来的任务Task,而且还得将Task分配给Worker进行处理,因此Master需要一个存储来存放Task。那么采用哪种存储集合呢?首先来说,需要支持并发的集合类,因为多个Worker间可能存在任务竞争,因此我们需要考虑java.util.concurrent包下的集合。这里可以考虑采用非阻塞的ConcurrentLinkedQueue。

    Master需要清楚的知道各个Woker的基本信息,如是否各个Worker都运行完毕,因此Master端需要保存Worker的信息,可以采用Map存储。

    由于最后各个Worker都会上报运行结果,Master端需要有一个存储结果的Map,可以采用支持并发的ConcurrentHashMap。

     

    Worker

    Worker需要持有Master端的任务Task集合的引用,因为Worker需要从里面拿取Task。

    同上,Worker需要持有Master端的存储结果的引用。

    我们可以进一步细化,Master/Worker应该提供什么操作?

    Master:

    1. 通过构造方法以初始化workers

    2. 应该提供submit(Task)方法接受Client端提交过来的任务

    3. start()让workers开始处理任务

    4. 提供isComplete()判断各个worker的状态,是否都处理完毕

    5. 提供getResult()给客户端返回结果

    Worker:

    1. Worker本质上就是Runnable,提供run()

    2. 负责处理业务逻辑的handle()

    Java Master-Worker代码实现

    Task

    public class Task {
    
        private long id;
        private String name;
    
        public Task(long id, String name) {
            this.id = id;
            this.name = name;
        }
    
        public long getId() {
            return id;
        }
    
        public void setId(long id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
    
    }

    Worker

    public class Worker implements Runnable {
    
        private long id;
        private String name;
    
        private ConcurrentLinkedQueue<Task> workQueue;
    
        private ConcurrentHashMap<Long,Object> results;
    
        public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
            this.workQueue = workQueue;
        }
    
        public void setResults(ConcurrentHashMap<Long, Object> results) {
            this.results = results;
        }
    
        public Worker(long id, String name) {
            this.id = id;
            this.name = name;
        }
    
        @Override
        public void run() {
    
            while(true){
    
                Task task = workQueue.poll();
    
                if(task == null){
                    break;
                }
    
                long start = System.currentTimeMillis();
                long result = handle(task);
    
                this.results.put(task.getId(),result);
    
                System.out.println(this.name + " handle " + task.getName() + " success . result is " 
            + result + " cost time : " + (System.currentTimeMillis() - start)); } } /** * 负责处理具体业务逻辑 * @param task * @return */ private long handle(Task task) { //这里只是模拟下,在真实环境也许是查询数据库,也许是查缓存等 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return new Random().nextLong(); } }

    Master

    public class Master {
    
        private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
    
        private Map<Long,Thread> workers = new HashMap<Long, Thread>();
    
        private ConcurrentHashMap<Long,Object> results = new ConcurrentHashMap<Long, Object>();
    
        public Master(int num){
    
            for(int i = 0 ; i < num ; i++){
    
                Worker worker = new Worker(i,"worker-" + i);
                worker.setResults(results);
                worker.setWorkQueue(workQueue);
    
                workers.put(Long.valueOf(i),new Thread(worker));
            }
    
        }
    
        public void submit(Task task){
            workQueue.add(task);
        }
    
        public void start(){
    
            for (Map.Entry<Long,Thread> entry : workers.entrySet()){
    
                entry.getValue().start();
            }
    
        }
    
        public boolean isComlepte(){
    
            for(Map.Entry<Long,Thread> entry : workers.entrySet()){
    
                if(entry.getValue().getState() != Thread.State.TERMINATED){
                    return false;
                }
    
            }
    
            return true;
        }
    
        public long getSumResult(){
    
            long value = 0;
            for(Map.Entry<Long,Object> entry : results.entrySet()){
    
                value = value + (Long)entry.getValue();
    
            }
            return value;
        }
    }

    Main

    public class Main {
    
        public static void main(String[] args) {
    
            Master master = new Master(10);
    
            for(int i = 0 ; i < 10 ; i++){
    
                Task task = new Task(i,"task-" + i);
    
                master.submit(task);
            }
    
            long start = System.currentTimeMillis();
            master.start();
    
            while(true){
    
                if(master.isComlepte()){
    
                    System.out.println("sum result is " + master.getSumResult() + " . cost time : " + (System.currentTimeMillis() - start));
                    break;
                }
            }
    
    
        }
    
    }

    运行结果

    wKioL1hDxjzwX2K2AACBfC_nvdY147.png

    总结

    在单线程的时候,处理一个Task需要500ms,那么处理10个Task需要5S,如果采用Master-Worker这种并行模型,仅需要610ms, 可以大大缩短计算处理时间。

    **************************************************************************************
    当你的才华还撑不起你的野心的时候,你就应该静下心来学习;当你的能力还驾驭不了你的目标时,就应该沉下心来,历练;梦想,不是浮躁,而是沉淀和积累,只有拼出来的美丽,没有等出来的辉煌,机会永远是留给最渴望的那个人,学会与内心深处的你对话,问问自己,想 要怎样的人生,静心学习,耐心沉淀,送给自己,共勉。
    **************************************************************************************
  • 相关阅读:
    App上线-Unexpected CFBundleExecutable Key
    Java面向对象-001-继承与构造函数
    Java-012-Scanner类和程序异常处理(ExceptionHandle)
    iOS CoreLocation 获取用户当前位置
    Java-011-Java流(Stream)、文件(File)和IO
    Java-010-正则表达式和方法(RegualrExpressionAndMethod)
    Java-009-数组和日期时间类(Date,Calendar)详解
    Java-008-String类、StringBuffer和StringBuilder类
    [vue]vue基础复习项案例stepbystep
    [vue]mvc模式和mvvm模式及vue学习思路(废弃)
  • 原文地址:https://www.cnblogs.com/macoffee/p/13446176.html
Copyright © 2011-2022 走看看