zoukankan      html  css  js  c++  java
  • Future和CompletableFuture

    Future

    从JDK1.5开始,提供了Future来表示异步计算的结果,一般它需要结合ExecutorService(执行者)和Callable(任务)来使用。

    示例

    import java.util.*;
    import java.util.concurrent.*;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
            Future<Integer> future = executor.submit(() -> {
                // 故意耗时
                Thread.sleep(5000);
                return new Random().nextInt(100);
            });
    
            System.out.println(future.get());
            System.out.println("如果get是阻塞的,则此消息在数据之后输出");
            executor.shutdown();
        }
    
    }

    输出

    即使异步任务等待了5秒,也依然先于消息输出,由此证明get方法是阻塞的。

    Future只是个接口,实际上返回的类是FutureTask:

    /**
     * 表示此任务的运行状态。最初是NEW == 0。运行状态仅在set、setException和cancel方法中转换为终端状态。
     *
     * 可能的状态转换:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // 如果当前状态是COMPLETING及其之下的状态,则需要进入awaitDone方法阻塞等待任务完成。
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    CompletableFuture

    JDk1.8引入了CompletableFuture,它实际上也是Future的实现类。这里可以得出:

    1. 面试问Future和CompletableFuture的区别实际上是不严谨的,因为一个是接口一个是其实现类。

    2. 问区别实际上是问FutureTask和CompletableFuture的区别,或者说CompletableFuture有哪些新特性,能完成Future不能完成的工作。

    首先看类定义,可以看到,实现了CompletionStage接口,这个接口是所有的新特性了

    public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

    对于CompletableFuture有四个执行异步任务的方法:

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
    public static CompletableFuture<Void> runAsync(Runnable runnable)
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

    1. 如果我们指定线程池,则会使用我么指定的线程池;如果没有指定线程池,默认使用ForkJoinPool.commonPool()作为线程池。

    2. supply开头的带有返回值,run开头的无返回值。

    1. 执行异步任务(supplyAsync / runAsync)

    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                return new Random().nextInt(100);
            }, executor);
    
            System.out.println(future.get());
            executor.shutdown();
        }
    
    }

    以上仅仅返回个随机数,如果我们要利用计算结果进一步处理呢?

    2. 结果转换(thenApply / thenApplyAsync)

    // 同步转换
    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
    // 异步转换,使用默认线程池
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
    // 异步转换,使用指定线程池
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<Integer> future = CompletableFuture
                    // 执行异步任务
                    .supplyAsync(() -> {
                        return new Random().nextInt(100);
                    }, executor)
                    // 对上一步的结果进行处理
                    .thenApply(n -> {
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        int res = new Random().nextInt(100);
                        System.out.println(String.format("如果是同步的,这条消息应该先输出。上一步结果:%s,新加:%s", n, res));
                        return n + res;
                    });
    
            System.out.println("我等了你2秒");
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    输出:

    如果把thenApply换成thenApplyAsync,则会输出:

    处理完任务以及结果,该去消费了

    3. 消费而不影响最终结果(thenAccept / thenRun / thenAcceptBoth)

    public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
    
    public CompletableFuture<Void> thenRun(Runnable action)
    public CompletableFuture<Void> thenRunAsync(Runnable action)
    public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
    
    public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
    public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
    public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)

    这三种的区别是:

    thenAccept:能够拿到并利用执行结果

    thenRun:不能够拿到并利用执行结果,只是单纯的执行其它任务

    thenAcceptBoth:能传入另一个stage,然后把另一个stage的结果和当前stage的结果作为参数去消费。

    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<Integer> future = CompletableFuture
                    // 执行异步任务
                    .supplyAsync(() -> {
                        return new Random().nextInt(100);
                    }, executor)
                    // 对上一步的结果进行处理
                    .thenApplyAsync(n -> {
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        int res = new Random().nextInt(100);
                        System.out.println(String.format("如果是同步的,这条消息应该先输出。上一步结果:%s,新加:%s", n, res));
                        return n + res;
                    });
            // 单纯的消费执行结果,注意这个方法是不会返回计算结果的——CompletableFuture<Void>
            CompletableFuture<Void> voidCompletableFuture = future.thenAcceptAsync(n -> {
                System.out.println("单纯消费任务执行结果:" + n);
            });
            // 这个无法消费执行结果,没有传入的入口,只是在当前任务执行完毕后执行其它不相干的任务
            future.thenRunAsync(() -> {
                System.out.println("我只能执行其它工作,我得不到任务执行结果");
            }, executor);
    
            // 这个方法会接受其它CompletableFuture返回值和当前返回值
            future.thenAcceptBothAsync(CompletableFuture.supplyAsync(() -> {
                return "I'm Other Result";
            }), (current, other) -> {
                System.out.println(String.format("Current:%s,Other:%s", current, other));
            });
    
            System.out.println("我等了你2秒");
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    结果:

    如果我要组合两个任务呢?

    4. 组合任务(thenCombine / thenCompose)

    public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
    
    public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
    public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
    public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

    这两种区别:主要是返回类型不一样。

    thenCombine:至少两个方法参数,一个为其它stage,一个为用户自定义的处理函数,函数返回值为结果类型。

    thenCompose:至少一个方法参数即处理函数,函数返回值为stage类型。

    先看thenCombine

    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<Integer> otherFuture = CompletableFuture
                    // 执行异步任务
                    .supplyAsync(() -> {
                        int result = new Random().nextInt(100);
                        System.out.println("任务A:" + result);
                        return result;
                    }, executor);
    
            CompletableFuture<Integer> future = CompletableFuture
                    // 执行异步任务
                    .supplyAsync(() -> {
                        int result = new Random().nextInt(100);
                        System.out.println("任务B:" + result);
                        return result;
                    }, executor)
                    .thenCombineAsync(otherFuture, (current, other) -> {
                        int result = other + current;
                        System.out.println("组合两个任务的结果:" + result);
                        return result;
                    });
    
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    执行结果:

    再来看thenCompose

    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<Integer> future = CompletableFuture
                    // 执行异步任务
                    .supplyAsync(() -> {
                        int result = new Random().nextInt(100);
                        System.out.println("任务A:" + result);
                        return result;
                    }, executor)
                    .thenComposeAsync((current) -> {
                        return CompletableFuture.supplyAsync(() -> {
                            int b = new Random().nextInt(100);
                            System.out.println("任务B:" + b);
                            int result = b + current;
                            System.out.println("组合两个任务的结果:" + result);
                            return result;
                        }, executor);
                    });
    
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    输出:

    注意这两个输出虽然一样,但是用法不一样。

    5. 快者优先(applyToEither / acceptEither)

    有个场景,如果我们有多条渠道去完成同一种任务,那么我们肯定选择最快的那个。

    public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
    public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
    public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)
    
    public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
    public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
    public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)

    这两种区别:仅仅是一个有返回值,一个没有(Void)

    先看applyToEither

    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<String> otherFuture = CompletableFuture
                    .supplyAsync(() -> {
                        int result = new Random().nextInt(100);
                        System.out.println("执行者A:" + result);
                        try {
                            // 故意A慢了一些
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return "执行者A【" + result + "】";
                    }, executor);
    
            CompletableFuture<String> future = CompletableFuture
                    .supplyAsync(() -> {
                        int result = new Random().nextInt(100);
                        System.out.println("执行者B:" + result);
                        return "执行者B【" + result + "】";
                    }, executor)
                    .applyToEither(otherFuture, (faster) -> {
                        System.out.println("谁最快:" + faster);
                        return faster;
                    });
    
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    输出:

    再看acceptEither

    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<String> otherFuture = CompletableFuture
                    .supplyAsync(() -> {
                        int result = new Random().nextInt(100);
                        System.out.println("执行者A:" + result);
                        try {
                            // 故意A慢了一些
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return "执行者A【" + result + "】";
                    }, executor);
    
            CompletableFuture<Void> future = CompletableFuture
                    .supplyAsync(() -> {
                        int result = new Random().nextInt(100);
                        System.out.println("执行者B:" + result);
                        return "执行者B【" + result + "】";
                    }, executor)
                    .acceptEither(otherFuture, (faster) -> {
                        System.out.println("谁最快:" + faster);
                    });
    
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    输出:

    6. 异常处理(exceptionally / whenComplete / handle)

    public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
    
    public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
    public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
    public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);
    
    public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
    public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
    public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

    exceptionally

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<String> future = CompletableFuture
                    .supplyAsync(() -> {
                        if (true){
                            throw new RuntimeException("Error!!!");
                        }
                        return "Hello";
                    }, executor)
                    // 处理上一步发生的异常
                    .exceptionally(e -> {
                        System.out.println("处理异常:" + e.getMessage());
                        return "处理完毕!";
                    });
    
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    输出:

    whenComplete

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<String> future = CompletableFuture
                    .supplyAsync(() -> {
                        if (true){
                            throw new RuntimeException("Error!!!");
                        }
                        return "Hello";
                    }, executor)
                    // 处理上一步发生的异常
                    .whenComplete((result,ex) -> {
                        // 这里等待为了上一步的异常输出完毕
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("上一步结果:" + result);
                        System.out.println("处理异常:" + ex.getMessage());
                    });
    
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    输出结果:

    可以看见,用whenComplete对异常情况不是特别友好。

    handle

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<String> future = CompletableFuture
                    .supplyAsync(() -> {
                        if (true){
                            throw new RuntimeException("Error!!!");
                        }
                        return "Hello";
                    }, executor)
                    // 处理上一步发生的异常
                    .handle((result,ex) -> {
                        System.out.println("上一步结果:" + result);
                        System.out.println("处理异常:" + ex.getMessage());
                        return "Value When Exception Occurs";
                    });
    
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    输出:

    综上,如果单纯要处理异常,那就用exceptionally;如果还想处理结果(没有异常的情况),那就用handle,比whenComplete友好一些,handle不仅能处理异常还能返回一个异常情况的默认值。

    对比

    Future:我们的目的都是获取异步任务的结果,但是对于Future来说,只能通过get方法或者死循环判断isDone来获取。异常情况就更是难办。

    CompletableFuture:只要我们设置好回调函数即可实现:

    1. 只要任务完成,即执行我们设置的函数(不用再去考虑什么时候任务完成)

    2. 如果发生异常,同样会执行我们处理异常的函数,甚至连默认返回值都有(异常情况处理更加省力)

    3. 如果有复杂任务,比如依赖问题,组合问题等,同样可以写好处理函数来处理(能应付复杂任务的处理)

  • 相关阅读:
    spring cloud 学习(Gateway)网关
    spring cloud 学习(Hystrix)熔断器
    spring cloud 学习(Feign)分布式配置中心
    老子道德经-帛书甲本
    重新执行mysql索引
    有关maven2路径
    关于Mysql含有blob字段的查询效率问题
    提交Json参数到Tomcat报400错误的问题
    不知道算不算mysql的漏洞
    JFinal项目中,如何连接两个不同版本的mysql数据库?
  • 原文地址:https://www.cnblogs.com/LUA123/p/12050255.html
Copyright © 2011-2022 走看看