zoukankan      html  css  js  c++  java
  • 线程池的多种接收future的姿势记录

    @Async 异步注解 + 线程池

    Spring 的异步注解,可直接标注在方法上,主线程不会阻塞等待结果,而是接着执行下面的方法逻辑
    在启动类上需要标注 @EnableAsync

    • Async的配置类
    @Configuration
    public class AsyncConfig {
        @Bean
        public Executor asyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(6);
            executor.setMaxPoolSize(6);
            executor.setQueueCapacity(3000);
            executor.setThreadNamePrefix("Service-");
            executor.initialize();
            return executor;
        }
    }
    
    • 使用 CompletableFuture.completedFuture()方法 把结果封装进 CompletableFuture
    • 调用 CompletableFuture 的 thenCombine()方法进行 合并结果计算

    CompletionService + 线程池

    CompletionService 使用Executor提供的execute()方法 ,执行结果会被放在CompletionService维护在一个阻塞队列中,
    执行时间较短的任务优先被存入阻塞队列中。不必阻塞着等某一个future,而使可能已完成的future浪费时间去等待
    ExecutorCompletionService 实现了 CompletionService 接口
    两种构造

    public ExecutorCompletionService(Executor executor) {
            this.executor = executor;
            this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
            this.completionQueue = new LinkedBlockingQueue<Future<V>>();
        }
    
    • 自建阻塞队列
    public ExecutorCompletionService(Executor executor,
                                         BlockingQueue<Future<V>> completionQueue) {
            this.executor = executor;
            this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
            this.completionQueue = completionQueue;
        }
    

    调用take()或者poll()方法获取future
    两者的区别是take()获取队列中的返回结果时会阻塞,poll()不会被阻塞,若没有结果会返回null,poll和take返回正确的结果后会将该结果从队列中删除

    
            //获取线程池服务
            ThreadPoolExecutor taskExecutor = ThreadPoolService.getConsumeExecutor();
            List<Map<String, Object>> resultList = new ArrayList<>();
            //创建CompletionService
            CompletionService<Map<String, Object>> completionService = new ExecutorCompletionService<Map<String, Object>>(taskExecutor);
                
            buildList.forEach(map -> {
                completionService.submit(new Callable<Map<String, Object>>() {
                    @Override
                    public Map<String, Object> call() throws Exception {
                         //计算.................
                         //逻辑.....................   
                        return ;
                    }
                });
            });
    
            //从CompletionService 内的阻塞队列中获取future
            try {
                for (int i = 0; i < buildList.size(); i++) {
                    Future<Map<String, Object>> mapFuture = completionService.take();
                    resultList.add(mapFuture.get());
                }
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
                Thread.currentThread().interrupt();
            }
    

    ExecutorService的invokeAll函数

    • 这种方式是 提交任务后,把任务保存在一个存储任务的容器中,最后调用invokeAll一次获取所有任务的future,
    • 直接invokeAll提交到线程池中 , 需要注意线程池的拒绝策略

    • 默认策略是 ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常。
    • ThreadPoolExecutor.DiscardPolicy 丢弃任务,但是不抛出异常。如果线程队列已满,则后续提交的任务都会被丢弃,且是静默丢弃。
    • ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新提交被拒绝的任务。
    • ThreadPoolExecutor.CallerRunsPolicy 由调用线程处理该任务

    CompletableFuture + 线程池

    • supplyAsync 没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码
    • thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
    • thenApply转换CompletableFuture中的泛型
    /*
             * 没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。
             * 如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
             *
             *  runAsync方法不支持返回值。
             *  supplyAsync可以支持返回值。
             */
    
            CompletableFuture<Double> futuretb = CompletableFuture.supplyAsync(CompletableFutureExercise::priceOftb);
            CompletableFuture<Double> futuretm = CompletableFuture.supplyAsync(CompletableFutureExercise::priceOftm);
    
  • 相关阅读:
    hdoj 2803 The MAX【简单规律题】
    hdoj 2579 Dating with girls(2)【三重数组标记去重】
    hdoj 1495 非常可乐【bfs隐式图】
    poj 1149 PIGS【最大流经典建图】
    poj 3281 Dining【拆点网络流】
    hdoj 3572 Task Schedule【建立超级源点超级汇点】
    hdoj 1532 Drainage Ditches【最大流模板题】
    poj 1459 Power Network【建立超级源点,超级汇点】
    hdoj 3861 The King’s Problem【强连通缩点建图&&最小路径覆盖】
    hdoj 1012 u Calculate e
  • 原文地址:https://www.cnblogs.com/JMrLi/p/13852664.html
Copyright © 2011-2022 走看看