zoukankan      html  css  js  c++  java
  • Java之多线程中的Master-Worker模式

    该模式的好处是,将大任务拆解成若干小任务并并行执行,从而提高系统吞吐量。

    定义Worker进程,负责处理实际任务。
    /*具体工作对象*/
    static abstract class Worker<T, R> implements Runnable {
    private static final UtilsLog lg = UtilsLog.getLogger(Worker.class);
    protected Queue<T> workQueue;//持有Master的任务队列
    protected Map<String, R> resultMap;//用于存储结果集,key为任务对应的唯一标识符

    public void setWorkQueue(Queue<T> workQueue) {
    this.workQueue = workQueue;
    }

    public void setResultMap(Map<String, R> resultMap) {
    this.resultMap = resultMap;
    }

    public abstract R handler(T entity);

    @Override
    public void run() {
    while (true) {
    T childWork = workQueue.poll();
    if (childWork == null) {
    lg.e("已经没有任务在队列中等待执行");
    break;
    }
    //处理子任务
    R result = handler(childWork);
    resultMap.put(Integer.toString(childWork.hashCode()), result);
    }
    }
    }
    义Master进程,负责接收和分配任务。Master会在提交任务的同时立即返回结果集,由于此处的getResultMap属于引用传递,因此属性resultMap的修改会同步至业务层。
    public static class Master<T, R> {
    private static final UtilsLog lg = UtilsLog.getLogger(Master.class);
    protected Queue<T> workQueue;//用于存储任务集
    protected Map<String, Thread> threadMap;//存储执行任务的线程集
    protected Map<String, R> resultMap;//存储相关结果

    @TargetApi(Build.VERSION_CODES.LOLLIPOP)
    public Master(Worker<T, R> work, int threadCount) {
    workQueue = new ConcurrentLinkedDeque<T>();
    threadMap = new HashMap<>();
    resultMap = new HashMap<>();

    work.setWorkQueue(workQueue);
    work.setResultMap(resultMap);
    for (int i = 0; i < threadCount; i++) {
    threadMap.put(Integer.toString(i), new Thread(work, "thread tag with " + Integer.toString(i)));
    }
    }

    //是否所有的子任务都结束了
    public boolean isComplete() {
    for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
    if (entry.getValue().getState() != Thread.State.TERMINATED) {
    return false;
    }
    }
    return true;
    }

    public Map<String, R> getResultMap() {
    return resultMap;
    }

    public Master addJob(T job) {
    workQueue.add(job);
    return this;
    }

    public void execute() {
    for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
    entry.getValue().start();
    }
    }
    }
    业务层调用方案如下,
            Master<Integer, Integer> master = new Master<>(new Worker<Integer, Integer>() {
    @Override
    public Integer handler(Integer entity) {
    int max = 50, min = 0;
    UtilsThread.sleepIgnoreInteruptedException(new Random().nextInt(max) % (max - min + 1) + min);//随机模拟耗时操作
    lg.e("执行handler程序 with value:" + entity);
    return entity * entity;
    }
    }, 3);
    int jobCount = 10;//任务数
    for (int i = 0; i < jobCount; i++) {
    master.addJob(i);
    }
    master.execute();

    Map<String, Integer> resultMap = master.getResultMap();
    while (true) {
    int resultMapSize = resultMap.size();
    // lg.e("并行执行结果集中已有数据量:" + resultMapSize);//此处resultMap持有Master中结果集的引用,因此在线程不断执行的过程中不断刷新结果姐,会连带导致这里值的改变
    if (master.isComplete()) {
    break;
    }
    }
    执行结果如下,

    另外,也应注意到,在执行handler方式,到结果集中写入数据是有延时的,这在开发中需要格外注意,务必使用master.isComplete判断任务完成状况







  • 相关阅读:
    uva 10369 Arctic Network
    uvalive 5834 Genghis Khan The Conqueror
    uvalive 4848 Tour Belt
    uvalive 4960 Sensor Network
    codeforces 798c Mike And Gcd Problem
    codeforces 796c Bank Hacking
    codeforces 768c Jon Snow And His Favourite Number
    hdu 1114 Piggy-Bank
    poj 1276 Cash Machine
    bzoj 2423 最长公共子序列
  • 原文地址:https://www.cnblogs.com/linux007/p/5790490.html
Copyright © 2011-2022 走看看