zoukankan      html  css  js  c++  java
  • 强大的CompletableFuture

    www.wityx.com

     

    引子

    为了让程序更加高效,让CPU最大效率的工作,我们会采用异步编程。首先想到的是开启一个新的线程去做某项工作。再进一步,为了让新线程可以返回一个值,告诉主线程事情做完了,于是乎Future粉墨登场。然而Future提供的方式是主线程主动问询新线程,要是有个回调函数就爽了。所以,为了满足Future的某些遗憾,强大的CompletableFuture随着Java8一起来了。

    Future

    传统多线程的却让程序更加高效,毕竟是异步,可以让CPU充分工作,但这仅限于新开的线程无需你的主线程再费心了。比如你开启的新线程仅仅是为了计算1+...+n再打印结果。有时候你需要子线程返回计算结果,在主线程中进行进一步计算,就需要Future了。

    看下面这个例子,主线程计算2+4+6+8+10;子线程计算1+3+5+7+9;最后需要在主线程中将两部分结果再相加。

    public class OddNumber implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
    Thread.sleep(3000);
    int result = 1 + 3 + 5 + 7 + 9;
    return result;
    }
    }
    public class FutureTest {
    public static void main(String[] args) {
    ExecutorService executor = Executors.newCachedThreadPool();
    OddNumber oddNumber = new OddNumber();
    Future<Integer> future = executor.submit(oddNumber);
    long startTime = System.currentTimeMillis();
    int evenNumber = 2 + 4 + 6 + 8 + 10;
    try {
    Thread.sleep(1000);
    System.out.println("0.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
    int oddNumberResult = future.get();//这时间会被阻塞
    System.out.println("1+2+...+9+10="+(evenNumber+oddNumberResult));
    System.out.println("1.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
    } catch (Exception e) {
    System.out.println(e);
    }
    }
    }
    输出结果:
    0.开始了:1001秒
    1+2+...+9+10=55
    1.开始了:3002秒

    看一下Future接口,只有五个方法比较简单

    //取消任务,如果已经完成或者已经取消,就返回失败
    boolean cancel(boolean mayInterruptIfRunning);
    //查看任务是否取消
    boolean isCancelled();
    //查看任务是否完成
    boolean isDone();
    //刚才用到了,查看结果,任务未完成就一直阻塞
    V get() throws InterruptedException, ExecutionException;
    //同上,但是加了一个过期时间,防止长时间阻塞,主线程也做不了事情
    V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

    CompletableFuture

    上面的看到Future的五个方法,不是很丰富,既然我们的主线程叫做main,就应该以我为主,我更希望子线程做完了事情主动通知我。为此,Java8带来了CompletableFuture,一个Future的实现类。其实CompletableFuture最迷人的地方并不是极大丰富了Future的功能,而是完美结合了Java8流的新特性。

    实现回调,自动后续操作

    提前说一下CompletableFuture实现回调的方法(之一):thenAccept()

        public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
    }

    参数有个Consumer,用到了Java8新特性,行为参数化,就是参数不一定是基本类型或者类,也可以是函数(行为),或者说一个方法(接口)。

    public class OddNumberPlus implements Supplier<Integer> {
    @Override
    public Integer get() {
    try {
    Thread.sleep(3000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return 1+3+5+7+9;
    }
    }
    public class CompletableFutureTest {
    public static void main(String[] args) {
    long startTime = System.currentTimeMillis();
    final int evenNumber = 2 + 4 + 6 + 8 + 10;
    CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddNumberPlus());
    try {
    Thread.sleep(1000);
    System.out.println("0.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
    //看这里,实现回调
    oddNumber.thenAccept(oddNumberResult->
    {
    System.out.println("1.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
    System.out.println("此时计算结果为:"+(evenNumber+oddNumberResult));
    });
    oddNumber.get();
    } catch (Exception e) {
    System.out.println(e);
    }
    }
    }
    输出结果:
    0.开始了:1006秒
    1.开始了:3006秒
    此时计算结果为:55

    值得一提的是,本例中并没有显示的创建任务连接池,程序会默认选择一个任务连接池ForkJoinPool.commonPool()

        private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

    ForkJoinPool始自JDK7,叫做分支/合并框架。可以通过将一个任务递归分成很多分子任务,形成不同的流,进行并行执行,同时还伴随着强大的工作窃取算法。极大的提高效率。当然,你也可以自己指定连接池。

    CompletableFuture合并

    Java8的确丰富了Future实现,CompletableFuture有很多方法可供大家使用,但是但从上面的例子来看,其实CompletableFuture能做的功能,貌似Future。毕竟你CompletableFuture用get()这个方法的时候还不是阻塞了,我Future蛮可以自己拿到返回值,再手动执行一些操作嘛(虽说这样main方法一定很不爽)。那么接下来的事情,Future做起来就十分麻烦了。假设我们main方法只做奇数合集加上偶数合集这一个操作,提前算这两个合集的操作异步交给两个子线程,我们需要怎么做呢?没错,开启两个线程,等到两个线程都计算结束的时候,我们进行最后的相加,问题在于,你怎么知道那个子线程最后结束的呢?(貌似可以做个轮询,不定的调用isDone()这个方法...)丰富的CompletableFuture功能为我们提供了一个方法,用于等待两个子线程都结束了,再进行相加操作:

        //asyncPool就是上面提到的默认线程池ForkJoinPool
    public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(asyncPool, other, fn);
    }

    看个例子:

    public class OddCombine implements Supplier<Integer> {
    @Override
    public Integer get() {
    try {
    Thread.sleep(3000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return 1+3+5+7+9;
    }
    }
    public class EvenCombine implements Supplier<Integer> {
    @Override
    public Integer get() {
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return 2+4+6+8+10;
    }
    }

    public class CompletableCombineTest {
    public static void main(String[] args) throws Exception{
    CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddCombine());
    CompletableFuture<Integer> evenNumber = CompletableFuture.supplyAsync(new EvenCombine());
    long startTime = System.currentTimeMillis();
    CompletableFuture<Integer> resultFuturn = oddNumber.thenCombine(evenNumber,(odd,even)->{
    return odd + even;
    });
    System.out.println(resultFuturn.get());
    System.out.println("0.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
    }
    }
    输出结果:
    55
    0.开始了:3000秒

    这边模拟一个睡1秒,一个睡3秒,但是真正的网络请求时间是不定的。是不是很爽,最爽的还不是现象,而是以上操作已经利用了Java8流的概念。

    两个子线程还不够,那么还有**anyOff()**函数,可以承受多个CompletableFuture,会等待所有任务都完成。

        public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
    }

    与它长的很像的,有个方法,是当第一个执行结束的时候,就结束,后面任务不再等了,可以看作充分条件。

        public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    return orTree(cfs, 0, cfs.length - 1);
    }

    在上面那个例子的基础上,把OddNumberPlus类时间调长一点:

    public class OddNumberPlus implements Supplier<Integer> {
    @Override
    public Integer get() {
    try {
    Thread.sleep(5000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return 1+3+5+7+9;
    }
    }
    public class CompletableCombineTest {
    public static void main(String[] args) throws Exception{
    CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddCombine());
    CompletableFuture<Integer> evenNumber = CompletableFuture.supplyAsync(new EvenCombine());
    CompletableFuture<Integer> testNumber = CompletableFuture.supplyAsync(new OddNumberPlus());
    long startTime = System.currentTimeMillis();
    CompletableFuture<Object> resultFuturn = CompletableFuture.anyOf(oddNumber,evenNumber,testNumber);
    System.out.println(resultFuturn.get());
    System.out.println("0.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
    }
    }
    输出结果:
    30
    0.开始了:1000秒

    小结

    CompletableFuture的方法其实还有很多,常用的比如说runAsync(),类似于supplyAsync(),只是没有返回值;除了thenApply()可以加回调函数以外,还有thenApply();还有注入runAfterBoth()、runAfterEither(),这些见名知意。还有很多,可以点开CompletableFuture这个类的源码仔细看一看。见微知著,透过CompletableFuture,更加感觉到Java8的强大,强大的流概念、行为参数化、高效的并行理念等等,不仅让Java写起来更爽,还不断丰富Java整个生态。Java一直在进步,所以没有被时代淘汰,我们Javaer也可以继续职业生涯,感谢Java,一起进步。

    BLOG地址:www.liangsonghua.me

    公众号介绍:分享在京东工作的技术感悟,还有 JAVA技术和业内最佳实践,大部分都是务实的、能看懂的、可复现的
  • 相关阅读:
    SuperMap房产测绘成果管理平台
    SuperMap产权登记管理平台
    Android adb shell am 的用法(1)
    由浅入深谈Perl中的排序
    Android 内存监测和分析工具
    Android 网络通信
    adb server is out of date. killing...
    引导页使用ViewPager遇到OutofMemoryError的解决方案
    adb logcat 详解
    How to send mail by java mail in Android uiautomator testing?
  • 原文地址:https://www.cnblogs.com/qq575654643/p/11830850.html
Copyright © 2011-2022 走看看