zoukankan      html  css  js  c++  java
  • CompletionService简讲

    背景

    最近在项目中看到太多后台task中使用Executor框架,提交任务后,把future都一个个加入到list,再一个个get这些future的代码。
    这个的问题在于一方面没有时限,可能会被某些运行缓慢的future拖很久。即便使用带超时控制的get方法,这样加入list再get的做法依然很繁琐。

    其实在《Java并发编程实战》或者《Java多线程编程的艺术》这些书中都介绍过JDK提供了CompletionService接口。

    例程

    JDK为我们提供的CompletionService接口的默认实现是java.util.concurrent.ExecutorCompletionService。在它的Java Doc中已经给出两个demo例程。

    void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException {
         CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
         for (Callable<Result> s : solvers)
             ecs.submit(s);
         int n = solvers.size();
         for (int i = 0; i < n; ++i) {
             Result r = ecs.take().get();
             if (r != null)
                 use(r);
         }
     }
    

    上面的例程展示了CompletionService的基本使用。它的实现融合了Executor和BlockingQueue。可以看到任务的执行依托于内部的Executor,而一个任务完成后会被加到阻塞队列中,调用线程可以及时获取到新完成的任务。

    如下所示为Java Doc中另一个例程。

    void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
         CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
         int n = solvers.size();
         List<Future<Result>> futures
             = new ArrayList<Future<Result>>(n);
         Result result = null;
         try {
             for (Callable<Result> s : solvers)
                 futures.add(ecs.submit(s));
             for (int i = 0; i < n; ++i) {
                 try {
                     Result r = ecs.take().get();
                     if (r != null) {
                         result = r;
                         break;
                     }
                 } catch (ExecutionException ignore) {}
             }
         }
         finally {
             for (Future<Result> f : futures)
                 f.cancel(true);
         }
    
         if (result != null)
             use(result);
     }
    

    上面的例程中,用于获取第一个返回值不为null的任务结果,并取消其他任务。

    原理

    ExecutorCompletionService的源码实现非常简单。
    内部就三个东西:

    // 构造方法传入的executor实例。
    private final Executor executor;
    // 如果构造方法传入的executor实例是AbstractExecutorService子类,则类型转化后保存。
    private final AbstractExecutorService aes;
    // 用于保存完成的future,所谓完成可以是有异常或者已经取消。
    private final BlockingQueue<Future<V>> completionQueue;
    

    内部最核心的嵌套类是:

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
    

    QueueingFuture内部组合了一个RunnableFuture,然后在构造方法中通过父类FutureTask的构造方法,将之转化为FutureTask的内部callable。
    它对FutureTask中的钩子方法done进行了覆盖,将构造函数传入的RunnableFuture在完成后加到阻塞队列中。

    FutureTask的done方法会在任务正常完成/发生异常/被取消后被调用。更多源码可以参考我的FutureTask源码解读

    剩余提交任务的各种submit方法,无非就是在原来的FutureTask上用QueueingFuture套上一套,实现任务在完成后加到阻塞队列的逻辑。
    而获取任务的take/poll方法的实现就是调用内部阻塞队列而已。

    至此,全部讲完了。

  • 相关阅读:
    QTP最小化代码
    开源Web自动化测试框架
    翟志刚系电脑游戏高手
    Java开源框架集[转载]
    Windows xp 控制台命令一览表〔转载〕
    三大措施将SQL注入攻击的危害最小化
    Zee书评:对于涌的《软件性能测试与Load Runner实战》的个人看法
    藏獒遭主人打骂后咬舌自尽
    IDS\IPS相关知识〔搜集〕
    lr之RTE脚本(telnet方式访问水木清华)
  • 原文地址:https://www.cnblogs.com/micrari/p/7495925.html
Copyright © 2011-2022 走看看