@Async 异步注解 + 线程池
Spring 的异步注解,可直接标注在方法上,主线程不会阻塞等待结果,而是接着执行下面的方法逻辑
在启动类上需要标注 @EnableAsync
- Async的配置类
@Configuration
public class AsyncConfig {
@Bean
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(6);
executor.setMaxPoolSize(6);
executor.setQueueCapacity(3000);
executor.setThreadNamePrefix("Service-");
executor.initialize();
return executor;
}
}
- 使用 CompletableFuture.completedFuture()方法 把结果封装进 CompletableFuture
- 调用 CompletableFuture 的 thenCombine()方法进行 合并结果计算
CompletionService + 线程池
CompletionService 使用Executor提供的execute()方法 ,执行结果会被放在CompletionService维护在一个阻塞队列中,
执行时间较短的任务优先被存入阻塞队列中。不必阻塞着等某一个future,而使可能已完成的future浪费时间去等待
ExecutorCompletionService 实现了 CompletionService 接口
两种构造
public ExecutorCompletionService(Executor executor) {
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
- 自建阻塞队列
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
调用take()或者poll()方法获取future
两者的区别是take()获取队列中的返回结果时会阻塞,poll()不会被阻塞,若没有结果会返回null,poll和take返回正确的结果后会将该结果从队列中删除
//获取线程池服务
ThreadPoolExecutor taskExecutor = ThreadPoolService.getConsumeExecutor();
List<Map<String, Object>> resultList = new ArrayList<>();
//创建CompletionService
CompletionService<Map<String, Object>> completionService = new ExecutorCompletionService<Map<String, Object>>(taskExecutor);
buildList.forEach(map -> {
completionService.submit(new Callable<Map<String, Object>>() {
@Override
public Map<String, Object> call() throws Exception {
//计算.................
//逻辑.....................
return ;
}
});
});
//从CompletionService 内的阻塞队列中获取future
try {
for (int i = 0; i < buildList.size(); i++) {
Future<Map<String, Object>> mapFuture = completionService.take();
resultList.add(mapFuture.get());
}
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
ExecutorService的invokeAll函数
- 这种方式是 提交任务后,把任务保存在一个存储任务的容器中,最后调用invokeAll一次获取所有任务的future,
- 直接invokeAll提交到线程池中 , 需要注意线程池的拒绝策略
- 默认策略是 ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy 丢弃任务,但是不抛出异常。如果线程队列已满,则后续提交的任务都会被丢弃,且是静默丢弃。
- ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新提交被拒绝的任务。
- ThreadPoolExecutor.CallerRunsPolicy 由调用线程处理该任务
CompletableFuture + 线程池
- supplyAsync 没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码
- thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
- thenApply转换CompletableFuture中的泛型
/*
* 没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。
* 如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
*
* runAsync方法不支持返回值。
* supplyAsync可以支持返回值。
*/
CompletableFuture<Double> futuretb = CompletableFuture.supplyAsync(CompletableFutureExercise::priceOftb);
CompletableFuture<Double> futuretm = CompletableFuture.supplyAsync(CompletableFutureExercise::priceOftm);