zoukankan      html  css  js  c++  java
  • java 线程相关(4)

    1、 CompletableFuture

    • 定义

    java8新增对Future的补充,CompletableFuture支持流式计算、函数式编程等新特性,通过CompletableFuture,我们可以实现非阻塞的Future结果调用。

    CompletableFuture实现了Future和CompletionStage两个接口,其中CompletionStage抽象了一些异步编程的补充方法。

    • Future和CompletableFuture的区别

    (1) Future不支持手动完成Future任务,只能等待Future执行完成,而CompletableFuture提供complete方法,可以主动完成任务。

    (2) Future不支持链式调用,CompletableFuture提供Completion stack的链式结构实现链式调用。

    (3) Future的api中不支持异常处理,CompletableFuture提供异常处理exceptionally方法。

    2、CompletableFuture的一些基本用法

    注:下面CompletableFuture统一用任务来称呼

    2.1 创建一个CompletableFuture

    supplyAsync方法:
    创建一个有返回值的CompletableFuture,参数为Supperlier接口,返回值主要是重写Supplier接口的get方法。

      CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
                    @Override
                    public Long get() {
                        long result = new Random().nextInt(100);
                        System.out.println(format(String.valueOf(result)));
                        return result;
                    }
                });
    

    runAsync方法:
    创建一个没有返回值的CompletableFuture,参数为Runnable。

                CompletableFuture<Void> future2 = CompletableFuture.runAsync(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(new Random().nextInt(5));
                    }
                });
    

    2.2 实现一个CompletableFuture的回调函数

    thenApply方法:
    接收上一个任务的返回值为入参,并返回一个新的任务(带返回值)。

            private static void thenApply() throws Exception {
                CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
                    @Override
                    public Long get() {
                        long result = new Random().nextInt(100);
                        //69
                        System.out.println(format(String.valueOf(result)));
                        return result;
                    }
                }).thenApply(new Function<Long, Long>() {
                    @Override
                    public Long apply(Long t) {
                        long result = t * 5;
                        // 345
                        System.out.println(format(String.valueOf(result)));
                        return result;
                    }
                });
    
                long result = future.get();
                // result = 345, 说明链式调用取最后的结果
                System.out.println("main = " + result);
            }
    

    thenAccept方法:接收上一个任务的返回值为入参,并返回一个新的任务(不带返回值)。

    thenRun方法:没有入参,并返回一个新的任务(不带返回值)。

    2.3 实现多个CompletableFuture的组合回调

    thenCompose方法:两个有依赖关系的CompletableFuture,第一个执行后才执行第二个。

        void testThenCompose() throws ExecutionException, InterruptedException {
            // thenCompose合并两个有依赖关系的CompletableFutures的执行结果,第一个任务结果传给下一个future
    
            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 2 / 1);
    
            //future1执行完的结果传给future2执行
            CompletableFuture<String> future2 = future1.thenCompose((result) ->
                    CompletableFuture.supplyAsync(() -> String.valueOf(result * 2));
            );
    
            System.out.println(future2.get());
        }
    

    thenCombine方法:两个任务组合,两个任务正常执行后执行thenCombine,两个任务的结果做参数。

            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 1);
            CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 2);
    
            CompletableFuture<Integer> future3 = future1.thenCombine(future2, (result1, result2) -> {
                System.out.println("result1 : " + result1);
                System.out.println("result2 : " + result2);
    
                return result1 + result2;
            });
    

    applyToEither方法:两个任务组合,只要有一个任务执行完成后就执行。
    allOf方法:多个CompletableFuture全部执行完后,再执行回调。
    anyOf方法:多个CompletableFuture有一个执行完就执行调用。

                CompletableFuture allOfFuture = CompletableFuture.allOf(future1, future2, future3).handle((result, e) -> {
                    if (e != null) {
                        System.out.println("error : " + e.getMessage());
                    }
                    System.out.println("anyOf : " + result);
                    return result;
                });
    
                CompletableFuture anyOfFuture = CompletableFuture.anyOf(future1, future2, future3).whenComplete((o, throwable) -> {
                    System.out.println("anyOf : " + o.toString());
                });
    

    2.4 异常处理

    whenComplete方法:
    当某个任务执行完成后,传入执行结果(无论是否有异常)的回调方法,参数为(result,throwable)

    exceptionally方法:
    出现异常,会调用这个回调方法,参数为 (throwable)

    handle方法:
    异常捕捉方法,无论发生异常都会执行,参数为(result,throwable)

    3、CompletableFuture中带Async方法和不带Async方法的区别

    不带Async结尾的方法是同步方法,运行时使用调用这个方法的线程执行。

    带Async结尾的方法是异步方法,执行时会提交给线程池(默认ForkJoinPool,可以自定义)来执行,提高任务的并行度。

    示例:

     Long start = System.currentTimeMillis();
                CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
                    Integer i = new Random().nextInt(100);
                    System.out.println(Thread.currentThread().getName() + " : future1 = " + i);
                    return i;
                }).thenApply(result -> {
    
                    System.out.println(Thread.currentThread().getName() + " : future1 = " + result*2);
                    return result*2;
                });
                System.out.println("future1 cast = "+(System.currentTimeMillis()-start));
    
                start = System.currentTimeMillis();
                CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
                    Integer i = new Random().nextInt(100);
                    System.out.println(Thread.currentThread().getName() + " : future2 = " + i);
                    return i;
                }).thenApplyAsync(result -> {
                    System.out.println(Thread.currentThread().getName() + " : future2 = " + result*2);
                    return result*2;
                });
                System.out.println("future2 cast = "+(System.currentTimeMillis()-start));
    
                Thread.sleep(2000);
    

    上述例子输出:可以看出future1的thenApply交由main线程执行,future2的thenApplyAsync由ForkJoinPool执行

    ForkJoinPool.commonPool-worker-1 : future1 = 40
    main : future1 = 80
    future1 cast = 68
    ForkJoinPool.commonPool-worker-1 : future2 = 83
    future2 cast = 0
    ForkJoinPool.commonPool-worker-1 : future2 = 166
    

    如果在future1的supplyAsync加上Thread.sleep(1000)等等待操作,future1会由ForkJoinPool调用

    4、CompletableFuture源码解析

    分析CompletableFuture源码可以从它的内部内Completion开始分析:

    Completion继承了ForkJoinTask,实现了Runnable接口,每个Completion可以看做CompletableFuture的一个阶段任务,Compltion有一个next指针,指向的是下一个Completion,多个CompletableFuture的回调实现就是通过Completion的next指针指向下一个Completion来实现的。
    Completion源码:

        abstract static class Completion extends ForkJoinTask<Void> implements Runnable, CompletableFuture.AsynchronousCompletionTask {
            volatile CompletableFuture.Completion next; \指向下一个Completion
    
            Completion() {
            }
    
            abstract CompletableFuture<?> tryFire(int var1); \尝试执行这个Completion任务并唤醒后续的Completion
    
            abstract boolean isLive();
    
            public final void run() {
                this.tryFire(1);
            }
    
            public final boolean exec() {
                this.tryFire(1);
                return false;
            }
    ......
        }
    

    UniCompletion和BiCompletion都是Completion的子类,他们比较Completion,新增加了一些属性:
    1、Executor executor;执行线程池
    2、CompletableFuture dep;依赖src的回调的CompletableFuture任务
    3、CompletableFuture src;源CompletableFuture任务

    CompletableFuture的多种api实现都是依赖于UniCompletion和BiCompletion派生出来的子类实现的,例如简单的api的thenAccpect(),其实就封装了一个内部类UniAccept

    UniAccept源码:

     static final class UniAccept<T> extends CompletableFuture.UniCompletion<T, Void> {
            Consumer<? super T> fn;\传入的任务,函数式接口
    
            UniAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Consumer<? super T> fn) {
                super(executor, dep, src);
                this.fn = fn;
            }
    
            final CompletableFuture<Void> tryFire(int mode) {
                CompletableFuture d;
                CompletableFuture a;
                Object r;
                Consumer f;
                if ((d = this.dep) != null && (f = this.fn) != null && (a = this.src) != null && (r = a.result) != null) {
                    if (d.result == null) {
                        label33: {
                            if (r instanceof CompletableFuture.AltResult) {
                                Throwable x;
                                if ((x = ((CompletableFuture.AltResult)r).ex) != null) {
                                    d.completeThrowable(x, r);
                                    break label33;
                                }
    
                                r = null;
                            }
    
                            try {
                                if (mode <= 0 && !this.claim()) {
                                    return null;
                                }
    
                                f.accept(r);//这里其实就执行了我们传入的Consumer接口定义的异步任务
                                d.completeNull();
                            } catch (Throwable var8) {
                                d.completeThrowable(var8);
                            }
                        }
                    }
    
                    this.dep = null;
                    this.src = null;
                    this.fn = null;
                    return d.postFire(a, mode); \ 通知这个Completion上的其他Completion,next指针(具体实现在postFire方法里)
                } else {
                    return null;
                }
            }
        }
    

    源码分析只是简单写了一些源码的实现方式,它的Completion的链路可以看成一个栈的数据结构。

    关于学习到的一些记录与知识总结
  • 相关阅读:
    [LeetCode] Maximum Depth of Binary Tree
    [LeetCode] Binary Tree Level Order Traversal II
    阿里第一天——maven学习
    微博用户行为分析
    对节目微博进行强过滤之后的处理
    关于推荐和机器学习的几个网站
    大论文微博个性化
    新浪微博用户分析
    位运算符规律小结
    字符串类常见面试大题
  • 原文地址:https://www.cnblogs.com/Zxq-zn/p/14838863.html
Copyright © 2011-2022 走看看