JDK中CompletableFuture类
一、前言
CompletableFuture是一个可以通过显示的设置结果和状态以便让任务结束的Future,并且可以作为一个CompletionStage(计算阶段)当它的计算完成时候触发一个函数或者行为;当多个线程企图调用同一个CompletableFuture的complete、cancel方式时候只有一个线程会成功;
CompletableFuture除了含有可以直接操作任务状态和结果的方法外,还实现了CompletionStage接口的一些方法,这些方法遵循:
- 当CompletableFuture任务完成后同步使用任务执行线程来执行依赖任务结果的函数或者行为。
- 所有异步的方法在没有显示指定Executor参数的情形下执行都是复用ForkJoinPool.commonPool()线程池来执行;
- 所有的CompletionStage方法的实现都是相互独立的,以便一个方法的行为不会因为重载了其他方法而受影响。
一个CompletableFuture任务可能有一些依赖其计算结果的行为方法,这些行为方法被收集到一个无锁基于CAS操作来链接起来的链表组成的栈中;当CompletableFuture的计算任务完成后,会自动弹出栈中的行为方法并执行,需要注意的是由于是栈结构,行为注册的顺序与行为执行的顺序是相反的。
二、案例介绍
2.1 显示设置CompletableFuture结果
CompletableFuture是一种可以显示的设置结果的future, 下面我们通过一个例子来演示下:
如上代码(1)创建一个CompletableFuture对象,代码(2)开启了一个线程并启动。
- 代码(3)调用future的get()方法企图获取future的结果,如果future的结果没有被设置,则调用线程会被阻塞。
- 代码(2)创建的线程内,代码(2.1)休眠3s模拟异步任务的执行,代码(2.2)则当先休眠3s 后,调用future的complete方法,设置future的结果,设置结果后,所有由于调用future的get()方法而被阻塞的线程会被激活,并返回设置的结果。
如上,这里使用CompletableFuture实现了通知等待模型,主线程调用future的get()方法等待future返回结果,一开始由于future结果没有被设置,所以主线程被阻塞挂起,然后等子线程1休眠3s,然后调用future的complete方法模拟主线程等待的条件完成,这时候主线程就会从get()方法返回。
2.2 基于CompletableFuture实现异步计算与结果转换
- 基于runAsync系列方法实现无返回值的异步计算,当你想异步执行一个任务,并且不需要任务的执行结果时候可以使用该方法,比如异步打日志,异步做消息通知等等:
如上代码1.1创建了一个异步任务,并马上返回一个future对象,其创建了一个异步任务执行,任务内首先休眠了2秒,然后打印了一行日志。
代码1.2则调用返回的future的get()方法企图等待future任务执行完毕,由于runAsync方法不会有返回值,所以当任务执行完毕后,设置future的结果为null,所以代码1.2等任务执行完毕后返回null。
需要注意的是默认情况下runAsync(Runnable runnable)方法是使用整个JVM内唯一的ForkJoinPool.commonPool()线程池来执行异步的任务的,使用runAsync(Runnable runnable,Executor executor)方法允许我们使用自己制定的线程池来执行异步任务,如下代码:
- 基于supplyAsync系列方法实现有返回值的异步计算,当你想异步执行一个任务,并且需要任务的执行结果时候可以使用该方法,比如异步对原始数据进行加工,并需要获取到被加工后的结果等等:
如上代码2.1使用supplyAsync开启了一异步任务,执行后马上返回一个future对象;异步任务内线程休眠2s,然后返回了一个字符串结果,这个结果是会被设置到future内部;
代码2.2 则使用future的get()方法获取结果,一开始future结果并没有被设置,所以调用线程会被阻塞;等异步任务把结果设置到future后,调用线程就会从get()处返回异步任务执行的结果。
需要注意的是默认情况下supplyAsync(Supplier<U> supplier)方法是使用整个JVM内唯一的ForkJoinPool.commonPool()线程池来执行异步的任务的,使用supplyAsync(Supplier<U> supplier,Executor executor)方法允许我们使用自己制定的线程池来执行异步任务,如下代码:
- 基于thenRun实现异步任务A 执行完毕后,激活异步任务B执行,需要注意的是这种方式被激活的异步任务B是拿不到任务A的执行结果的:
如上代码1.创建异步任务,并返回oneFuture对象,代码2.在oneFuture上调用thenRun方法添加异步执行事件,当oneFuture计算完成后回调该事件,并返回新twoFuture,另外在twoFuture上调用get()方法也会返回null,因为回调事件是没有返回值的。
默认情况下oneFuture对应的异步任务和在oneFuture上添加的回调事件都是使用ForkJoinPool.commonPool()中的同一个线程来执行的,大家可以使用thenRunAsync(Runnable action,Executor executor) 来指定设置的回调事件使用自定义线程池线程来执行,也就是oneFuture对应的任务与在其上设置的回调的执行将会不在同一个线程中执行。
- 基于thenAccept实现异步任务A 执行完毕后,激活异步任务B执行,需要注意的是这种方式被激活的异步任务B是可以拿到任务A的执行结果的:
如上代码1创建异步任务,并返回oneFuture,代码2在oneFuture上调用了thenAccept添加了一个任务,这个任务会在oneFuture对应的任务执行完毕后被激活执行,需要注意的这里可以在回调的方法accept(String t)的参数t中来获取oneFuture对应的任务的结果,另外需要注意的是由于accept(String t)方法没有返回值,所以在twoFuture上调用get()方法最终也会返回null。
默认情况下oneFuture对应的异步任务和在oneFuture上添加的回调事件都是使用ForkJoinPool.commonPool()中的同一个线程来执行的,大家可以使用thenAcceptAsync(Consumer<? super T> action, Executor executor) 来指定设置的回调事件使用自定义线程池线程来执行,也就是oneFuture对应的任务与在其上设置的回调的执行将会不在同一个线程中执行。
- 基于thenApply实现异步任务A 执行完毕后,激活异步任务B执行,需要注意的是这种方式被激活的异步任务B是可以拿到任务A的执行结果的:
如上代码1创建异步任务,并返回oneFuture,代码2在oneFuture上调用了thenApply添加了一个任务,这个任务会在oneFuture对应的任务执行完毕后被激活执行,需要注意的这里可以在回调的方法apply(String t)的参数t中来获取oneFuture对应的任务的结果,另外需要注意的是由于apply(String t)方法是有返回值,所以在twoFuture上调用get()方法最终也会返回回调方法返回的值。
默认情况下oneFuture对应的异步任务和在oneFuture上添加的回调事件都是使用ForkJoinPool.commonPool()中的同一个线程来执行的,大家可以使用thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) 来指定设置的回调事件使用自定义线程池线程来执行,也就是oneFuture对应的任务与在其上设置的回调的执行将会不在同一个线程中执行。
2.3 多个CompletableFuture进行组合运算
CompletableFuture的功能强大之一是其可以让两个或者多个CompletableFuture进行运算来产生结果,下面我们来看其提供的几组函数:
- 基于thenCompose实现当一个CompletableFuture执行完毕后,执行另外一个CompletableFuture:
如上main函数中首先调用了方法doSomethingOne("123")开启了一个异步任务,并返回了对应的CompletableFuture对象,我们取名为future1,然后在future1的基础上调用了thenCompose方法,企图让future1执行完毕后,激活使用其结果作为
doSomethingTwo(String companyId)方法的参数创建的异步任务返回的CompletableFuture任务。
- 基于thenCombine实现当两个CompletableFuture任务都完成后,使用两者的结果作为参数在执行一个异步任务,实现这个只需要把上面例子中的:
修改为:
- 基于allOf等待多个CompletableFuture任务执行完毕:
如上代码1调用了四次doSomethingOne方法,分别返回一个CompletableFuture对象,然后收集这些CompletableFuture到futureList列表。
代码2调用allOf方法把多个CompletableFuture转换为了一个result,代码3在result上调用get()方法会阻塞调用线程,直到futureList列表中所有任务执行完毕才返回。
- 基于anyOf等多个CompletableFuture任务中有一个执行完毕:
如上代码1调用了四次doSomethingOne方法,分别返回一个CompletableFuture对象,然后收集这些CompletableFuture到futureList列表。
代码2调用anyOf方法把多个CompletableFuture转换为了一个result,代码3在result上调用get()方法会阻塞调用线程,直到futureList列表中有一个执行完毕才返回。
三、CompletableFuture类图结构介绍
image.png
如上类图CompletableFuture实现了CompletionStage接口:
1).一个CompletionStage是一个异步计算阶段,当另外一个CompletionStage计算完成后当前CompletionStage会执行或者计算一个值;一个阶段在计算终止时完成,但这可能反过来触发其他依赖的阶段开始计算;
2).一个阶段的计算执行可以被表述为一个函数、消费者、可执行的Runable(例如使用apply、accept、run方法),这取决于这个阶段是否需要参数或者产生结果。例如 stage.thenApply(x -> square(x)).thenAccept(x ->System.out.print(x)).thenRun(() -> System.out.println());
3).阶段的执行可以使用三种模式来执行:默认执行,默认异步执行(使用async后缀的方法),用户自定义的执行器执行(通过传递一个Executor方式);
4).一个阶段的执行可以通过一个或者两个阶段的执行完成来触发。一个阶段依赖的其他阶段通常使用then前缀的方法来进行组织;
类图中result字段用来存放任务执行的结果,如果不为null,标识任务已经执行完成,而计算任务本身也可能需要返回null值,所以使用AltResult来包装计算任务返回null的情况,并且里面存在了异常信息。
类图中asyncPool是用来执行异步任务的线程池,如果支持并发则默认为ForkJoinPool.commonPool(),否则是ThreadPerTaskExecutor。
四、原理剖析
4.1 CompletableFuture<Void> runAsync(Runnable runnable)方法
该方法返回一个新的CompletableFuture对象,其结果值会在给定的runnable行为在使用ForkJoinPool.commonPool()异步执行完毕后被设置为null,代码如下:
- 如上代码1判断行为是否为null,如果是则抛出异常
- 代码2创建一个CompletableFuture对象
- 代码3首先创建一个AsyncRun任务,里面保存了创建的future对象和要执行的行为,然后投递到ForkJoinPool.commonPool()线程池执行。
- 代码4直接返回创建的CompletableFuture对象。
可知runAsync会马上返回一个CompletableFuture对象,并且当前线程不会被阻塞;代码3投递AsyncRun任务到线程池后,线程池线程会执行其run方法,下面我们看AsyncRun中如何执行我们设置的行为,然后把结果设置到创建的future对象的:
如上代码5如果发现future的result不为null,则说明当前future还没被结束,则代码5.1执行我们传递的runnable方法,然后执行5.2设置future对象的结果为null,这时候其他调用future的get() 方法而被阻塞的线程就会从get()处返回null。
代码6当future任务结束后,看其stack栈里面是否有依赖其结果的行为,如果有则从栈中弹出来,并执行。
4.2 CompletableFuture<U> supplyAsync(Supplier<U> supplier)方法
该方法返回一个新的CompletableFuture对象,其结果值会在给定的runnable行为在使用ForkJoinPool.commonPool()异步执行完毕后被设置为入参supplier行为执行的结果,代码如下:
如上其代码与runAsync类似,不同在于其提交到线程池的是AsyncSupply类型的任务,我们看其代码:
如上代码与runAsync不同在于这里的行为方法是Supplier,其get()方法有返回值,其返回值会被设置到future中,然后调用future的get() 方法的线程就会获取到该值。