ExecutorCompletionService有三个成员变量:
executor:执行task的线程池,创建CompletionService必须指定;
aes:当executor是AbstractExecutorService的实例时,aes指向同一个对象,否则为null;主要用于通过newTaskFor创建待执行的task(aes为null时直接创建FutureTask);
completionQueue:存储已完成状态的task,默认是基于链表结构的阻塞队列LinkedBlockingQueue。
- take():获取并移除下一个已完成状态的task;如果目前暂时不存在这样的task,则阻塞等待,直到存在这样的task;
- poll():获取并移除下一个已完成状态的task;如果目前暂时不存在这样的task,不等待,直接返回null;带超时参数的poll(timeout, unit)最多等待指定的时间,超时后仍不存在这样的task则返回null。
ExecutorCompletionService
/**
 * A {@link CompletionService} that runs submitted tasks on a supplied
 * {@link Executor} and places each task's {@link Future} on a completion
 * queue once the task is done, so callers can retrieve results via
 * {@link #take} or {@link #poll} in completion order.
 *
 * @param <V> the result type of the submitted tasks
 */
public class ExecutorCompletionService<V> implements CompletionService<V> {
    /** Executor that actually runs the wrapped tasks. */
    private final Executor executor;
    /** Same object as {@link #executor} when it is an AbstractExecutorService, otherwise null. */
    private final AbstractExecutorService aes;
    /** Holds the futures of tasks that have completed. */
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * FutureTask wrapper whose only job is to add the wrapped task to the
     * completion queue when the wrapper itself is done.
     */
    private static class QueueingFuture<V> extends FutureTask<Void> {
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;

        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            // The wrapped task is run as a Runnable; the wrapper's own result is null.
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }

        @Override
        protected void done() {
            completionQueue.add(task);
        }
    }

    /**
     * Returns the executor viewed as an AbstractExecutorService, or null
     * when it is not one.
     */
    private static AbstractExecutorService asAbstractExecutorService(Executor candidate) {
        if (candidate instanceof AbstractExecutorService) {
            return (AbstractExecutorService) candidate;
        }
        return null;
    }

    /** Wraps a Callable, delegating to the executor's own factory when one is available. */
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes != null) {
            return aes.newTaskFor(task);
        }
        return new FutureTask<V>(task);
    }

    /** Wraps a Runnable and fixed result, delegating to the executor's own factory when one is available. */
    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes != null) {
            return aes.newTaskFor(task, result);
        }
        return new FutureTask<V>(task, result);
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is {@code null}
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null) {
            throw new NullPointerException();
        }
        this.executor = executor;
        this.aes = asAbstractExecutorService(executor);
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and the supplied queue as its
     * completion queue.
     *
     * @param executor the executor to use
     * @param completionQueue the queue to use as the completion queue,
     *        normally one dedicated for use by this service. This
     *        queue is treated as unbounded -- failed attempted
     *        {@code Queue.add} operations for completed tasks cause
     *        them not to be retrievable.
     * @throws NullPointerException if executor or completionQueue are {@code null}
     */
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null) {
            throw new NullPointerException();
        }
        if (completionQueue == null) {
            throw new NullPointerException();
        }
        this.executor = executor;
        this.aes = asAbstractExecutorService(executor);
        this.completionQueue = completionQueue;
    }

    /**
     * Wraps the task, hands it to the executor, and returns the task's future.
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<V> submit(Callable<V> task) {
        if (task == null) {
            throw new NullPointerException();
        }
        RunnableFuture<V> future = newTaskFor(task);
        QueueingFuture<V> wrapper = new QueueingFuture<V>(future, completionQueue);
        executor.execute(wrapper);
        return future;
    }

    /**
     * Wraps the task with its fixed result, hands it to the executor, and
     * returns the task's future.
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<V> submit(Runnable task, V result) {
        if (task == null) {
            throw new NullPointerException();
        }
        RunnableFuture<V> future = newTaskFor(task, result);
        QueueingFuture<V> wrapper = new QueueingFuture<V>(future, completionQueue);
        executor.execute(wrapper);
        return future;
    }

    /** Delegates to {@code completionQueue.take()}. */
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    /** Delegates to {@code completionQueue.poll()}. */
    public Future<V> poll() {
        return completionQueue.poll();
    }

    /** Delegates to {@code completionQueue.poll(timeout, unit)}. */
    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }
}
实例:
package com._ThreadPool._2;

import java.math.BigInteger;
import java.util.concurrent.*;

/**
 * Demo of {@link CompletionService}: splits a summation into several range
 * tasks, runs them on a fixed thread pool, and adds up the partial sums in
 * the order in which the tasks complete.
 */
public class MyCompletionService {

    /** Number of range tasks submitted (ranges 1..8). */
    private static final int TASK_COUNT = 8;

    /** Width of each summed range; task i covers [i * RANGE_SIZE, (i + 1) * RANGE_SIZE]. */
    private static final long RANGE_SIZE = 1000000L;

    /**
     * Submits the range tasks, collects their results as they complete,
     * prints the grand total, and shuts the pool down so the JVM can exit.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Create the thread pool
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        try {
            // Create a CompletionService backed by the pool
            CompletionService<Long> completionService =
                    new ExecutorCompletionService<Long>(executorService);

            for (int i = 1; i <= TASK_COUNT; i++) {
                final long start = i * RANGE_SIZE;
                final long end = start + RANGE_SIZE;
                completionService.submit(new Callable<Long>() {
                    @Override
                    public Long call() throws Exception {
                        System.out.println("开始计算" + start + "到" + end);
                        long sum = 0;
                        // Both bounds are inclusive, so adjacent ranges share their boundary value.
                        for (long j = start; j <= end; j++) {
                            sum += j;
                        }
                        System.out.println(start + "到" + end + "结果是" + sum);
                        return sum;
                    }
                });
            }

            long result = 0;
            for (int i = 1; i <= TASK_COUNT; i++) {
                try {
                    // take() hands back the next finished task, whichever it is
                    result += completionService.take().get();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop waiting: a further
                    // take() would just throw again immediately.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    break;
                } catch (ExecutionException e) {
                    // One failed task does not prevent summing the others.
                    e.printStackTrace();
                }
            }
            System.out.println(result);
        } finally {
            // The pool's worker threads are non-daemon; without shutdown the
            // JVM would stay alive after main returns.
            executorService.shutdown();
        }
    }
}
结果:
开始计算3000000到4000000
开始计算2000000到3000000
开始计算1000000到2000000
开始计算5000000到6000000
开始计算4000000到5000000
3000000到4000000结果是3500003500000
4000000到5000000结果是4500004500000
2000000到3000000结果是2500002500000
开始计算6000000到7000000
5000000到6000000结果是5500005500000
开始计算7000000到8000000
1000000到2000000结果是1500001500000
开始计算8000000到9000000
7000000到8000000结果是7500007500000
8000000到9000000结果是8500008500000
6000000到7000000结果是6500006500000
40000040000000