0.概述
服务端编程的一个经典场景是在接收和处理客户端请求时,为了避免对每一个请求都分配线程而带来的资源开销,服务一般会预先分配一个固定大小的线程池(比如Tomcat connector maxThreads),当客户端请求到来时,从线程池里寻找空闲状态的线程来处理请求,请求处理完毕后会回到线程池,继续服务下一个请求。当线程池内的线程都处于繁忙状态时,新来的请求需要排队直到线程池内有可用的线程,或者当超出队列容量后(Tomcat connector acceptCount属性)请求被拒绝(connection refused error)。
为了提高服务的吞吐量,我们应当确保主线程尽快处理尽快返回,尽量使服务端的任务处理线程池始终有可分配的线程来处理新的客户端请求。
当主线程执行一个任务时,如果该任务较耗时, 通常的做法是利用Future/Promise来异步化处理任务。从JDK1.5开始,J.U.C中提供了Future来代表一个异步操作。JDK1.8中则新增了lambda表达式和CompletableFuture, 可以帮助我们更好的用函数式编程风格来实现任务的异步处理。
1. Future
代码例子:
ExecutorService executor = Executors.newFixedThreadPool(1); Future<String> future = executor.submit(() -> { // long running task return "task finish."; });
Future实在是太鸡肋了,仅暴露了get/cancel/isDone/isCancelled方法。我们无法通过Future去手动结束一个任务,也无法非阻塞的去获取Future的任务结果,因为future.get()方法是阻塞的。假设有下面这个场景,当前有两个任务,后一个任务依赖了前一个任务的处理结果,这种场景也无法通过Future来实现异步流程任务处理。
2. CompletableFuture
CompletableFuture实现了Future和CompletionStage两个接口,CompletionStage可以看做是一个异步任务执行过程的抽象。我们可以基于CompletableFuture方便的创建任务和链式处理多个任务。下面我们通过实例来介绍它的用法。
2.1 创建任务
可以使用runAsync方法新建一个线程来运行Runnable对象(无返回值)
CompletableFuture<Void> futureAsync = CompletableFuture.runAsync(() -> { // long running task without return value System.out.println("task finish."); });
也可以使用supplyAysnc方法新建线程来运行Supplier<T>对象(有返回值)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // long running task return "task result"; });
这里执行任务的线程来自于ForkJoinPool.commonPool() , 也可以自定义线程池
ExecutorService exector = Executors.newFixedThreadPool(5); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // long running task return "task result"; }, executor);
2.2 任务的异步处理
不论Future.get()方法还是CompletableFuture.get()方法都是阻塞的,为了获取任务的结果同时不阻塞当前线程的执行,我们可以使用CompletionStage提供的方法结合callback来实现任务的异步处理。
2.2.1 使用callback基于特定任务的结果进行异步处理
程序中经常需要主线程创建新的线程来处理某一任务,然后基于任务的完成结果(返回值或者exception)来执行特定逻辑, 对于这种场景我们可以很方面的使用whenComplete或者whenCompleteAsync来注册callback方法
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // long running task return "task result"; }); future.whenComplete((result, exception) -> { if (null == exception) { System.out.println("result from previous task: " + result); } });
对于任务执行中抛错的情况:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // long running task throw new RuntimeException("error!"); }); future.whenComplete((result, exception) -> { if (null == exception) { System.out.println("result from previous task: " + result); } else { System.err.println("Exception thrown from previous task: " + exception.getMessage()); } });
也可以用exceptionally来显示的处理错误:
CompletableFuture.supplyAsync(() -> { throw new IllegalArgumentException("error"); }).exceptionally(ex -> { System.out.println("Exception caught: " + ex.getMessage()); return ex.getMessage(); }).thenAccept(result -> { System.out.println("result: " + result); });
如果不需关心任务执行中是否有exception,则可以使用thenAccept方法, 需要注意的是如果执行中抛了exception, 则thenAccept里面的回调方法不会被执行
CompletableFuture.supplyAsync(() -> { // long running task return "task result"; }).thenAccept((result) -> { System.out.println("result from previous task: " + result); });
2.2.2 任务的链式处理
在应用中经常会遇到任务的pipeline处理,任务A执行完后触发任务B,任务B执行完后触发任务C,上一个任务的结果是下一个任务的输入,对于这种场景,我们可以使用thenApply方法。
CompletableFuture.supplyAsync(() -> { // long running task return "task1"; }).thenApply(previousResult -> { return previousResult + " task2"; }).thenApply(previousResult -> { return previousResult + " task3"; }).thenAccept(previousResult -> { System.out.println(previousResult); }); output: task1 task2 task3
让我们再看下面这个例子,某一应用需要先根据accountId从数据库找到对应的账号信息,然后对该账号执行特定的处理逻辑:
CompletableFuture<Account> getAccount(String accountId) { return CompletableFuture.supplyAsync(() -> { return accountService.findAccount(accountId); }); } CompletableFuture<String> processAccount(Account account) { return CompletableFuture.supplyAsync(() -> { return accountService.updateAccount(account); }); }
如果使用thenApply方法,其返回的结果是一个嵌套的CompletableFuture对象:
CompletableFuture<CompletableFuture<String>> res = getAccount("123").thenApply(account -> { return processAccount(account); });
如果不希望结果是嵌套的CompletableFuture,我们可以使用thenCompose方法来替代thenApply
CompletableFuture<String> res = getAccount("123").thenCompose(account -> { return processAccount(account); });
2.2.3 多任务的并行处理
另一种常见的场景是将一个大的任务切分为数个子任务,并行处理所有子任务,当所有子任务都成功结束时再继续处理后面的逻辑。以前的做法是利用CountDownLatch, 主线程构造countDownLatch对象,latch的大小为子任务的总数,每一个任务持有countDownLatch的引用,任务完成时对latch减1,主线程阻塞在countDownLatch.await方法上,当所有子任务都成功执行完后,latch=0, 主线程继续执行。
int size = 5; CountDownLatch latch = new CountDownLatch(size); for (int i = 0; i < size; i++) { Executors.newFixedThreadPool(size).submit(() -> { try { // long running task System.out.println(Thread.currentThread().getName() + " " + latch.getCount()); } finally { latch.countDown(); } }); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } // continue... System.out.println(Thread.currentThread().getName());
这样的代码繁琐且很容易出错,我们可以用CompletableFuture.allOf来方便的处理上述场景。直接贴例子, 根据一组账户ID并行查找对应账户:
CompletableFuture<String> findAccount(String accountId) { return CompletableFuture.supplyAsync(() -> { // mock finding account from database return "account" + accountId; }); } public void batchProcess(List<String> accountIdList) { // 并行根据accountId查找对应account List<CompletableFuture<String>> accountFindingFutureList = accountIdList.stream().map(accountId -> findAccount(accountId)).collect(Collectors.toList()); // 使用allOf方法来表示所有的并行任务 CompletableFuture<Void> allFutures = CompletableFuture .allOf(accountFindingFutureList.toArray(new CompletableFuture[accountFindingFutureList.size()])); // 下面的方法可以帮助我们获得所有子任务的处理结果 CompletableFuture<List<String>> finalResults = allFutures.thenApply(v -> { return accountFindingFutureList.stream().map(accountFindingFuture -> accountFindingFuture.join()) .collect(Collectors.toList()); }); }
如果后续逻辑没有必要等待所有子任务全部结束,而是只要任一一个任务成功结束就可以继续执行,我们可以使用CompletableFuture.anyOf方法:
CompletableFuture<Object> anyOfFutures = CompletableFuture.anyOf(taskFutureA, taskFutureB, taskFutureC);
假设三个任务中taskFutureA最先执行完毕并成功返回,则anyOfFutures里得到的是taskFutureA的执行结果.
3.展望
基于JDK1.8的lambda表达式和CompletableFuture, 我们可以写出更具有函数式编程风格的代码,可以更方便的实现任务的异步处理,只用很少的代码便可以实现任务的异步pipeline和并行调用。在异步开发模型(nodeJs/Vert.x)越来越火的今天,我们就从今天开始使用lambda+CompletableFuture来改造我们的Java应用吧。