zoukankan      html  css  js  c++  java
  • Future和CompletableFuture

    Future

    从JDK1.5开始,提供了Future来表示异步计算的结果,一般它需要结合ExecutorService(执行者)和Callable(任务)来使用。

    示例

    import java.util.*;
    import java.util.concurrent.*;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
            Future<Integer> future = executor.submit(() -> {
                // 故意耗时
                Thread.sleep(5000);
                return new Random().nextInt(100);
            });
    
            System.out.println(future.get());
            System.out.println("如果get是阻塞的,则此消息在数据之后输出");
            executor.shutdown();
        }
    
    }

    输出

    即使异步任务等待了5秒,也依然先于消息输出,由此证明get方法是阻塞的。

    Future只是个接口,实际上返回的类是FutureTask:

    /**
     * 表示此任务的运行状态。最初是NEW == 0。运行状态仅在set、setException和cancel方法中转换为终端状态。
     *
     * 可能的状态转换:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // 如果当前状态是COMPLETING及其之下的状态,则需要进入awaitDone方法阻塞等待任务完成。
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    CompletableFuture

    JDk1.8引入了CompletableFuture,它实际上也是Future的实现类。这里可以得出:

    1. 面试问Future和CompletableFuture的区别实际上是不严谨的,因为一个是接口一个是其实现类。

    2. 问区别实际上是问FutureTask和CompletableFuture的区别,或者说CompletableFuture有哪些新特性,能完成Future不能完成的工作。

    首先看类定义,可以看到,实现了CompletionStage接口,这个接口是所有的新特性了

    public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

    对于CompletableFuture有四个执行异步任务的方法:

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
    public static CompletableFuture<Void> runAsync(Runnable runnable)
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

    1. 如果我们指定线程池,则会使用我么指定的线程池;如果没有指定线程池,默认使用ForkJoinPool.commonPool()作为线程池。

    2. supply开头的带有返回值,run开头的无返回值。

    1. 执行异步任务(supplyAsync / runAsync)

    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                return new Random().nextInt(100);
            }, executor);
    
            System.out.println(future.get());
            executor.shutdown();
        }
    
    }

    以上仅仅返回个随机数,如果我们要利用计算结果进一步处理呢?

    2. 结果转换(thenApply / thenApplyAsync)

    // 同步转换
    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
    // 异步转换,使用默认线程池
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
    // 异步转换,使用指定线程池
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<Integer> future = CompletableFuture
                    // 执行异步任务
                    .supplyAsync(() -> {
                        return new Random().nextInt(100);
                    }, executor)
                    // 对上一步的结果进行处理
                    .thenApply(n -> {
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        int res = new Random().nextInt(100);
                        System.out.println(String.format("如果是同步的,这条消息应该先输出。上一步结果:%s,新加:%s", n, res));
                        return n + res;
                    });
    
            System.out.println("我等了你2秒");
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    输出:

    如果把thenApply换成thenApplyAsync,则会输出:

    处理完任务以及结果,该去消费了

    3. 消费而不影响最终结果(thenAccept / thenRun / thenAcceptBoth)

    public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
    
    public CompletableFuture<Void> thenRun(Runnable action)
    public CompletableFuture<Void> thenRunAsync(Runnable action)
    public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
    
    public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
    public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
    public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)

    这三种的区别是:

    thenAccept:能够拿到并利用执行结果

    thenRun:不能够拿到并利用执行结果,只是单纯的执行其它任务

    thenAcceptBoth:能传入另一个stage,然后把另一个stage的结果和当前stage的结果作为参数去消费。

    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<Integer> future = CompletableFuture
                    // 执行异步任务
                    .supplyAsync(() -> {
                        return new Random().nextInt(100);
                    }, executor)
                    // 对上一步的结果进行处理
                    .thenApplyAsync(n -> {
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        int res = new Random().nextInt(100);
                        System.out.println(String.format("如果是同步的,这条消息应该先输出。上一步结果:%s,新加:%s", n, res));
                        return n + res;
                    });
            // 单纯的消费执行结果,注意这个方法是不会返回计算结果的——CompletableFuture<Void>
            CompletableFuture<Void> voidCompletableFuture = future.thenAcceptAsync(n -> {
                System.out.println("单纯消费任务执行结果:" + n);
            });
            // 这个无法消费执行结果,没有传入的入口,只是在当前任务执行完毕后执行其它不相干的任务
            future.thenRunAsync(() -> {
                System.out.println("我只能执行其它工作,我得不到任务执行结果");
            }, executor);
    
            // 这个方法会接受其它CompletableFuture返回值和当前返回值
            future.thenAcceptBothAsync(CompletableFuture.supplyAsync(() -> {
                return "I'm Other Result";
            }), (current, other) -> {
                System.out.println(String.format("Current:%s,Other:%s", current, other));
            });
    
            System.out.println("我等了你2秒");
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    结果:

    如果我要组合两个任务呢?

    4. 组合任务(thenCombine / thenCompose)

    public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
    
    public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
    public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
    public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

    这两种区别:主要是返回类型不一样。

    thenCombine:至少两个方法参数,一个为其它stage,一个为用户自定义的处理函数,函数返回值为结果类型。

    thenCompose:至少一个方法参数即处理函数,函数返回值为stage类型。

    先看thenCombine

    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<Integer> otherFuture = CompletableFuture
                    // 执行异步任务
                    .supplyAsync(() -> {
                        int result = new Random().nextInt(100);
                        System.out.println("任务A:" + result);
                        return result;
                    }, executor);
    
            CompletableFuture<Integer> future = CompletableFuture
                    // 执行异步任务
                    .supplyAsync(() -> {
                        int result = new Random().nextInt(100);
                        System.out.println("任务B:" + result);
                        return result;
                    }, executor)
                    .thenCombineAsync(otherFuture, (current, other) -> {
                        int result = other + current;
                        System.out.println("组合两个任务的结果:" + result);
                        return result;
                    });
    
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    执行结果:

    再来看thenCompose

    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<Integer> future = CompletableFuture
                    // 执行异步任务
                    .supplyAsync(() -> {
                        int result = new Random().nextInt(100);
                        System.out.println("任务A:" + result);
                        return result;
                    }, executor)
                    .thenComposeAsync((current) -> {
                        return CompletableFuture.supplyAsync(() -> {
                            int b = new Random().nextInt(100);
                            System.out.println("任务B:" + b);
                            int result = b + current;
                            System.out.println("组合两个任务的结果:" + result);
                            return result;
                        }, executor);
                    });
    
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    输出:

    注意这两个输出虽然一样,但是用法不一样。

    5. 快者优先(applyToEither / acceptEither)

    有个场景,如果我们有多条渠道去完成同一种任务,那么我们肯定选择最快的那个。

    public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
    public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
    public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)
    
    public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
    public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
    public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)

    这两种区别:仅仅是一个有返回值,一个没有(Void)

    先看applyToEither

    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<String> otherFuture = CompletableFuture
                    .supplyAsync(() -> {
                        int result = new Random().nextInt(100);
                        System.out.println("执行者A:" + result);
                        try {
                            // 故意A慢了一些
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return "执行者A【" + result + "】";
                    }, executor);
    
            CompletableFuture<String> future = CompletableFuture
                    .supplyAsync(() -> {
                        int result = new Random().nextInt(100);
                        System.out.println("执行者B:" + result);
                        return "执行者B【" + result + "】";
                    }, executor)
                    .applyToEither(otherFuture, (faster) -> {
                        System.out.println("谁最快:" + faster);
                        return faster;
                    });
    
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    输出:

    再看acceptEither

    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<String> otherFuture = CompletableFuture
                    .supplyAsync(() -> {
                        int result = new Random().nextInt(100);
                        System.out.println("执行者A:" + result);
                        try {
                            // 故意A慢了一些
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return "执行者A【" + result + "】";
                    }, executor);
    
            CompletableFuture<Void> future = CompletableFuture
                    .supplyAsync(() -> {
                        int result = new Random().nextInt(100);
                        System.out.println("执行者B:" + result);
                        return "执行者B【" + result + "】";
                    }, executor)
                    .acceptEither(otherFuture, (faster) -> {
                        System.out.println("谁最快:" + faster);
                    });
    
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    输出:

    6. 异常处理(exceptionally / whenComplete / handle)

    public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
    
    public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
    public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
    public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);
    
    public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
    public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
    public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

    exceptionally

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<String> future = CompletableFuture
                    .supplyAsync(() -> {
                        if (true){
                            throw new RuntimeException("Error!!!");
                        }
                        return "Hello";
                    }, executor)
                    // 处理上一步发生的异常
                    .exceptionally(e -> {
                        System.out.println("处理异常:" + e.getMessage());
                        return "处理完毕!";
                    });
    
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    输出:

    whenComplete

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<String> future = CompletableFuture
                    .supplyAsync(() -> {
                        if (true){
                            throw new RuntimeException("Error!!!");
                        }
                        return "Hello";
                    }, executor)
                    // 处理上一步发生的异常
                    .whenComplete((result,ex) -> {
                        // 这里等待为了上一步的异常输出完毕
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("上一步结果:" + result);
                        System.out.println("处理异常:" + ex.getMessage());
                    });
    
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    输出结果:

    可以看见,用whenComplete对异常情况不是特别友好。

    handle

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
            CompletableFuture<String> future = CompletableFuture
                    .supplyAsync(() -> {
                        if (true){
                            throw new RuntimeException("Error!!!");
                        }
                        return "Hello";
                    }, executor)
                    // 处理上一步发生的异常
                    .handle((result,ex) -> {
                        System.out.println("上一步结果:" + result);
                        System.out.println("处理异常:" + ex.getMessage());
                        return "Value When Exception Occurs";
                    });
    
            System.out.println(future.get());
    
            executor.shutdown();
        }
    
    }

    输出:

    综上,如果单纯要处理异常,那就用exceptionally;如果还想处理结果(没有异常的情况),那就用handle,比whenComplete友好一些,handle不仅能处理异常还能返回一个异常情况的默认值。

    对比

    Future:我们的目的都是获取异步任务的结果,但是对于Future来说,只能通过get方法或者死循环判断isDone来获取。异常情况就更是难办。

    CompletableFuture:只要我们设置好回调函数即可实现:

    1. 只要任务完成,即执行我们设置的函数(不用再去考虑什么时候任务完成)

    2. 如果发生异常,同样会执行我们处理异常的函数,甚至连默认返回值都有(异常情况处理更加省力)

    3. 如果有复杂任务,比如依赖问题,组合问题等,同样可以写好处理函数来处理(能应付复杂任务的处理)

  • 相关阅读:
    Atitit fms Strait (海峡) lst 数据列表目录1. 4大洋 12. 著名的海大约40个,总共约55个海 13. 海区列表 23.1. 、波利尼西亚(Polynesia,
    Atitit trave islands list 旅游资源列表岛屿目录1. 东南亚著名的旅游岛屿 21.1. Cjkv 日韩 冲绳 琉球 济州岛 北海道 21.2. 中国 涠洲岛 南澳
    Atitit Major island groups and archipelagos 主要的岛群和群岛目录资料目录1. 岛群 波利尼西亚(Polynesia, 美拉尼西亚(Melanesia,
    Atitit glb 3tie city lst 三线城市列表 数据目录1. 全球范围内约90个城市 三线 12. 世界性三线城市全球共
    Atitit glb 1tie 2tie city lst 一二线城市列表数据约50个一线城市Alpha ++ 阿尔法++,,London 伦敦,,New York 纽约,,Alpha +
    Attit 现代编程语言重要特性目录第一章 类型系统 基本三大类型 2第一节 字符串 数字 bool 2第二节 推断局部变量 2第三节 动态类型 2第二章 可读性与开发效率 简单性 2
    Atitit 未来数据库新特性展望目录1. 统一的翻页 21.1. 2 Easy Top-N
    使用Chrome DevTools(console ande elements panel)进行xpath/css/js定位
    chrome -console妙用之定位xpath/js/css
    表达式树之构建Lambda表达式
  • 原文地址:https://www.cnblogs.com/LUA123/p/12050255.html
Copyright © 2011-2022 走看看