zoukankan      html  css  js  c++  java
  • CompletableFuture 专题

      @Bean("taskExecutor")   
      public Executor getAsyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(5);
            executor.setMaxPoolSize(20);
            executor.setQueueCapacity(3);
            executor.setKeepAliveSeconds(60);
            executor.setThreadNamePrefix("taskExecutor-");
     
            // rejection-policy:当pool已经达到max size的时候,如何处理新任务
            // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            executor.initialize();
            return executor;
        }
    CompletableFuture.supplyAsync(() -> doWorker(), taskExecutor)
                 .whenComplete((partitionProductVoList, e) -> {
                    log.info("complete");
                 }).exceptionally(e -> {
                     log.error("发生异常了", e.getMessage());
                     throw e;
                 });
    
     CompletableFuture[] completableFutures = partitionList.stream().map(() ->
             CompletableFuture.supplyAsync(() -> doWorker(),taskExecutor)
                  .whenComplete((v, e) -> {
                    log.info("complete");
             })).toArray(CompletableFuture[]::new);
     CompletableFuture.allOf(completableFutures).join();
    /**
     * @Auther: cheng.tang
     * @Date: 2019/3/2
     * @Description:
     */
    package com.tangcheng.learning.concurrent;
    
    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test;
    
    import java.util.concurrent.*;
    import java.util.stream.Stream;
    
    /**
     * @Auther: cheng.tang
     * @Date: 2019/3/2
     * @Description:
     */
    @Slf4j
    public class CompletableFutureTest {
    
    
        /**
         * 16:12:01.109 [main] INFO com.tangcheng.learning.concurrent.CompletableFutureTest - start
         * 16:12:01.168 [main] INFO com.tangcheng.learning.concurrent.CompletableFutureTest - 小于coreSize时,每次新建线程,都会打印这个日志。Runtime.getRuntime().availableProcessors():8
         * 16:12:01.171 [main] INFO com.tangcheng.learning.concurrent.CompletableFutureTest - 小于coreSize时,每次新建线程,都会打印这个日志。Runtime.getRuntime().availableProcessors():8
         * 16:12:01.172 [main] INFO com.tangcheng.learning.concurrent.CompletableFutureTest - 小于coreSize时,每次新建线程,都会打印这个日志。Runtime.getRuntime().availableProcessors():8
         * 16:12:03.172 [Thread-2] INFO com.tangcheng.learning.concurrent.CompletableFutureTest - finish :3
         * 16:12:04.172 [Thread-0] INFO com.tangcheng.learning.concurrent.CompletableFutureTest - finish :1
         * 16:12:05.172 [Thread-1] INFO com.tangcheng.learning.concurrent.CompletableFutureTest - finish :2
         * 16:12:05.172 [main] INFO com.tangcheng.learning.concurrent.CompletableFutureTest - last log
         */
        @Test
        public void testCompletableFuture() {
    
            log.info("start");
            /**
             * runSync不要求有返回值
             */
            CompletableFuture[] completableFutures = Stream.of(1, 2, 3).map(item -> CompletableFuture.supplyAsync(() -> {
                try {
                    int timeout = ThreadLocalRandom.current().nextInt(1, 5);
                    if (timeout == 1) {
                        throw new IllegalArgumentException("出错了,为什么是1");
                    }
                    TimeUnit.SECONDS.sleep(timeout);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("finish :{}", item);
                return "finish" + item;
            }, executor).exceptionally(ex -> {
                        log.error("exceptionally 一个任务失败了 item:{},{}", item, ex.getMessage());
                        return "一个任务失败了" + ex.getMessage();
                    }
            )).toArray(CompletableFuture[]::new);
            //allOf():工厂方法接受由CompletableFuture对象构成的数组,数组中所有的CompletableFuture完成后它返回一个CompletableFuture<Void>对象。
            CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(completableFutures);
            //等待所有的子线程执行完毕
            voidCompletableFuture.join();
            log.info("last log ");
        }
    
        private static final Executor executor = Executors.newFixedThreadPool(Math.min(Runtime.getRuntime().availableProcessors(), 100), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                log.info("小于coreSize时,每次新建线程,都会打印这个日志。Runtime.getRuntime().availableProcessors():{}", Runtime.getRuntime().availableProcessors());
                Thread t = new Thread(r);
                // 使用守护线程---这种方式不会阻止程序关掉
                t.setDaemon(true);
                return t;
            }
        });
    
    }



    Future接口的局限性

    Future提供了isDone()方法检测异步计算是否已经结束,get()方法等待异步操作结束,以及获取计算的结果。但是这些操作依然很难实现:等到所有Future任务完成,通知线程获取结果并合并。下面就一起来看看JDK8中新引入的CompletableFuture。

    使用CompletableFuture构建异步应用

    CompletableFuture实现了Future接口,它的complete()方法就相当于结束CompletableFuture对象的执行,并设置变量的值。

    /**
     * @author yangfan
     * @date 2017/04/11
     */
    public class Shop {
    
        private static final Random random = new Random();
    
        private String name;
    
        public Shop(String name) {
            this.name = name;
        }
    
    
        public String getPrice(String product) {
            double price =  calculatePrice(product);
            Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
            return String.format("%s:%.2f:%s", name, price, code);
        }
    
        public Future<Double> getPriceAsync1(String product) {
            // 创建CompletableFuture对象,它会包含计算的结果
            CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    
            // 在另一个线程中以异步方式执行计算
            new Thread(() -> {
                try {
                    double price = calculatePrice(product);
                    //  需长时间计算的任务结束并得出结果时,设置Future的返回值
                    futurePrice.complete(price);
                } catch (Exception e) {
                    //异常处理
                    futurePrice.completeExceptionally(e);
                }
            }).start();
    
            // 无需等待还没结束的计算,直接返回Future对象
            return futurePrice;
    
        }
    
        public Future<Double> getPriceAsync(String product) {
            return CompletableFuture.supplyAsync(() -> calculatePrice(product));
        }
    
    
        private double calculatePrice(String product) {
            randomDelay();
            return random.nextDouble() * product.charAt(0) + product.charAt(1);
        }
    
        public static void delay() {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    
        public static void randomDelay() {
            int delay = 500 + random.nextInt(2000);
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        public static void main(String[] args) {
            Shop shop = new Shop("BestShop");
            long start = System.nanoTime();
            // 查询商店,试图去的商品的价格
            Future<Double> futurePrice = shop.getPriceAsync("my favorte product");
            long invocationTime = (System.nanoTime() - start) / 1_000_000;
            System.out.println("Invocation returned after " + invocationTime + " msecs");
    
            // 执行更多任务,比如查询其他商店
            //doSomethingElse();
            // 在计算商品价格的同时
    
    
            try {
                // 从Future对象中读取价格,如果价格未知,会发生阻塞
                double price = futurePrice.get();
                System.out.printf("Price is %.2f%n", price);
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
    
            long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
            System.out.println("Price returned after " + retrievalTime + " msecs");
    
    
    
        }
    
        public String getName() {
            return name;
        }
    }

    上面的例子来自于《Java8实战》,模拟了一个耗时的操作,然后通过CompletableFuture包装成异步方法进行调用。注意代码里演示了两种方式,一种自己new一个线程再调用complete方法,还有就是用CompletableFuture自身提供的工厂方法,CompletableFuture.supplyAsync,它能更容易地完成整个流程,还不用担心实现的细节。

    现在看来好像和Future方式也没有什么区别,都是包装一下最后通过get()方法获取结果,但是CompletableFuture配合Java8用起来就非常厉害了,它提供了很多方便的方法,下面进行一个演示。

    同样是价格查询,我们现在接到一个需求,就是获取一个商品在不同商店的报价,一般来说用传统的方式就是写一个for循环,遍历商店然后获取价格,要想效率快一点我们也可以用并行流的方式来查询,但是并行流返回的结果是无序的。下面再将异步也引入,我们可以实现有序的并行操作:

    private static List<String> findPrices_1(String product) {
        List<CompletableFuture<String>> priceFutures = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product), executor)).collect(Collectors.toList());
        return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    这里创建CompletableFuture和调用join()方法是两个不同的流是有原因的,如果只在一个流里,那么就没有异步的效果了,下一个Future必须等到上一个完成后才会被创建和执行。上面的代码执行效率并不会比并行流的效率差。

    默认情况下,并行流和CompletableFuture默认都是用固定数目的线程,都是取决于Runtime. getRuntime().availableProcessors()的返回值。并行流的线程池并不好控制,其本质是内部隐含使用了ForkJoinPool线程池,最大并发数可以通过系统变量设置。所以CompletableFuture也就具有了优势,它允许配置自定义的线程池,这也可以为实际应用程序带来性能上的提升(并行流无法提供的API),CompletableFuture.supplyAsync(Supplier supplier,Executor executor)提供了重载的方法来指定执行器使用自定义的线程池。

    // 创建一个线程池,线程池中线程的数目为100和商店数目二者中较小的一个值
    private static final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            // 使用守护线程---这种方式不会阻止程序关掉
            t.setDaemon(true);
            return t;
        }
    });

    并行–使用流还是CompletableFutures?

    现在我们知道对集合进行并行计算有两种方式:

    1. 转化为并行流,利用map开展工作。
    2. 取出每一个元素,创建线程,在CompletableFuture内对其进行操作

    后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助我们确保整体的计算不会因为线程都在等待I/O而发生阻塞。

    那么如何选择呢,建议如下:

    • 进行计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。
    • 如果并行操作设计等到I/O的操作(网络连接,请求等),那么使用CompletableFuture灵活性更好,通过控制线程数量来优化程序的运行。

    CompletableFuture还提供了了一些非常有用的操作例如,thenApply(),thenCompose(),thenCombine()等

    • thenApply()是操作完成后将结果传入进行转换
    • thenCompose()是对两个异步操作进行串联,第一个操作完成时,对第一个CompletableFuture对象调用thenCompose,并向其传递一个函数。当第一个CompletableFuture执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个CompletableFuture的返回做输入计算出第二个CompletableFuture对象。
    • thenCombine()会异步执行两个CompletableFuture任务,然后等待它们计算出结果后再进行计算。
    private static List<String> findPrices(String product) {
    
        List<CompletableFuture<String>> priceFutures = shops.stream()
                // 以异步方式取得每个shop中指定产品的原始价格
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
                // Quote对象存在时,对其返回值进行转换
                .map(future -> future.thenApply(Quote::parse))
                // 使用另一个异步任务构造期望的Future,申请折扣
                .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
                .collect(Collectors.toList());
        return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    通常而言,名称中不带Async的方法和它的前一个任务一样,在同一个线程中运行;而名称以Async结尾的方法会将后续的任务提交到一个线程池,所以每个任务都是由不同线程处理的,例如thenApplyAsync(),thenComposeAsync()等。

    最后看一段利用thenAccept()来使用异步计算结果的代码:

    // 这里演示获取最先返回的数据
    public static Stream<CompletableFuture<String>> findPricesStream(String product) {
        return shops.stream().map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
                .map(future -> future.thenApply(Quote::parse))
                .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(
                        () -> Discount.applyDiscount(quote), executor)));
    }
    
    public static void main(String[] args) {
    
        long start = System.nanoTime();
    //        System.out.println(findPrices("myPhone27S"));
        CompletableFuture[] futures = findPricesStream("myPhone27S")
                .map(f -> f.thenAccept(s -> System.out.println(s + " (done in " +
                        ((System.nanoTime() - start) / 1_000_000) + " msecs)")))
                .toArray(CompletableFuture[]::new);
    //        CompletableFuture.allOf(futures).join();
        CompletableFuture.anyOf(futures).join();
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Done in " + duration + " msecs");
    
    }

    这样就几乎无需等待findPricesStream的调用,实现了一个真正的异步方法。

    https://blog.csdn.net/qq_32331073/article/details/81503475

    2.运行一个简单的异步stage

    下面的例子解释了如何创建一个异步运行Runnable的stage。

    static void runAsyncExample() {
        CompletableFuture cf = CompletableFuture.runAsync(() -> {
            assertTrue(Thread.currentThread().isDaemon());
            randomSleep();
        });
        assertFalse(cf.isDone());
        sleepEnough();
        assertTrue(cf.isDone());
    }

    这个例子想要说明两个事情:

    1. CompletableFuture中以Async为结尾的方法将会异步执行
    2. 默认情况下(即指没有传入Executor的情况下),异步执行会使用ForkJoinPool实现,该线程池使用一个后台线程来执行Runnable任务。注意这只是特定于CompletableFuture实现,其它的CompletableStage实现可以重写该默认行为。

    https://segmentfault.com/a/1190000013452165?utm_source=index-hottest

    联合多个CompletableFuture

    另外一组方法允许将多个CF联合在一起。我们已经看见过静态方法allOf,当其所有的组件均完成时,它就会处于完成状态,与之对应的方法也就是anyOf,返回值同样是void,当其任意一个组件完成时,它就会完成。
    除了这两个方法以外,这个组中其他的方法都是实例方法,它们能够将receiver按照某种方式与另外一个CF联合在一起,然后将结果传递到给定的函数中。

    https://www.cnblogs.com/kexianting/p/8692437.html

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    
    /**
     * 最佳价格查询器
     */
    public class BestFinder {
    
        List<Shop> shops = Arrays.asList(
                new Shop("A"),
                new Shop("B"),
                new Shop("C"),
                new Shop("D"),
                new Shop("E"),
                new Shop("F"),
                new Shop("G"),
                new Shop("H"),
                new Shop("I"),
                new Shop("J")
        );
    
        public void findPricesContinue(String product){
            long st = System.currentTimeMillis();
            Stream<CompletableFuture<String>> futurePrices = shops.stream()
                    //首先异步获取价格
                    .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPriceFormat(product),myExecutor))
                    //将获取的字符串解析成对象
                    .map(future -> future.thenApply(Quote::parse))
                    //使用另一个异步任务有获取折扣价格
                    .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote),myExecutor)));
            //thenAccept()会在CompletableFuture完成之后使用他的返回值,这里会持续执行子线程
            CompletableFuture[] futures = futurePrices.map(f -> f.thenAccept(s -> {
                                                                                String sout = String.format("%s done in %s mesc",s,(System.currentTimeMillis() - st));
                                                                                System.out.println(sout);
                                                                             }))
                                                      .toArray(size -> new CompletableFuture[size]);
            //allOf()工厂方法接受由CompletableFuture对象构成的数组,这里使用其等待所有的子线程执行完毕
            CompletableFuture.allOf(futures).join();
        }
    
      
    
        /**
         * 异步查询
         * 相比并行流的话CompletableFuture更有优势:可以对执行器配置,设置线程池大小
         */
        @SuppressWarnings("all")
        private final Executor myExecutor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                //使用守护线程保证不会阻止程序的关停
                t.setDaemon(true);
                return t;
            }
        });
       

    https://segmentfault.com/a/1190000012859835

    https://segmentfault.com/a/1190000013452165?utm_source=index-hottest

    有示例代码:
    https://blog.csdn.net/G0_hw/article/details/82919651




  • 相关阅读:
    java Metaspace频繁FGC问题定位(转载)
    JVM内存模型详解(转载)
    56. Merge Intervals
    begin again
    55. Jump Game
    54. Spiral Matrix
    53. Maximum Subarray
    52. N-Queens II
    51. N-Queens
    1. 赋值运算符函数
  • 原文地址:https://www.cnblogs.com/softidea/p/10461612.html
Copyright © 2011-2022 走看看