zoukankan      html  css  js  c++  java
  • Java之多线程中的Master-Worker模式

    该模式的好处是,将大任务拆解成若干小任务并并行执行,从而提高系统吞吐量。

    定义Worker进程,负责处理实际任务。
    /*具体工作对象*/
    static abstract class Worker<T, R> implements Runnable {
    private static final UtilsLog lg = UtilsLog.getLogger(Worker.class);
    protected Queue<T> workQueue;//持有Master的任务队列
    protected Map<String, R> resultMap;//用于存储结果集,key为任务对应的唯一标识符

    public void setWorkQueue(Queue<T> workQueue) {
    this.workQueue = workQueue;
    }

    public void setResultMap(Map<String, R> resultMap) {
    this.resultMap = resultMap;
    }

    public abstract R handler(T entity);

    @Override
    public void run() {
    while (true) {
    T childWork = workQueue.poll();
    if (childWork == null) {
    lg.e("已经没有任务在队列中等待执行");
    break;
    }
    //处理子任务
    R result = handler(childWork);
    resultMap.put(Integer.toString(childWork.hashCode()), result);
    }
    }
    }
    义Master进程,负责接收和分配任务。Master会在提交任务的同时立即返回结果集,由于此处的getResultMap属于引用传递,因此属性resultMap的修改会同步至业务层。
    public static class Master<T, R> {
    private static final UtilsLog lg = UtilsLog.getLogger(Master.class);
    protected Queue<T> workQueue;//用于存储任务集
    protected Map<String, Thread> threadMap;//存储执行任务的线程集
    protected Map<String, R> resultMap;//存储相关结果

    @TargetApi(Build.VERSION_CODES.LOLLIPOP)
    public Master(Worker<T, R> work, int threadCount) {
    workQueue = new ConcurrentLinkedDeque<T>();
    threadMap = new HashMap<>();
    resultMap = new HashMap<>();

    work.setWorkQueue(workQueue);
    work.setResultMap(resultMap);
    for (int i = 0; i < threadCount; i++) {
    threadMap.put(Integer.toString(i), new Thread(work, "thread tag with " + Integer.toString(i)));
    }
    }

    //是否所有的子任务都结束了
    public boolean isComplete() {
    for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
    if (entry.getValue().getState() != Thread.State.TERMINATED) {
    return false;
    }
    }
    return true;
    }

    public Map<String, R> getResultMap() {
    return resultMap;
    }

    public Master addJob(T job) {
    workQueue.add(job);
    return this;
    }

    public void execute() {
    for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
    entry.getValue().start();
    }
    }
    }
    业务层调用方案如下,
            Master<Integer, Integer> master = new Master<>(new Worker<Integer, Integer>() {
    @Override
    public Integer handler(Integer entity) {
    int max = 50, min = 0;
    UtilsThread.sleepIgnoreInteruptedException(new Random().nextInt(max) % (max - min + 1) + min);//随机模拟耗时操作
    lg.e("执行handler程序 with value:" + entity);
    return entity * entity;
    }
    }, 3);
    int jobCount = 10;//任务数
    for (int i = 0; i < jobCount; i++) {
    master.addJob(i);
    }
    master.execute();

    Map<String, Integer> resultMap = master.getResultMap();
    while (true) {
    int resultMapSize = resultMap.size();
    // lg.e("并行执行结果集中已有数据量:" + resultMapSize);//此处resultMap持有Master中结果集的引用,因此在线程不断执行的过程中不断刷新结果姐,会连带导致这里值的改变
    if (master.isComplete()) {
    break;
    }
    }
    执行结果如下,

    另外,也应注意到,在执行handler方式,到结果集中写入数据是有延时的,这在开发中需要格外注意,务必使用master.isComplete判断任务完成状况







  • 相关阅读:
    android-基础编程-RecyclerView
    android-基础编程-ListView
    LINUX 日志服务器的搭建
    使用parted进行磁盘分区
    raid磁盘阵列
    LVM逻辑卷管理
    /home 分区迁移试验
    PHP 匹配一个汉字
    xhr dojo load
    ERR: Call to undefined function openssl_random_pseudo_bytes()
  • 原文地址:https://www.cnblogs.com/linux007/p/5790490.html
Copyright © 2011-2022 走看看