zoukankan      html  css  js  c++  java
  • CompletableFuture基本使用

    引子

    在并发编程中,我们经常用到非阻塞的模型,在之前的多线程的三种实现中,不管是继承thread类还是实现runnable接口,都无法保证获取到之前的执行结果。通过实现Callback接口,并用Future可以来接收多线程的执行结果。

    Future表示一个可能还没有完成的异步任务的结果,针对这个结果可以添加Callback以便在任务执行成功或失败后作出相应的操作。

    Future

    看一下Future接口,只有五个方法比较简单

    boolean cancel(boolean mayInterruptIfRunning);//取消任务,如果已经完成或者已经取消,就返回失败

    boolean isCancelled();//查看任务是否取消

    boolean isDone();//查看任务是否完成

    V get() throws InterruptedException, ExecutionException;//刚才用到了,查看结果,任务未完成就一直阻塞

    V get(long timeout, TimeUnit unit)//同上,但是加了一个过期时间,防止长时间阻塞,主线程也做不了事情

    throws InterruptedException, ExecutionException, TimeoutException;

    CompletableFuture

    Java8主要的语言增强的能力有:

    (1)lambda表达式

    (2)stream式操作

    (3)CompletableFuture

    其中第三个特性,就是今天我们想要聊的话题,正是因为CompletableFuture的出现,才使得使用Java进行异步编程提供了可能。

    什么是CompletableFuture?

    CompletableFuture在Java里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过 回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。CompletableFuture实现了Future, CompletionStage接口,实现了Future接口就可以兼容现在有线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture类。

    Future vs CompletableFuture

    Futrue在Java里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个Futrue,在Future里面有isDone方法来 判断任务是否处理结束,还有get方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。

    Future的主要缺点如下:

    (1)不支持手动完成

    这个意思指的是,我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果,通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成。

    (2)不支持进一步的非阻塞调用

    这个指的是我们通过Future的get方法会一直阻塞到任务完成,但是我还想在获取任务之后,执行额外的任务,因为Future不支持回调函数,所以无法实现这个功能。

    (3)不支持链式调用

    这个指的是对于Future的执行结果,我们想继续传到下一个Future处理使用,从而形成一个链式的pipline调用,这在Future中是没法实现的。

    (4)不支持多个Future合并

    比如我们有10个Future并行执行,我们想在所有的Future运行完毕之后,执行某些函数,是没法通过Future实现的。

    (5)不支持异常处理

    Future的API没有任何的异常处理的api,所以在异步运行时,如果出了问题是不好定位的。

    CompletableFuture定义

    public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

    CompletableFuture实现了两个接口,一个是Future.一个是CompletionStage;future算是一种模式,对结果异步结果的封装,相当于异步结果,而CompletionStage相当于完成阶段,多个CompletionStage可以以流水线的方式组合起来,共同完成任务. 

    CompletableFuture使用例子

    首先说明一下已Async结尾的方法都是可以异步执行的,如果指定了线程池,会在指定的线程池中执行,如果没有指定,默认会在ForkJoinPool.commonPool()中执行

    先定义一个获取线程名字的函数用来查询执行当前任务的现场

        public static String getThreadName() {
            return Thread.currentThread().getName() + "=>";
        }

    1,先看一个最简单的例子

        public static void testNew() throws Exception {
            CompletableFuture<String> completableFuture = new CompletableFuture<String>();
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(3);
                        System.out.println(getThreadName() + "执行.....");
                        completableFuture.complete("success");//在子线程中完成主线程completableFuture的完成
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            Thread t1 = new Thread(runnable);
            t1.start();//启动子线程
    
            String result = completableFuture.get();//主线程阻塞,等待完成
            System.out.println(getThreadName() + result);
        }

    执行结果

    2,运行一个简单的没有返回值的异步任务 

        public static void testNewVoid() throws Exception {
            CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(getThreadName() + "正在执行一个没有返回值的异步任务。");
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            future.get();
            System.out.println(getThreadName() + " 结束。");
        }

     从上面我们可以看到CompletableFuture默认运行使用的是ForkJoin的的线程池。当然,你也可以用lambda表达式使得代码更精简。

    3,运行一个有返回值的异步任务

        public static void testAsync() throws Exception {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    try {
                        System.out.println(getThreadName() + "正在执行一个有返回值的异步任务。");
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "OK";
                }
            });
            String result = future.get();
            System.out.println(getThreadName() + "结果:" + result);
        }

     当然,上面默认的都是ForkJoinPool我们也可以换成Executor相关的Pool,其api都有支持如下

    高级的使用CompletableFuture

    前面提到的几种使用方法是使用异步编程最简单的步骤,CompletableFuture.get()的方法会阻塞直到任务完成,这其实还是同步的概念,这对于一个异步系统是不够的,因为真正的异步是需要支持回调函数,这样以来,我们就可以直接在某个任务干完之后,接着执行回调里面的函数,从而做到真正的异步概念。

    在CompletableFuture里面,我们通过

    thenApply()

    thenAccept()

    thenRun()

    方法,来运行一个回调函数。

    (1)thenApply()

    这个方法,其实用过函数式编程的人非常容易理解,类似于scala和spark的map算子,通过这个方法可以进行多次链式转化并返回最终的加工结果。

        public static void asyncCallback() throws ExecutionException, InterruptedException {
            CompletableFuture<String> task = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    System.out.println(getThreadName() + "supplyAsync");
                    return "123";
                }
            });
    
            CompletableFuture<Integer> result1 = task.thenApply(number -> {
                System.out.println(getThreadName() + "thenApply1");
                return Integer.parseInt(number);
            });
    
            CompletableFuture<Integer> result2 = result1.thenApply(number -> {
                System.out.println(getThreadName() + "thenApply2");
                return number * 2;
            });
    
            System.out.println(getThreadName() + result2.get());
        }

     (2)thenAccept()

    这个方法,可以接受Futrue的一个返回值,但是本身不在返回任何值,适合用于多个callback函数的最后一步操作使用。

        public static void asyncCallback2() throws ExecutionException, InterruptedException {
            CompletableFuture<String> task = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    System.out.println(getThreadName() + "supplyAsync");
                    return "123";
                }
            });
    
            CompletableFuture<Integer> chain1 = task.thenApply(number -> {
                System.out.println(getThreadName() + "thenApply1");
                return Integer.parseInt(number);
            });
    
            CompletableFuture<Integer> chain2 = chain1.thenApply(number -> {
                System.out.println(getThreadName() + "thenApply2");
                return number * 2;
            });
    
            CompletableFuture<Void> result = chain2.thenAccept(product -> {
                System.out.println(getThreadName() + "thenAccept=" + product);
            });
    
            result.get();
            System.out.println(getThreadName() + "end");
        }

    (3) thenRun()

    这个方法与上一个方法类似,一般也用于回调函数最后的执行,但这个方法不接受回调函数的返回值,纯粹就代表执行任务的最后一个步骤:

        public static void asyncCallback3() throws ExecutionException, InterruptedException {
            CompletableFuture.supplyAsync(() -> {
                System.out.println(getThreadName() + "supplyAsync: 一阶段任务");
                return null;
            }).thenRun(() -> {
                System.out.println(getThreadName() + "thenRun: 收尾任务");
            }).get();
        }

     这里注意,截止到目前,前面的例子代码只会涉及两个线程,一个是主线程一个是ForkJoinPool池的线程,但其实上面的每一步都是支持异步运行的,其api如下:

    // thenApply() variants
    <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
    <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
    <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

    我们看下改造后的一个例子:

        public static void asyncCallback4() throws ExecutionException, InterruptedException {
            CompletableFuture<String> ref1 = CompletableFuture.supplyAsync(() -> {
                try {
                    System.out.println(getThreadName() + "supplyAsync开始执行任务1.... ");
    //                TimeUnit.SECONDS.sleep(1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println(getThreadName() + "supplyAsync:任务1");
                return null;
            });
    
            CompletableFuture<String> ref2 = CompletableFuture.supplyAsync(() -> {
                try {
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println(getThreadName() + "thenApplyAsync:任务2");
                return null;
            });
    
            CompletableFuture<String> ref3 = ref2.thenApplyAsync(value -> {
                System.out.println(getThreadName() + "thenApplyAsync:任务2的子任务");
                return null;
            });
    
            Thread.sleep(4000);
            System.out.println(getThreadName() + ref3.get());
        }

    我们可以看到,ForkJoin池的线程1,执行了前面的三个任务,但是第二个任务的子任务,因为我们了使用也异步提交所以它用的线程是ForkJoin池的线程2,最终由于main线程处执行了get是最后结束的。

    还有一点需要注意:

    ForkJoinPool所有的工作线程都是守护模式的,也就是说如果主线程退出,那么整个处理任务都会结束,而不管你当前的任务是否执行完。如果需要主线程等待结束,可采用ExecutorsThreadPool,如下:

    ExecutorService pool = Executors.newFixedThreadPool(5);
    final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                    ... }, pool);

    (4)thenCompose合并两个有依赖关系的CompletableFutures的执行结果

    thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作

    CompletableFutures在执行两个依赖的任务合并时,会返回一个嵌套的结果列表,为了避免这种情况我们可以使用thenCompose来返回,直接获取最顶层的结果数据即可:

        public static void asyncCompose() throws ExecutionException, InterruptedException {
            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    return "1";
                }
            });
            CompletableFuture<String> nestedResult = future1.thenCompose(value ->
                    CompletableFuture.supplyAsync(() -> {
                        return value + "2";
                    }));
    
            System.out.println(nestedResult.get());
        }

     (5)thenCombine、thenAcceptBoth,合并两个没有依赖关系的CompletableFutures任务

    thenCombine、thenAcceptBoth 都是用来合并任务 —— 等待两个 CompletionStage 的任务都执行完成后,把两个任务的结果一并来处理。区别在于 thenCombine 有返回值;thenAcceptBoth 无返回值。

    thenCombine :

        public static void asyncCombine() throws ExecutionException, InterruptedException {
            CompletableFuture<Double> d1 = CompletableFuture.supplyAsync(new Supplier<Double>() {
                @Override
                public Double get() {
                    return 1d;
                }
            });
            CompletableFuture<Double> d2 = CompletableFuture.supplyAsync(new Supplier<Double>() {
                @Override
                public Double get() {
                    return 2d;
                }
            });
            CompletableFuture<Double> result = d1.thenCombine(d2, (number1, number2) -> {
                return number1 + number2;
            });
    
            System.out.println(result.get());
        }

    thenAcceptBoth :

        public static void thenAcceptBoth() throws Exception {
            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "hello";
            });
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "world";
            });
            CompletableFuture<Void> both = future1.thenAcceptBoth(future2, (s1, s2) -> System.out.println(s1 + " " + s2));
            both.get();
        }

     (6)合并多个任务的结果allOf与anyOf

    上面说的是两个任务的合并,那么多个任务需要使用allOf或者anyOf方法。

    allOf适用于,你有一系列独立的future任务,你想等其所有的任务执行完后做一些事情。举个例子,比如我想下载100个网页,传统的串行,性能肯定不行,这里我们采用异步模式,同时对100个网页进行下载,当所有的任务下载完成之后,我们想判断每个网页是否包含某个关键词。

    下面我们通过随机数来模拟上面的这个场景如下:

        public static void mutilTaskTest() throws ExecutionException, InterruptedException {
            //添加n个任务
            CompletableFuture<Double> array[] = new CompletableFuture[5];
            for (int i = 0; i < 5; i++) {
                array[i] = CompletableFuture.supplyAsync(new Supplier<Double>() {
                    @Override
                    public Double get() {
                        return Math.random();
                    }
                });
            }
    //        //获取结果的方式一
    //        CompletableFuture.allOf(array).get();
    //        for (CompletableFuture<Double> cf : array) {
    //            if (cf.get() > 0.6) {
    //                System.out.println(cf.get());
    //            }
    //        }
            //获取结果的方式二,过滤大于指定数字,在收集输出
            List<Double> rs = Stream.of(array).map(CompletableFuture::join).filter(number -> number > 0.6).collect(Collectors.toList());
            System.out.println(rs);
        }

    注意其中的join方法和get方法类似,仅仅在于在Future不能正常完成的时候抛出一个unchecked的exception,这可以确保它用在Stream的map方法中,直接使用get是没法在map里面运行的。

    anyOf方法,也比较简单,意思就是只要在多个future里面有一个返回,整个任务就可以结束,而不需要等到每一个future结束。

        public static void anyOf() throws ExecutionException, InterruptedException {
            CompletableFuture<String> f1 = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    try {
                        TimeUnit.SECONDS.sleep(4);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "wait 4 seconds";
                }
            });
    
            CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "wait 2 seconds";
                }
            });
    
            CompletableFuture<String> f3 = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    try {
                        TimeUnit.SECONDS.sleep(4);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "wait 10 seconds";
                }
            });
    
            CompletableFuture<Object> result = CompletableFuture.anyOf(f1, f2, f3);
            System.out.println(result.get());
        }

     注意由于Anyof返回的是其中任意一个Future所以这里没有明确的返回类型,统一使用Object接受,留给使用端处理。

    (7)exceptionally异常处理

    异常处理是异步计算的一个重要环节,下面看看如何在CompletableFuture中使用:

        public static void testEX() throws ExecutionException, InterruptedException {
            int age = -1;
            CompletableFuture<String> task = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    if (age < 0) {
                        throw new IllegalArgumentException("性别必须大于0");
                    }
                    if (age < 18) {
                        return "未成年人";
                    }
                    return "成年人";
                }
            }).exceptionally(ex -> {
                System.out.println(ex.getMessage());
                return "发生 异常" + ex.getMessage();
            });
    
            System.out.println(task.get());
        }

     此外还有另外一种异常捕捉方法handle,无论发生异常都会执行,示例如下:

        public static void testEX() throws ExecutionException, InterruptedException {
            int age = 10;
            CompletableFuture<String> task = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    if (age < 0) {
                        throw new IllegalArgumentException("性别必须大于0");
                    }
                    if (age < 18) {
                        return "未成年人";
                    }
                    return "成年人";
                }
            }).handle((res, ex) -> {
                System.out.println("执行handle");
                if (ex != null) {
                    System.out.println("发生异常");
                    return "发生 异常" + ex.getMessage();
                }
                return res;
            });
    
            System.out.println(task.get());
        }

     注意上面的方法不管正常或者异常会执行handle方法。

    总结

    1. runAsync、supplyAsync

    // 无返回值
    public static CompletableFuture<Void> runAsync(Runnable runnable)
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
    // 有返回值
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

    2. whenComplete、whenCompleteAsync

    // 执行完成时,当前任务的线程执行继续执行 whenComplete 的任务。
    public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
    // 执行完成时,把 whenCompleteAsync 这个任务提交给线程池来进行执行。
    public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
    public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

    3. thenApply、handle

    //当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化<br>public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)<br>//与thenApply的区别是可能是新的线程
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
    //与thenApply效果差不多,出现异常不会走thenApply,handle就可以
    public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
    public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
    public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

    4. thenAccept、thenRun

    //thenAccept 接收任务的处理结果,并消费处理。无返回结果。
    public CompletionStage<Void> thenAccept(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
    //thenRun 跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenRun。
    public CompletionStage<Void> thenRun(Runnable action);
    public CompletionStage<Void> thenRunAsync(Runnable action);
    public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

    5. thenCombine、thenAcceptBoth

    public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
     
    public <U,V> CompletionStage<V> thenAcceptBoth(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
    public <U,V> CompletionStage<V> thenAcceptBothAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
    public <U,V> CompletionStage<V> thenAcceptBothAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

    thenCombine、thenAcceptBoth 都是用来合并任务 —— 等待两个 CompletionStage 的任务都执行完成后,把两个任务的结果一并来处理。区别在于 thenCombine 有返回值;thenAcceptBoth 无返回值。

    6. applyToEither、acceptEither、runAfterEither、runAfterBoth

    • applyToEither:两个 CompletionStage,谁执行返回的结果快,就用那个 CompletionStage 的结果进行下一步的处理,有返回值。
    • acceptEither:两个 CompletionStage,谁执行返回的结果快,就用那个 CompletionStage 的结果进行下一步的处理,无返回值。
    • runAfterEither:两个 CompletionStage,任何一个完成了,都会执行下一步的操作(Runnable),无返回值。
    • runAfterBoth:两个 CompletionStage,都完成了计算才会执行下一步的操作(Runnable),无返回值。

    7. thenCompose

    thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作

    JDK9 CompletableFuture 类增强的主要内容

    (1)支持对异步方法的超时调用

    orTimeout()
    completeOnTimeout()

    (2)支持延迟调用

    Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
    Executor delayedExecutor(long delay, TimeUnit unit)

    转载

    https://www.cnblogs.com/liangsonghua/p/www_liangsonghua_me_37.html

    https://cloud.tencent.com/developer/article/1366581

  • 相关阅读:
    HDU-2502-月之数
    C语言的位运算的优势
    HDU-1026-Ignatius and the Princess I
    HDU-1015-Safecracker
    HDU-1398-Square Coins
    HDU-1028-Ignatius and the Princess III
    背包的硬币问题
    HDU-1527-取石子游戏
    HDU-1996-汉诺塔VI
    css中的选择器
  • 原文地址:https://www.cnblogs.com/grasp/p/14325606.html
Copyright © 2011-2022 走看看