zoukankan      html  css  js  c++  java
  • 线程池的多种接收future的姿势记录

    @Async 异步注解 + 线程池

    Spring 的异步注解,可直接标注在方法上,主线程不会阻塞等待结果,而是接着执行下面的方法逻辑
    在启动类上需要标注 @EnableAsync

    • Async的配置类
    @Configuration
    public class AsyncConfig {
        @Bean
        public Executor asyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(6);
            executor.setMaxPoolSize(6);
            executor.setQueueCapacity(3000);
            executor.setThreadNamePrefix("Service-");
            executor.initialize();
            return executor;
        }
    }
    
    • 使用 CompletableFuture.completedFuture()方法 把结果封装进 CompletableFuture
    • 调用 CompletableFuture 的 thenCombine()方法进行 合并结果计算

    CompletionService + 线程池

    CompletionService 使用Executor提供的execute()方法 ,执行结果会被放在CompletionService维护在一个阻塞队列中,
    执行时间较短的任务优先被存入阻塞队列中。不必阻塞着等某一个future,而使可能已完成的future浪费时间去等待
    ExecutorCompletionService 实现了 CompletionService 接口
    两种构造

    public ExecutorCompletionService(Executor executor) {
            this.executor = executor;
            this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
            this.completionQueue = new LinkedBlockingQueue<Future<V>>();
        }
    
    • 自建阻塞队列
    public ExecutorCompletionService(Executor executor,
                                         BlockingQueue<Future<V>> completionQueue) {
            this.executor = executor;
            this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
            this.completionQueue = completionQueue;
        }
    

    调用take()或者poll()方法获取future
    两者的区别是take()获取队列中的返回结果时会阻塞,poll()不会被阻塞,若没有结果会返回null,poll和take返回正确的结果后会将该结果从队列中删除

    
            //获取线程池服务
            ThreadPoolExecutor taskExecutor = ThreadPoolService.getConsumeExecutor();
            List<Map<String, Object>> resultList = new ArrayList<>();
            //创建CompletionService
            CompletionService<Map<String, Object>> completionService = new ExecutorCompletionService<Map<String, Object>>(taskExecutor);
                
            buildList.forEach(map -> {
                completionService.submit(new Callable<Map<String, Object>>() {
                    @Override
                    public Map<String, Object> call() throws Exception {
                         //计算.................
                         //逻辑.....................   
                        return ;
                    }
                });
            });
    
            //从CompletionService 内的阻塞队列中获取future
            try {
                for (int i = 0; i < buildList.size(); i++) {
                    Future<Map<String, Object>> mapFuture = completionService.take();
                    resultList.add(mapFuture.get());
                }
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
                Thread.currentThread().interrupt();
            }
    

    ExecutorService的invokeAll函数

    • 这种方式是 提交任务后,把任务保存在一个存储任务的容器中,最后调用invokeAll一次获取所有任务的future,
    • 直接invokeAll提交到线程池中 , 需要注意线程池的拒绝策略

    • 默认策略是 ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常。
    • ThreadPoolExecutor.DiscardPolicy 丢弃任务,但是不抛出异常。如果线程队列已满,则后续提交的任务都会被丢弃,且是静默丢弃。
    • ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新提交被拒绝的任务。
    • ThreadPoolExecutor.CallerRunsPolicy 由调用线程处理该任务

    CompletableFuture + 线程池

    • supplyAsync 没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码
    • thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
    • thenApply转换CompletableFuture中的泛型
    /*
             * 没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。
             * 如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
             *
             *  runAsync方法不支持返回值。
             *  supplyAsync可以支持返回值。
             */
    
            CompletableFuture<Double> futuretb = CompletableFuture.supplyAsync(CompletableFutureExercise::priceOftb);
            CompletableFuture<Double> futuretm = CompletableFuture.supplyAsync(CompletableFutureExercise::priceOftm);
    
  • 相关阅读:
    C学习笔记-gcc
    C学习笔记-makefile
    C学习笔记-makefile
    C学习笔记-基础数据结构与算法
    上下左右居中 无固定高的div
    Touch事件 移动端touch触摸事件
    让更多浏览器支持html5元素的简单方法
    跨子域的iframe高度自适应
    10进制转16进制,16进制转10进制,随机出一个6位十六进制颜色值
    ajax原理和XmlHttpRequest对象
  • 原文地址:https://www.cnblogs.com/JMrLi/p/13852664.html
Copyright © 2011-2022 走看看