zoukankan      html  css  js  c++  java
  • Java8 中增强 Future:CompletableFuture

    增强的 Future:CompletableFuture

    CompletableFuture(它实现了 Future 接口) 和 Future 一样,可以作为函数调用的契约。当你向它请求获得结果,如果数据还没有准备好,请求线程就会等待,直到数据准备好后返回。

    异步执行

    @Test 
    public void testFuture() throws ExecutionException, InterruptedException { 
        long t1 = System.currentTimeMillis(); 
        Future<List<Integer>> listFuture = getListAsync(); 
        System.out.println("do something..."); 
        Thread.sleep(2_000L); 
        System.out.println("getList..."); 
        System.out.println(listFuture.get()); 
        System.out.println("spendTime = " + (System.currentTimeMillis() - t1)); 
    } 
    private Future<List<Integer>> getListAsync() { 
        CompletableFuture<List<Integer>> resultFuture = new CompletableFuture<>(); 
        new Thread( () -> { 
            try { 
                Thread.sleep(3_000L); 
                resultFuture.complete(Lists.newArrayList(1, 2, 3)); 
            } catch (InterruptedException e) { 
                e.printStackTrace(); 
            } 
        }).start(); 
        return resultFuture; 
    }

    执行结果:

    do something... 
    getList... 
    [1, 2, 3] 
    spendTime = 3137

    以上代码中,do something 指主线程在调用过 Future 的异步接口取得凭证后,即可继续向下执行,直至 getList 需要通过凭证取得异步计算结果时,再通过 get 的方式取得。

    如果采用同步调用的方式,那么以上程序则需要 3 + 2 共 5s 的时间。

    使用 supplyAsync 创建 CompletableFuture

    CompletableFuture 提供了更轻巧的工厂方法

    @Test 
    public void testFuture() throws ExecutionException, InterruptedException { 
        long t1 = System.currentTimeMillis(); 
    //    Future<List<Integer>> listFuture = getListAsync(); 
        CompletableFuture<List<Integer>> listFuture = CompletableFuture.supplyAsync(OrderThriftServiceTest::getList); 
        System.out.println("do something..."); 
        Thread.sleep(2_000L); 
        System.out.println("getList..."); 
        System.out.println(listFuture.get()); 
        System.out.println("spendTime = " + (System.currentTimeMillis() - t1)); 
    } 
    private static List<Integer> getList() { 
        try { 
            Thread.sleep(3_000L); 
            return Lists.newArrayList(1,2,3,4,5); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
            return Lists.newArrayList(); 
        } 
    }
    View Code

    对两个 CompletableFuture 的整合

    compose 与 combine

    compose,在一个 CompletableFuture 执行完毕后,将执行结果通过 Function 传递给下一个 Future 进行处理。

    combine,将两个 CompletableFuture 整合起来,无论它们是否存在依赖。它接受 BiFunction 第二参数,这个参数定义了当两个 CompletableFuture 对象执行完计算后,结果如何合并。

    两者都提供了后缀为 Async 的版本,该方法会将后续的任务提交到一个线程池中。其中 composeAsync 其实意义不大,因为 compose 操作的时间取决于第一个 CompletableFuture 的执行时间,composeAsync 相较 compose 消耗更多的线程切换开销。

    一点实际应用

    在服务化的项目中,一个服务调用另一个服务所提供的批量接口,如果一次调用的量过大那么将耗费很长时间,通过 CompletableFuture 可以暂缓一些时间,用作做执行别的任务

    更加优化的做法是将大批量请求分成若干个合理大小的小批量请求(每个服务一般都是多机部署的,这样多个请求通过负载均衡打到多台机器,达到了并行运算的效果),还是通过 CompletableFuture 的方式,最终将结果进行组合,组合的过程就可以用 combine 来进行,而不是先 get 再 addAll 这种 low 的做法。

    CompletableFuture<List<Integer>> f1 = CompletableFuture.supplyAsync(OrderThriftServiceTest::getList);
    CompletableFuture<List<Integer>> f2 = CompletableFuture.supplyAsync(OrderThriftServiceTest::getList);
    CompletableFuture<List<Integer>> f3 = f1.thenCombineAsync(f2, (l1, l2) -> Stream.concat(l1.stream(), l2.stream()).collect(toList()));
    // doSomething...
    f3.get();

    参考资料

    [1] Java8 实战. 第 11 章

    [2] Java 高并发程序设计. 6.5

  • 相关阅读:
    PostgreSQL管理工具:pgAdminIII
    PostgresQL7.5(开发版)安装与配置(win2003测试通过)
    让PosggreSQL运行得更好
    在.NET程序中使用PIPE(管道技术)
    在浏览网页过程中,单击超级链接无任何反应
    字符串转换
    数组初始化
    使用现有的COM
    后台服务程序开发模式(一)
    COM的四本好书
  • 原文地址:https://www.cnblogs.com/zhengbin/p/9000073.html
Copyright © 2011-2022 走看看