zoukankan      html  css  js  c++  java
  • Java 线程总结(十四)

    1、在异步任务进程中,一种常见的场景是,主线程提交多个异步任务,然后希望有任务完成就处理结果,并且按任务完成顺序逐个处理,对于这种场景,Java 并发包提供了一个方便的方法,使用 CompletionService,这是一个接口,它的实现类是 ExecutorCompletionService。

    2、与 ExecutorService 一样,CompletionService 也可以提交异步任务,它的不同是,它可以按任务完成顺序获取结果,其具体定义为:

    1
    2
    3
    4
    5
    6
    7
    public interface <> {
    Future<V> submit(Callable<V> task);
    Future<V> submit(Runnable task, V result);
    Future<V> take() throws InterruptedException;
    Future<V> poll();
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
    }

    submit 方法与 ExecutorService 是一样的,多了 take 和 poll 方法,它们都是获取下一个完成任务的结果,take() 会阻塞等待,poll() 会立即返回,如果没有已完成的任务,返回 null,带时间参数的 poll 方法会最多等待限定的时间。

    2、CompletionService 的主要实现类是 ExecutorCompletionService,它依赖于一个 Executor 完成实际的任务提交,而自己主要负责结果的排队和处理。它的构造方法有两个:

    1
    2
    public ExecutorCompletionService(Executor executor)
    public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)

    至少需要一个 Executor 参数,可以提供一个 BlockingQueue 参数,用作完成任务的队列,没有提供的话,ExecutorCompletionService 内部会创建一个 LinkedBlockingQueue。

    3、ExecutorCompletionService 是怎么让结果有序处理的呢?
    因为它有一个额外的队列,每个任务完成之后,都会将代表结果的 Future 入队。在 FutureTask 中,任务完成后,不管是正常完成、异常结束、还是被取消,都会调用 finishCompletion 方法,而该方法会调用一个 done 方法 protected void done() { } 该方法的实现为空,但它是一个 protected 方法,子类可以重写该方法。ExecutorCompletionService 的内部类 QueueingFuture 中重写了该方法。

    在 ExecutorCompletionService 中,提交的任务类型不是一般的 FutureTask,而是一个子类 QueueingFuture

    大专栏  Java 线程总结(十四)>1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);

    executor.execute(new QueueingFuture(f));
    return f;
    }
    ----------------------------

    private final BlockingQueue<Future<V>> completionQueue;

    private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
    super(task, null);
    this.task = task;
    }

    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
    }
    ---------------------------

    而 ExecutorCompletionService 的 take/poll 方法就是从该队列获取结果:

    1
    2
    3
    public Future<V> take() throws InterruptedException {
    return completionQueue.take();
    }

    4、AbstractExecutorService 的 invokeAny 的实现,就利用了 ExecutorCompletionService,它的基本思路是,提交任务后,通过 take 方法获取结果,获取到第一个有效结果后,取消所有其他任务。

    5、CompletionService 它通过一个额外的结果队列,方便了对于多个异步任务结果的处理。

    参考博客

    Java编程的逻辑 - 方便的 CompletionService

  • 相关阅读:
    p_value
    p_value
    第一次差异分析
    fdr
    rpkm&map
    rpkm&map
    s
    python数据处理小函数集合
    Jupyter Notebook 的快捷键
    自由度degree of freedom
  • 原文地址:https://www.cnblogs.com/lijianming180/p/12014364.html
Copyright © 2011-2022 走看看