zoukankan      html  css  js  c++  java
  • CompletableFuture实现异步获取结果并且等待所有异步任务完成

    直接上代码:

    import com.google.common.collect.Lists;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.List;
    import java.util.concurrent.*;
    import java.util.function.BiConsumer;
    
    /**
     * CompletableFuture的AllOf功能测试,等待所有任务执行完
     *
     */
    public class CompletableFutureAllOfTest {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4,
                    100L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(10));
    
            method1(executor);
            method2(executor);
            method3(executor);
        }
    
        /**
         * 拆解写法
         * @param executor
         */
        public static void method1 (ExecutorService executor) {
            long start = System.currentTimeMillis();
            // 定义第一个任务
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(5000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
                return "cf1";
            }, executor);
    
            cf1.whenComplete(new BiConsumer<String, Throwable>() {
                @Override
                public void accept(String t, Throwable u) {
                    System.out.println("hello " + t);
                }
            });
    
            // 定义第二个任务
            CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(5000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
                return "cf2";
            }, executor);
    
            cf2.whenComplete(new BiConsumer<String, Throwable>() {
                @Override
                public void accept(String t, Throwable u) {
                    System.out.println("hello " + t);
                }
            });
            // 开始等待所有任务执行完成
            CompletableFuture<Void> all = CompletableFuture.allOf(cf1, cf2);
            System.out.println("start block");
            all.join();
            System.out.println("block finish, consume time:" + (System.currentTimeMillis() - start));
        }
    
        /**
         * 合并写法
         * @param executor
         */
        public static void method2 (ExecutorService executor) {
            List<String> testList = Lists.newArrayList();
            testList.add("cf1");
            testList.add("cf2");
            long start = System.currentTimeMillis();
            CompletableFuture<Void> all = null;
            for (String str : testList) {
                // 定义任务
                CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(5000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
    
                    return str;
                }, executor);
    
                cf.whenComplete(new BiConsumer<String, Throwable>() {
                    @Override
                    public void accept(String t, Throwable u) {
                        System.out.println("hello " + t);
                    }
                });
                all = CompletableFuture.allOf(cf);
            }
            System.out.println("start block");
            // 开始等待所有任务执行完成
            all.join();
            System.out.println("block finish, consume time:" + (System.currentTimeMillis() - start));
        }
    
        /**
         * 通过Java8的stream实现,非常简洁
         * @param executor
         */
        @SuppressWarnings("rawtypes")
        public static void method3 (ExecutorService executor) {
            List<String> testList = Lists.newArrayList();
            testList.add("cf1");
            testList.add("cf2");
            long start = System.currentTimeMillis();
            CompletableFuture[] cfArr = testList.stream().
                    map(t -> CompletableFuture
                            .supplyAsync(() -> pause(t), executor)
                            .whenComplete((result, th) -> {
                                System.out.println("hello" + result);
                            })).toArray(CompletableFuture[]::new);
            // 开始等待所有任务执行完成
            System.out.println("start block");
            CompletableFuture.allOf(cfArr).join();
            System.out.println("block finish, consume time:" + (System.currentTimeMillis() - start));
        }
    
        public static String pause (String name) {
            try {
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            return name;
        }
    
    }

    参考: CompletableFuture实现异步获取结果并且等待所有异步任务完成

             使用Future实现异步回调的方式

             CompletableFuture 使用详解

            

  • 相关阅读:
    3个常用基于Linux系统命令行WEB网站浏览工具(w3m/Links/Lynx)
    Linux进程关系
    Linux信号基础
    Linux进程基础
    Linux架构
    Linux文本流
    Linux文件管理相关命令
    Linux命令行与命令
    【转载】 input 输入格式化
    【所见即所得】textarea 精确限制字数、行数,中、英、全半角混检 。源码带注释
  • 原文地址:https://www.cnblogs.com/brithToSpring/p/13367770.html
Copyright © 2011-2022 走看看