zoukankan      html  css  js  c++  java
  • jdk8之CompletableFuture与CompletionService

      JDK 8的CompletionService相对于之前版本的Future而言,其优势是能够尽可能快的得到执行完成的任务。例如有4个并发任务要执行,正常情况下通过Future.get()获取,通常只能按照提交的顺序获得结果,如果最后提交的最先完成的话,总执行时间会长很多。而通过CompletionService能够降低总执行时间,如下所示:

    package com.hundsun.ta.base.service;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    
    /**
     * @author zjhua
     * @description
     * @date 2020/1/28 21:07
     */
    public class CompletionServiceTest {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            testFuture();
            testCompletionService();
        }
    
        //结果的输出和线程的放入顺序 有关(如果前面的没完成,就算后面的哪个完成了也得等到你的牌号才能输出!),so阻塞耗时
        public static void testFuture() throws InterruptedException, ExecutionException {
            long beg = System.currentTimeMillis();
            System.out.println("testFuture()开始执行:" + beg);
            ExecutorService executor = Executors.newCachedThreadPool();
            List<Future<String>> result = new ArrayList<Future<String>>();
            for (int i = 5; i > 0; i--) {
                Future<String> submit = executor.submit(new Task(i));
                result.add(submit);
            }
            executor.shutdown();
            for (int i = 0; i < 5; i++) {//一个一个等待返回结果
                Thread.sleep(500);
                System.out.println("线程" + i + "执行完成:" + result.get(i).get());
            }
            System.out.println("testFuture()执行完成:" + System.currentTimeMillis() + "," + (System.currentTimeMillis()-beg));
        }
    
        //结果的输出和线程的放入顺序 无关(谁完成了谁就先输出!主线程总是能够拿到最先完成的任务的返回值,而不管它们加入线程池的顺序),so很大大缩短等待时间
        private static void testCompletionService() throws InterruptedException, ExecutionException {
            long beg = System.currentTimeMillis();
            System.out.println("testFuture()开始执行:" + beg);
            ExecutorService executor = Executors.newCachedThreadPool();
            ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executor);
            for (int i = 5; i > 0; i--) {
                completionService.submit(new Task(i));
            }
            executor.shutdown();
            for (int i = 0; i < 5; i++) {
                // 检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。
                Future<String> future = completionService.take(); //这一行没有完成的任务就阻塞
                Thread.sleep(500);
                System.out.println("线程" + i + "执行完成:" + future.get());   // 这一行在这里不会阻塞,引入放入队列中的都是已经完成的任务
            }
            System.out.println("testFuture()执行完成:" + System.currentTimeMillis() + "," + (System.currentTimeMillis() - beg));
        }
    
        private static class Task implements Callable<String> {
    
            private volatile int i;
    
            public Task(int i) {
                this.i = i;
            }
    
            @Override
            public String call() throws Exception {
                Thread.sleep(i*500);
                return "任务 : " + i;
            }
    
        }
    }
    // 执行结果
    testFuture()开始执行:1580217876088
    线程0执行完成:任务 : 5
    线程1执行完成:任务 : 4
    线程2执行完成:任务 : 3
    线程3执行完成:任务 : 2
    线程4执行完成:任务 : 1
    testFuture()执行完成:1580217880596,4508
    testFuture()开始执行:1580217880596
    线程0执行完成:任务 : 1
    线程1执行完成:任务 : 2
    线程2执行完成:任务 : 3
    线程3执行完成:任务 : 4
    线程4执行完成:任务 : 5
    testFuture()执行完成:1580217883605,3009

    使用传统的Future,需要执行4.5秒,使用CompleteService,则只需要3秒。但是如果子线程执行完成后不需要执行其他任务,则意义不是很大。

    除了上述场景外,CompleteService还适合于N选1的场景,例如同时从两个渠道查询数据,返回任何一个可用的即可,从Future就实现不了。

    CompletionService的定义如下:

    其实现也比较简单,利用了ThreadPoolExecutor。

    看完CompleteService,再来看CompleteFuture。它实现了Future接口和CompletionStage接口(他代表某个异步或同步计算的阶段,也就是计算流水线的一个节点,这样多个CompletionStage可以作为和过滤器一样链式执行,一个计算单元完成后出发下一个计算单元),和CompleteService的区别在于CompleteFuture知道当前完成的是谁,并采用编程式回调提高代码可读性,CompleteService只知道哪个最快完成了,具体是谁需要应用自己去关联上下文。同时在编程模式上,很大程度上利用了JDK 8的Lambda表达式,这样一个完整服务的多个步骤就能够和同步的的写法一样自然,不用为了实现异步处理而将逻辑合并为一个超大的方法。在并行处理中,如果每个分片的处理时间相差比较大,例如有些1分钟,有些3分钟,有些10秒钟,这样将每个服务的粒度细分为很多个子步骤,每个服务的子步骤通过CompleteFuture串联起来,整体的完成时间就能够下降,每个分片的处理完成时间也将趋于接近。同时在异常的处理上,CompleteFuture也要友好的多。

     下面来看一个例子:

    static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
        int count = 1;
        @Override
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "custom-executor-" + count++);
        }
    });
    static void thenApplyAsyncWithExecutorExample() {
    // 简单的异步执行 CompletableFuture
    <String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { assertTrue(Thread.currentThread().getName().startsWith("custom-executor-")); assertFalse(Thread.currentThread().isDaemon()); randomSleep(); return s.toUpperCase(); }, executor); assertNull(cf.getNow(null)); assertEquals("MESSAGE", cf.join()); }

    异常处理:

    static void completeExceptionallyExample() {
        CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
                CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
        CompletableFuture<String> exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; });
        cf.completeExceptionally(new RuntimeException("completed exceptionally"));  // 模拟抛出异常
    assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
        try {
            cf.join();
            fail("Should have thrown an exception");
        } catch(CompletionException ex) { // just for testing
            assertEquals("completed exceptionally", ex.getCause().getMessage());
        }
        assertEquals("message upon cancel", exceptionHandler.join());
    }

    链式调用:

    public void completableFutureApplyAsync() {
     ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
     ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
     CompletableFuture<Integer> completableFuture = 
     CompletableFuture
          .supplyAsync(this::findAccountNumber,newFixedThreadPool)//will run on thread obtain from newFixedThreadPool
          .thenApplyAsync(this::calculateBalance,newSingleThreadScheduledExecutor) //will run on thread obtain from newSingleThreadScheduledExecutor
          .thenApplyAsync(this::notifyBalance);//will run on thread obtain from common pool
       Integer balance = completableFuture.join();
        assertEquals(Integer.valueOf(balance), Integer.valueOf(100));
        }

      就实际应用而言,CompletableFuture的作用更加有价值的地方在于其他的一些方法,比如allOf、anyOf、xxxToEither等需要多对一的场景,他们可以大大简化代码。

    参考:

    https://dzone.com/articles/20-examples-of-using-javas-completablefuture

  • 相关阅读:
    第四届蓝桥杯JavaC组国(决)赛真题
    第四届蓝桥杯JavaC组国(决)赛真题
    第四届蓝桥杯JavaC组国(决)赛真题
    第四届蓝桥杯C++B组国(决)赛真题
    第四届蓝桥杯C++B组国(决)赛真题
    第四届蓝桥杯C++B组国(决)赛真题
    第四届蓝桥杯C++B组国(决)赛真题
    Qt 动画快速入门(一)
    越败越战,愈挫愈勇(人生就像心电图,一帆风顺,你就挂了!)
    CSS 编码中超级有用的工具集合
  • 原文地址:https://www.cnblogs.com/zhjh256/p/11829397.html
Copyright © 2011-2022 走看看