zoukankan      html  css  js  c++  java
  • Java8函数之旅 (八)

    前言

    随着多核处理器的出现,如何轻松高效的进行异步编程变得愈发重要,我们看看在java8之前,使用java语言完成异步编程有哪些方案。

    JAVA8之前的异步编程

    • 继承Thead类,重写run方法
    • 实现runable接口,实现run方法
    • 匿名内部类编写thread或者实现runable的类,当然在java8中可以用lambda表达式简化
    • 使用futureTask进行附带返回值的异步编程
    • 使用线程池和Future来实现异步编程
    • spring框架下的@async获得异步编程支持

    使用线程池与future来实现异步编程

    实现方式可谓是多种多样,这里我们使用线程池和future来实现异步编程,借着这个例子来讲述java8的组合式异步编程有着怎样的优势

            //构造线程池
            ExecutorService pool = Executors.newCachedThreadPool();
            try {
                //构造future结果,doSomethingA十分耗时,因此采用异步
                Future<Integer> future = pool.submit(() -> doSomethingA());
                //做一些别的事情
                doSomethingB();
                //从future中获得结果,设置超时时间,超过了就抛异常
                Integer result = future.get(10, TimeUnit.SECONDS);
                //打印结果
                System.out.printf("the async result is : %d", result);
                //异常捕获
            } catch (InterruptedException e) {
                System.out.println("任务计算抛出了一个异常!");
            } catch (ExecutionException e) {
                System.out.println("线程在等待的过程中被中断了!");
            } catch (TimeoutException e) {
                System.out.println("future对象等待时间超时了!");
            }
        }
    

    然而这样的异步编程方式仅仅能满足基本的需要,稍微复杂的一些异步处理Future接口似乎就有点束手无策了,例如

    • 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第
      一个的结果。
    • 等待 Future 集合中的所有任务都完成。
    • 仅等待 Future 集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同
      一个值) ,并返回它的结果。
    • 通过编程方式完成一个 Future 任务的执行(即以手工设定异步操作结果的方式) 。
    • 应对 Future 的完成事件(即当 Future 的完成事件发生时会收到通知,并能使用 Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)

    这种感觉其实就很像没有stream之前的collections的操作感觉一样,同样的,对于future,java8提供了它的函数式升级版本CompletableFuture,从名字就可以看出来这绝对是future的升级版。

    JAVA8中的组合式异步编程

    使用CompletableFuture进行异步编程

    事物的发展往往都是由简单->复杂->简单,这里我们同样遵循这样的规律,循序渐进。
    下面的例子摘取《java8实战》的异步编程章节,并做了简化。
    我们假设现在我们有一项查询商品价格的服务十分耗时,所以毫无例外的我们想让查询最佳价格的服务以异步的形式执行。
    最直接的方式是直接构建一个异步查询商品价格的api,并且返回,为了演示需要,编写一个线程等待一秒的方法来模拟长时间的请求。

    
        public double getPrice(String product) {
            return calculatePrice(product);
        }
    
        private double calculatePrice(String product) {
            delay();
            return random.nextDouble() * product.charAt(0) + product.charAt(1);
        }
        
        public static void delay() {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    

    现在getPrice这方法是一个同步的方法,该方法在经过1秒的延迟之后会返回给我们一个商品的价格(这里只是简单的根据名字构造了一个随机数)

    我们使用completFuture将getPrice转化为异步方法,如下

        public Future<Double> getAsyncPrice(String product) {
            CompletableFuture<Double> futurePrice = new CompletableFuture<>();
            new Thread(() -> {
                double price = calculatePrice(product);
                futurePrice.complete(price);
            }).start();;
            return futurePrice;
        }
    

    这里构造一个completableFuture对象,并另起一个异步线程,将异步计算的结果使用futurePrice.complete来接受,无需等待直接返回future结果
    调用类使用Integer result = future.get(10, TimeUnit.SECONDS)来接受返回的结果,如果等待超时则抛出异常。
    另外,如果异步线程发生异常,并且在排查问题的时候想要知道具体是什么原因导致的,
    可以在getAsyncPrice方法中使用completeExcepitonally来得到异常信息并且结束这次异步任务,代码如下

    public Future<Double> getPriceAsync(String product) {
        public Future<Double> getPriceAsync(String product) {
            CompletableFuture<Double> futurePrice = new CompletableFuture<>();
            new Thread(() -> {
                try {
                    double price = calculatePrice(product);
                    futurePrice.complete(price);
                } catch (Exception ex) {
                    futurePrice.completeExceptionally(ex);
                }
            }).start();
            return futurePrice;
        }
    

    这样,基本的功能就实现了。

    使用工厂类简化异步操作

    也许你看到上面的代码,会说:"我晕,你这写法比原来还复杂哦,而且我也没看出啥区别啊。",是的,上文的写法可以算是原生态的写法了,目的为为下面的知识做一个简单的铺垫。
    事实上,CompleteFuture本身提供了大量的工厂方法来供我们十分方便的实现一个异步编程,他封装了前篇一律的异常与结果接收,你只需要编写真正的异步逻辑部分就可以了,同时借住于lambda表达式,可以更进一步。

    supplyAsync 方法接受一个生产者(Supplier)作为参数,返回一个 CompletableFuture
    对象, 该对象完成异步执行后会读取调用生产者方法的返回值。 生产者方法会交由 ForkJoinPool
    池中的某个执行线程(Executor)运行,但是你也可以使用 supplyAsync 方法的重载版本,传
    递第二个参数指定不同的执行线程执行生产者方法。

    于是上文的例子可以改写如下

        public Future<Double> getPriceAsync(String product) {
            return CompletableFuture.supplyAsync(() -> calculatePrice(product));
        }
    

    是不是简洁了许多呢?

    可现在还有问题,这里我们成功的编写了一个十分简洁的异步方法,可实际的情况中,我们所能调用的API大部分都是同步的,因此下面将介绍如何使用异步的方法去操作这些同步API。

    使用流异步操作同步API

    我们现在有这么一个需求,它接受产品名作为参数,返回一个字符串列表,
    这个字符串列表中包括商店的名称、该商店中指定商品的价格,商店集合以及接口设计如下。

    List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
    new Shop("LetsSaveBig"),
    new Shop("MyFavoriteShop"),
    new Shop("BuyItAll"));
    
    public List<String> findPrices(String product);
    

    使用同步的方法实现

    这样的集合变换使用stream流来操作十分容易,代码如下

        public List<String> findPrices(String product) {
            return shops.stream()
                    .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
                    .collect(toList());
        }
    

    stream流将 shop映射为了shop的名称以及该shop中商品的价格的字符串,并使用收集器进行收集。

    使用异步的方法实现

    事实上,我们完全可以使用流将shop映射成CompletableFuture对象,就好像在操作集合一样,代码如下

        List<CompletableFuture<String>> priceFutures = shops.stream()
                .map(shop -> CompletableFuture
                        .supplyAsync(() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))))
                .collect(toList());
    

    使用这种方式,你会得到一个,List<CompletableFuture>,列表中的每个CompletableFuture 对象在计算完成后都包含商店的String类型的名称。但是,由于你用CompletableFutures 实现的findPrices方法要求返回一个List,你需要等待所有的future 执行完毕,将其包含的值抽取出来,填充到列表中才能返回。

    为了实现这个效果,你可以向最初的 List<CompletableFuture> 施加第二个map 操作,对 List 中的所有future对象执行join操作,一个接一个地等待它们运行结束。注意CompletableFuture类中的join方法和Future接口中的get有相同的含义,并且也声明在Future 接口中,它们唯一的不同是join不会抛出任何检测到的异常。使用它你不再需要使用try / catch 语句块让你传递给第二个map方法的Lambda表达式变得过于臃肿。所有这些整合在一起,你就可以重新实现 findPrices 了,具体代码如下

        public List<String> findPrices(String product) {
            List<CompletableFuture<String>> priceFutures = shops.stream()
                    .map(
                        shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product)))
                    .collect(Collectors.toList());
                    
            return priceFutures.stream()
                    .map(CompletableFuture::join)
                    .collect(toList());
        }
    

    以上的代码你可能会疑惑,为什么不直接按照shop->completableFuture->join->collect
    的方式进行流处理呢?那是因为join这一步本身是阻塞的,对于流操作来说,前一个shop没有处理完之前,是不会处理下一个shop的,所以对于每一个shop,处理到join这一步的时候就会阻塞住等待1秒,这样的话,这个流水线本身就会变回阻塞的了。

    而上文的编写方法可以看出 shop->completableFuture->collect 这个操作本身是非阻塞的,顺利的将所有的请求都发出去了,随后再使用join来完成结果的收集。

    使用线程池来管控异步方法

    前面在介绍工厂方法时提到,可以选择第二参数放入一个线程池来进行管控。

       private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            }
        });
    

    接着在supplyAsync中使用该线程池即可,代码如下

    CompletableFuture.supplyAsync(() -> shop.getName() + " price is " +
    shop.getPrice(product), executor);
    

    进阶的异步流操作

    既然我们已经将异步操作与流相结合了,因此很容易的就会想到对于异步流来说,应该有会有类似于集合流的一些非常好用的API吧?事实上,JAVA8的确为我们提供了这些API。

    构造同步和异步操作

    如同集合流操作一样,异步流也可以提前安排一系列的任务,然后让异步任务有条不紊的按照这个顺序去执行。

    • 同步任务
      使用future.thenApply(Function)来实现,该方法接受一个Function对象
      你可以规划这样的任务 任务A(异步)->任务B(同步),语法可能是这样的
        stream()
        .map(xxx->supplayAsync(()->任务A)) //这一步已经异步的映射成了任务A
        .map(future->future.thenApply(任务B)//执行同步的任务B
        .collect
    
    • 异步任务
      与同步几乎一样,方法变为future.thenCompose(Function)
      你可以规划这样的任务 任务A(异步)->任务B(同步)->任务C(异步),语法可能是这样的
        stream()
        .map(xxx->supplayAsync(()->任务A)) //这一步已经异步的映射成了任务A
        .map(future->future.thenApply(任务B)//执行同步的任务B
        .map(future->future.thenCompose(任务C))//再异步执行任务C
        .collect
    

    将两个 CompletableFuture 对象整合起来,无论它们是否存在依赖

    使用thenCombine来完成,类似任务A与任务B,A是查询价格,B是查询汇率,这两个任务之间本身没有关联关系,所以可以同时发起,但你最后需要计算价格乘以汇率,因此在这两个任务完成之后需要对他们的结果进行合并,代码如下

            Future<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product))//任务A
                    .thenCombine(
                         CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)), //任务B
                        (price, rate) -> price * rate); //任务A与任务B的合并操作
    

    注意这里的任务A与任务B是异步的,但他们的合并操作是同步的,如果想要合并操作也是异步的,使用future.thenCombineAsync的异步方法版本。

    对结果进行处理

    使用thenAccept(Consumer)
    以上都是对结果进行一些映射,你现在要对结果只进行处理,说白了就是前面的都是Function,现在要换成consumer,并且参数不再是异步任务,而是任务的结果值,举个例子,上文的任务A(异步)->任务B(同步)->任务C(异步) 的操作现在想到对他们的操作结果进行打印
    就可以使用thenAccept(Consumer)

        stream()
        .map(xxx->supplayAsync(()->任务A)) //这一步已经异步的映射成了任务A
        .map(future->future.thenApply(任务B)//执行同步的任务B
        .map(future->future.thenCompose(任务C))//再异步执行任务C
        .map(future->future.thenAccept(System.out::println))//将结果打印
        .collect
    

    使用allOf与anyOf对结果进行处理

    需要注意的是在执行了Accpet方法之后,你得到的是一个 CompletableFuture流对象
    你可以对这些流对象进行类似及早求值的操作,例如这条查询4个商家的价格服务只要有一个给出了返回结果就结束这次异步流。

    CompletableFuture[] futures = findPricesStream("myPhone")
    .map(f -> f.thenAccept(System.out::println))
    .toArray(size -> new CompletableFuture[size]);
    CompletableFuture.anyOf(futures).join();
    

    allOf 工厂方法接收一个由 CompletableFuture 构成的数组,数组中的所有 Completable-Future 对象执行完成之后,它返回一个 CompletableFuture 对象。这意味着,如果你需要等待最初 Stream 中的所有 CompletableFuture 对象执行完毕,对 allOf 方法返回的CompletableFuture 执行 join 操作是个不错的主意。这个方法对“最佳价格查询器”应用也是有用的,因为你的用户可能会困惑是否后面还有一些价格没有返回,使用这个方法,你可以在执行完毕之后打印输出一条消息“All shops returned results or timed out” 。然而在另一些场景中,你可能希望只要 CompletableFuture 对象数组中有任何一个执行完毕就不再等待,比如,你正在查询两个汇率服务器,任何一个返回了结果都能满足你的需求。在这种情况下,你可以使用一个类似的工厂方法 anyOf 。该方法接收一个 CompletableFuture 对象构成的数组, 返回由第一个执行完毕的 CompletableFuture 对象的返回值构成的 Completable-Future

    总结

    本文是对Java8实战中异步编程章节的一些整理和汇总,介绍了利用新增的completableFuture将异步任务与流操作集合起来实现组合式异步编程,利用工厂方法与函数接口可以大大的简化代码,同时提高代码的可阅读性,想要查看详细,可以自行翻阅该书。

  • 相关阅读:
    HttpRunner3.X
    基于C++的ByteBuf封装
    关于matlab的配色
    关于样本方差的无偏估计
    使用Python求解Nonogram
    菜鸡的一些力扣记录
    LeetCode链表练习
    C语言中的链表
    Python中的链表简介
    Nebula Graph 源码解读系列 | Vol.03 Planner 的实现
  • 原文地址:https://www.cnblogs.com/invoker-/p/8167995.html
  • Copyright © 2011-2022 走看看