JDK的CompletionService提供了一种将生产新的异步任务与使用已完毕任务的结果分离开来的服务。生产者 submit 运行的任务。使用者 take 已完毕的任务,并依照完毕这些任务的顺序处理它们的结果。比如,CompletionService 能够用来管理异步 IO 。运行读操作的任务作为程序或系统的一部分提交,然后。当完毕读操作时,会在程序的不同部分运行其它操作,运行操作的顺序可能与所请求的顺序不同。
能够简单查看一下CompletionService的唯一实现类ExecutorCompletionService源代码
通过ExecutorCompletionService的构造器可知,CompletionService 依赖于一个单独的 Executor 来实际运行任务。内部管理了一个堵塞队列来,在调用submit方法时。会向创建一个新的RunnableFuture,然后异步运行该RunnableFuture。当其状态变为done后,加入CompletionService的堵塞队列中,外部通过调用take()(堵塞)或者poll()(非堵塞,为空返回null)方法获取运行结果。
举个样例:如今要向server发送HTTP请求。服务端对于每一个请求都须要做非常多额外操作,非常消耗时间,则能够将每一个请求接受之后。提交到CompletionService异步处理,等运行完毕之后,在返回给client
package com.yf.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CompletionServiceTest {
private ExecutorService threadPool = Executors.newCachedThreadPool();
private CompletionService<Response> completionService = new ExecutorCompletionService<Response>(
Executors.newCachedThreadPool());
public CompletionServiceTest() {
new Thread() {
public void run() {
while (true) {
try {
Future<Response> f = completionService.take();
/**
* 获取响应信息,返回给client
* 假设completionService任务队列为空,此处将堵塞
*/
Response resp = f.get();
System.out.println(resp.getId());
} catch (Exception e) {
System.out.println("Exception happened:"+e.getMessage());
}
}
};
}.start();
}
class Request{
private int rid;
private String body;
public int getRid() {
return rid;
}
public void setRid(int rid) {
this.rid = rid;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
}
class Response {
private int id;
private String body;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
}
class HTTPExecutor {
public Future<Response> execute(final Request request) {
Future<Response> f = threadPool.submit(new Callable<Response>() {
public Response call() throws Exception {
Response response = new Response();
Thread.currentThread().sleep(3000);
response.setId(request.getRid());
response.setBody("response");
return response;
}
});
return f;
}
}
public void submitHTTP(final Request request) {
completionService.submit(new Callable<Response>() {
public Response call() throws Exception {
return new HTTPExecutor().execute(request).get();
}
});
}
public static void main(String[] args) {
CompletionServiceTest t = new CompletionServiceTest();
for (int i = 0; i < 10; i++) {
/**
* 发送10个HTTP请求
*/
Request request =t.new Request();
request.setRid(i);
request.setBody("request");
t.submitHTTP(request);
}
}
}
能够简单查看一下CompletionService的唯一实现类ExecutorCompletionService源代码
关键代码例如以下:
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null; this.completionQueue = completionQueue; } public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }
通过ExecutorCompletionService的构造器可知,CompletionService 依赖于一个单独的 Executor 来实际运行任务。内部管理了一个堵塞队列来,在调用submit方法时。会向创建一个新的RunnableFuture,然后异步运行该RunnableFuture。当其状态变为done后,加入CompletionService的堵塞队列中,外部通过调用take()(堵塞)或者poll()(非堵塞,为空返回null)方法获取运行结果。