zoukankan      html  css  js  c++  java
  • Java批处理ExecutorService/CompletionService

    服务端接收一个请求,常常需要同时进行几个计算或者向其他服务发送请求,最后拼装结果返回上游。本文就来看下JDK提供几个并行处理方案,牵涉到ExcecutorService/CompletionService。要实现的场景是请求有超时限制,如果所有操作都计算完成,则全部拼装返回;否则只拼装部分完成的结果。

    1.前提

    //任务类,sleep一个时间代表这个计算需要的耗时,返回一个计算结果。
    public class MyTask implements Callable<Integer> {
        private int id;
        private int time;
        public MyTask(int i, int time) {
            this.id = i;
            this.time = time;
        }
        @Override
        public Integer call() throws Exception {
            Thread.sleep(time);
            return id;
        }
    }
    //线程池
    ExecutorService threadPool = new ThreadPoolExecutor(10, 20, 
    60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));

    2.任务逐个等待

    提交任务后,逐个等待结果返回。

    //总共150ms超时
    List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
    futures.add(threadPool.submit(new MyTask(1, 60)));
    futures.add(threadPool.submit(new MyTask(2, 150)));
    Integer res = 0;
    
    for (int i = 0; i < futures.size(); ++i) {
    try {
    Integer tmp = futures.get(i).get(50 + i * 50, TimeUnit.MILLESECONDS);
    res += tmp;
    } catch (Exception e) {
    //nothing
    }
    }
    System.out.println(res);

    打印结果为0,实际上第60ms时第一个计算已经完成,本可以返回第一个结果的。这是因为生产者和消费者一一对应,调度不当。

    3.等待解耦

    用一个队列来实现生产者和消费者解耦,生产者把结果放到一个队列中,消费者从队列中取结果,不按照任务提交顺序等待,只要有结果就会消耗。CompletionService就是对Executor和异步队列的封装。

    CompletionService<Integer> service = new ExecutorCompletionService<Integer>(threadPool);
    List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
    futures.add(service.submit(new MyTask(1, 60)));
    futures.add(service.submit(new MyTask(2, 150)));
    
    List<Integer> result = new ArrayList<Integer>(futures.size());
    for (int i = 0; i < futures.size(); ++i) {
        Future<Integer> future = service.poll(50 + i * 50, TimeUnit.MILLISECONDS);
        if (null != future){
            result.add(future.get());
        }
    }
    int res = 0;
    for (Integer i : result) {
        res += null == i ? 0 : i;
    }
    System.out.println(res);

    打印结果为1,即第一个计算的结果。谁先完成就取谁。

    4.充分利用时间

    前面两个方案为每个任务都设置了一个固定的等待时间,两者之和不超过超时限制。这没有充分利用时间,两个任务是并发互不影响的,其各自可利用的时间应该是超时时间,而不应该是两者之和为超时时间。如果第一个任务耗时140,第二个耗时60,前两个任务就只能获得第二个任务的结果。如果两者都设置超时为150,就能获得2个结果。

    List<MyTask> tasks = new ArrayList<MyTask>();
    tasks.add(new MyTask(1, 140));
    tasks.add(new MyTask(2, 60));
    List<Future<Integer>> futures = threadPool.invokeAll(tasks, 150, TimeUnit.MILLISECONDS);
    
    int res = 0;
    for (Future<Integer> future : futures) {
        System.out.println("isDone:" + future.isDone());
        System.out.println("isCancel:" + future.isCancelled());
        if (future.isCancelled()) {
            continue;
        }
        res += future.get();
    }
    System.out.println(res);
    

    打印结果是3。上述方法利用ExecutorService的invokeAll方法,该方法在所有任务都完成或等待超时时返回,超时的时候会取消还没有完成的任务,通过判断任务是否被取消来判断任务是否计算完成。

    该文章说的是结果互不影响、有结果就能返回的应用场景。如果结果需要按照先后顺序进行合并,或者只需要等待一个计算完成,就不一定需要方案3了。

  • 相关阅读:
    27、驱动调试之修改系统时钟中断定位系统僵死问题
    25、驱动调试之打印到proc虚拟文件
    24、驱动调试之printk
    23、uevent/hotplug热拔插机制
    22、DMA驱动程序框架
    21、IIS声卡驱动程序
    20、RTC驱动程序
    ZOJ
    HDU-4272 LianLianKan (dfs)
    UVA-624 CD (01背包+路径记忆)
  • 原文地址:https://www.cnblogs.com/whuqin/p/5035369.html
Copyright © 2011-2022 走看看