zoukankan      html  css  js  c++  java
  • java8-CompleableFuture的使用1

    背景

    1. 硬件的极速发展,多核心CPU司空见惯;分布式的软件架构司空见惯;
    2. 功能API大多采用混聚的方式把基础服务的内容链接在一起,方便用户生活。

    抛出了两个问题:

    1. 如何发挥多核能力;
    2. 切分大型任务,让每个子任务并行运行;

    并发和并行的区别

    项目 区别1 实现技术
    并行 每个任务跑在单独的cpu核心上 分支合并框架,并行流
    并发 不同任务共享cpu核心,基于时间片调度 CompletableFuture

    Future接口

    java5开始引入。将来某个时刻发生的事情进行建模。
    进行一个异步计算,返回一个执行运算的结果引用,当运算结束后,这个引用可以返回给调用方。
    可以使用Future把哪些潜在耗时的任务放到异步线程中,让主线程继续执行其他有价值的工作,不在白白等待。

    下面是一个例子:使用Future,可以让两个任务并发的运行,然后汇聚结果;

    package com.test.completable;
    
    import com.google.common.base.Stopwatch;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 说明:Future应用实例
     * @author carter
     * 创建时间: 2019年11月18日 10:53
     **/
    
    public class FutureTest {
    
        static final ExecutorService pool = Executors.newFixedThreadPool(2);
    
    
        public static void main(String[] args) {
            Stopwatch stopwatch = Stopwatch.createStarted();
    
            Future<Long> longFuture = pool.submit(() -> doSomethingLongTime());
    
            doSomething2();
            try {
                final Long longValue = longFuture.get(3, TimeUnit.SECONDS);
                System.out.println(Thread.currentThread().getName() + " future return value :" + longValue + " : " + stopwatch.stop());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            pool.shutdown();
        }
    
        private static void doSomething2() {
            Stopwatch stopwatch = Stopwatch.createStarted();
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println(Thread.currentThread().getName() + " doSomething2 :" + stopwatch.stop());
        }
    
        private static Long doSomethingLongTime() {
            Stopwatch stopwatch = Stopwatch.createStarted();
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println(Thread.currentThread().getName() + " doSomethingLongTime : " + stopwatch.stop());
            return 1000L;
        }
    
    
    }
    
    

    没法编写简介的并发代码。描叙能力不够;比如如下场景:

    1. 将两个异步计算的结果合并为一个,这两个异步计算之间互相独立,但是第二个有依赖第一个结果。
    2. 等待Future中所有的任务都完成;
    3. 仅等待Future集合中最快结束的任务完成,并返回它的结果;
    4. 通过编程的方式完成一个Future任务的执行;
    5. 响应Future的完成事件。

    基于这个缺陷,java8中引入了CompletableFuture 类;

    实现异步API

    技能点:

    1. 提供异步API;
    2. 修改同步的API为异步的API,如何使用流水线把两个任务合并为一个异步计算操作;
    3. 响应式的方式处理异步操作的完成事件;
    类型 区别 是否堵塞
    同步API 调用方在被调用运行的过程中等待,被调用方运行结束后返回,调用方取得返回值后继续运行 堵塞
    异步API 调用方和被调用方是异步的,调用方不用等待被调用方返回结果 非堵塞
    package com.test.completable;
    
    import com.google.common.base.Stopwatch;
    import com.google.common.base.Ticker;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 说明:异步调用计算价格的方法
     * @author carter
     * 创建时间: 2019年11月18日 13:32
     **/
    
    public class Test {
    
        public static void main(String[] args) {
            Shop shop = new Shop("BestShop");
    
            Stopwatch stopwatch = Stopwatch.createStarted();
            Stopwatch stopwatch2 = Stopwatch.createStarted();
    
            Future<Double> doubleFuture = shop.getPriceFuture("pizza");
    
            System.out.println("getPriceFuture return after: " + stopwatch.stop());
    
            doSomethingElse();
            try{
                final Double price = doubleFuture.get();
                System.out.println("price is " + price + " return after: " + stopwatch2.stop());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    
        private static void doSomethingElse() {
            Stopwatch stopwatch = Stopwatch.createStarted();
            DelayUtil.delay();
            System.out.println("doSomethingElse " + stopwatch.stop());
    
        }
    
    
    }
    
    

    错误处理

    如果计算价格的方法产生了错误,提示错误的异常会被现在在试图计算商品价格的当前线程的范围内,最终计算的异步线程会被杀死,这会导致get方法返回结果的客户端永久的被等待。

    如何避免异常被掩盖, completeExceptionally会把CompletableFuture内发生的问题抛出去。

    
        private static void test2() {
            Shop shop = new Shop("BestShop");
    
            Stopwatch stopwatch = Stopwatch.createStarted();
            Stopwatch stopwatch2 = Stopwatch.createStarted();
    
            Future<Double> doubleFuture = shop.getPriceFutureException("pizza");
    
            System.out.println("getPriceFuture return after: " + stopwatch.stop());
    
            doSomethingElse();
            try{
                final Double price = doubleFuture.get();
                System.out.println("price is " + price + " return after: " + stopwatch2.stop());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    

    方法改造:

    //异步方式查询产品价格,异常抛出去
        public Future<Double> getPriceFutureException(String product){
    
    
            final CompletableFuture<Double> doubleCompletableFuture = new CompletableFuture<>();
    
            new Thread(()->{try {
                doubleCompletableFuture.complete(alculatePriceException(product));
            }catch (Exception ex){
                doubleCompletableFuture.completeExceptionally(ex);
            }
            }).start();
    
            return doubleCompletableFuture;
        }
    

    无堵塞

    即让多个线程去异步并行或者并发的执行任务,计算完之后汇聚结果;

    
        private static void test3(String productName) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            final List<String> stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城"))
                    .map(item -> String.format("商店:%s的商品:%s 售价是:%s", item.getName(), productName, item.getPrice(productName)))
                    .collect(Collectors.toList());
    
            System.out.println(stringList);
            System.out.println("test3 done in  " + stopwatch.stop());
    
    
        }
    
        private static void test3_parallelStream(String productName) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            final List<String> stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城"))
                    .parallel()
                    .map(item -> String.format("商店:%s的商品:%s 售价是:%s", item.getName(), productName, item.getPrice(productName)))
                    .collect(Collectors.toList());
    
            System.out.println(stringList);
            System.out.println("test3_parallelStream done in  " + stopwatch.stop());
    
    
        }
    
    
        private static void test3_completableFuture(String productName) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            final List<String> stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城"))
                    .map(item ->CompletableFuture.supplyAsync(()-> String.format("商店:%s的商品:%s 售价是:%s", item.getName(), productName, item.getPrice(productName))))
                    .collect(Collectors.toList())
                    .stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
    
            System.out.println(stringList);
            System.out.println("test3_completableFuture done in  " + stopwatch.stop());
    
    
        }
    
    
    
        private static void test3_completableFuture_pool(String productName) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            final List<String> stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城"))
                    .map(item ->CompletableFuture.supplyAsync(()-> String.format("商店:%s的商品:%s 售价是:%s", item.getName(), productName, item.getPrice(productName)),pool))
                    .collect(Collectors.toList())
                    .stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
    
            System.out.println(stringList);
            System.out.println("test3_completableFuture done in  " + stopwatch.stop());
    
    
        }
    

    代码中有一个简单的计算场景,我想查询4家商店的iphone11售价;

    华强北,益田苹果店,香港九龙城,京东商城;

    每一家的查询大概耗时1s;

    任务处理方式 耗时 优缺点说明
    顺序执行 4秒多 简单,好理解
    并行流 1秒多 无法定制流内置的线程池,使用简单,改造简单
    CompletableFuture 默认线程池 2秒多 默认线程池
    CompletableFuture 指定线程池 1秒多 指定了线程池,可定制性更好,相比于并行流

    多个异步任务的流水线操作

    场景: 先计算价格,在拿到折扣,最后计算折扣价格;

    
        
        private static void test4(String productName) {
    
            Stopwatch stopwatch = Stopwatch.createStarted();
            final List<String> stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城"))
                    .map(shop->shop.getPrice_discount(productName))
                    .map(Quote::parse)
                    .map(DisCount::applyDiscount)
                    .collect(Collectors.toList());
    
            System.out.println(stringList);
            System.out.println("test4 done in  " + stopwatch.stop());
    
    
        }
    
        private static void test4_completableFuture(String productName) {
    
            Stopwatch stopwatch = Stopwatch.createStarted();
            final List<String> stringList = Stream.of(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城"))
                    .map(shop->CompletableFuture.supplyAsync(()->shop.getPrice_discount(productName),pool))
                    .map(future->future.thenApply( Quote::parse))
                    .map(future->future.thenCompose(quote -> CompletableFuture.supplyAsync(()->DisCount.applyDiscount(quote),pool)))
                    .collect(Collectors.toList())
                    .stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
    
            System.out.println(stringList);
            System.out.println("test4_completableFuture done in  " + stopwatch.stop());
    
    
        }
    

    以上是有依赖关系的两个任务的聚合,即任务2,依赖任务1的结果。使用的是thenCompose方法;

    接下来如果有两个任务可以异步执行,最后需要依赖着两个任务的结果计算得到最终结果,采用的是thenCombine;

    //两个不同的任务,最后需要汇聚结果,采用combine
        private static void test5(String productName) {
    
            Stopwatch stopwatch = Stopwatch.createStarted();
    
    
            Shop shop = new Shop("香港九龙");
    
          Double pricefinal =  CompletableFuture.supplyAsync(()->shop.getPrice(productName))
                    .thenCombine(CompletableFuture.supplyAsync(shop::getRate),(price, rate)->price * rate).join();
    
    
            System.out.println("test4 done in  " + stopwatch.stop());
    
    
        }
    

    completion事件

    让任务尽快结束,无需等待;
    有多个服务来源,你请求多个,谁先返回,就先响应;

    结果依次返回:

     //等待所有的任务执行完毕; CompletableFuture.allOf()
        public void findPriceStream(String productName){
            List<Shop> shops = Arrays.asList(new Shop("华强北"), new Shop("益田假日广场"), new Shop("香港九龙城"), new Shop("京东商城"));
            final CompletableFuture[] completableFutureArray = shops.stream()
                    .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice_discount(productName), pool))
                    .map(future -> future.thenApply(Quote::parse))
                    .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> DisCount.applyDiscount(quote), pool)))
                    .map(f -> f.thenAccept(System.out::println))
                    .toArray(size -> new CompletableFuture[size]);
    
    
            CompletableFuture.allOf(completableFutureArray).join();
    
        }
    

    多个来源获取最快的结果:

    //有两个获取天气的途径,哪个快最后结果就取哪一个
        public static void getWeather(){
            final Object join = CompletableFuture.anyOf(CompletableFuture.supplyAsync(() -> a_weather()), CompletableFuture.supplyAsync(() -> b_weather())).join();
    
            System.out.println(join);
        }
    
        private static String b_weather() {
            DelayUtil.delay(3);
            return "bWeather";
        }
    
        private static String a_weather() {
            DelayUtil.delay(5);
            return "aWeather";
        }
    

    源码分析

    可完备化的将来;CompletableFuture ;

    先看签名:

    public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}
    
    

    实现了Futrue,CompletionStage接口;
    这两个接口简单说明一下:

    接口 关键特性
    Future 直接翻译为未来,标识把一个任务异步执行,需要的的时候,通过get方法获取,也可以取消cancel,此外还提供了状态查询方法,isDone, isCancled,实现类是FutureTask
    CompletionStage 直接翻译是完成的阶段,提供了函数式编程方法

    可以分为如下几类方法

    方法 说明
    thenApply(Function f) 当前阶段正常完成之后,返回一个新的阶段,新的阶段把当前阶段的结果作为参数输入;
    thenConsume(Consumer c), 当前阶段完成之后,结果作为参数输入,直接消费掉,得到不返回结果的完成阶段;
    thenRun(Runnable action), 不接受参数,只是继续执行任务,得到一个新的完成阶段;
    thenCombine(otherCompletionStage,BiFunction), 当两个完成阶段都完成的时候,执行BIFunction,返回一个新的阶段;
    thenAcceptBoth(OtherCompletionStage, BiConsumer) 两个完成阶段都完成之后,对两个结果进行消费;
    runAfterBoth(OtherCompletionStage,Runable) 两个完成阶段都完成之后,执行一个动作;
    applyToEither(OtherCompletionStage,Function) 两个完成阶段的任何一个执行结束,进入函数操作,并返回一个新的阶段
    acceptEither(OtherCompletionStage,Consumer) 两个完成阶段的任何一个执行结束,消费掉,返回一个空返回值的完成阶段
    runAfterEither(OtherCompletionStage,Runable) 两个完成阶段的任何一个结束,执行一个动作,返回一个空返回值的完成阶段
    thenCompose(Function) 当前阶段完成,返回值作为参数,进行函数运算,然后结果作为一个新的完成阶段
    exceptionally(Function) 无论当前阶段是否正常完成,消费掉异常,然后返回值作为一个新的完成阶段
    whenComplete
    handle 无论当前完成阶段是否正常结束,都执行一个BIFunction的函数,并返回一个新结果作为一个新的完成阶段
    toCompletableFuture 转换为ComplatableFuture

    里面的实现细节后面单独成文章再讲。

    小结

    1. 执行一些比较耗时的操作,尤其是依赖一个或者多个远程服务的操作,可以使用异步任务改善程序的性能,加快程序的响应速度;
    2. 使用CompletableFuture你可以轻松的实现异步API;
    3. CompletableFuture提供了异常管理机制,让主线程有机会接管子任务抛出的异常;
    4. 把同步API封装到CompletableFuture中,可以异步得到它的结果;
    5. 如果异步任务之间互相独立,而他们之间的某一些结果是另外一些的输入,可以把这些任务进行compose;
    6. 可以为CompletableFuture中的任务注册一个回调函数,当任务执行完毕之后再进行一些其它操作;
    7. 你可以决定什么时候结束程序的运行,是所有的CompletableFuture任务所有对象执行完毕,或者只要其中任何一个完成即可。

    原创不易,转载请注明出处。

  • 相关阅读:
    day21_map&debug
    Error: A JNI error has occurred, please check your installation and try again
    day20_比较器&Map
    day1819_List&Set&Genericity
    day17_Collection
    String类是不可改变的,所以你一旦创建了String对象,那它的值就无法改变了
    day15_api01
    day16_api02
    渡河问题
    leetcode 春季个人赛
  • 原文地址:https://www.cnblogs.com/snidget/p/11892097.html
Copyright © 2011-2022 走看看