Future
Java 1.5开始,提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。要使用Future,通只需要将耗时操作封装在一个Callable对象中,再将它提交给ExecutorService。
ExecutorService(线程池)体系结构:
一、线程池: 提供一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁的额外开销,提高了响应的速度。 二、线程池的体系结构: java.util.concurrent.Executor 负责线程的使用和调度的根接口 |--ExecutorService 子接口: 线程池的主要接口 |--ThreadPoolExecutor 线程池的实现类 |--ScheduledExceutorService 子接口: 负责线程的调度 |--ScheduledThreadPoolExecutor : 继承ThreadPoolExecutor,实现了ScheduledExecutorService 三、工具类 : Executors ExecutorService newFixedThreadPool() : 创建固定大小的线程池 ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。 ExecutorService newSingleThreadExecutor() : 创建单个线程池。 线程池中只有一个线程 ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务
示例代码:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package completableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class FutureTest { public static void main(String[] args) { ExecutorService es = Executors.newFixedThreadPool(1); Future<Integer> f = es.submit(() -> { Thread.sleep(10000); // 结果 return 100; }); // do something Integer result = 0; try { result = f.get(1l, TimeUnit.SECONDS); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (TimeoutException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(result); } }
Future接口的局限性:Future很难直接表述多个Future 结果之间的依赖性
CompletableFuture
创建CompletableFuture
a.实例化生成 CompletableFuture
此时这个future和Callable没有任何联系,没有线程池也不是异步工作。如果现在客户端代码调用ask().get()它将永远阻塞。通过complete方法触发完成
示例代码如下:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package completableFuture; import java.util.concurrent.CompletableFuture; public class CompletableFutureTest { public static void main(String[] args) { Long start = System.currentTimeMillis(); CompletableFuture<Double> future = new CompletableFuture<Double>(); new Thread(() -> { try { Thread.sleep(1000l); double price = 10d; future.complete(price); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }).start(); Double result = future.join(); Long end = System.currentTimeMillis(); System.out.println((end - start) + " ms:" + result); } }
b.使用工厂方法创建 CompletableFuture
public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
supplyAsync
与runAsync
不同在与前者异步返回一个结果,后者是void. Executor表示是用自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()
作为它的线程池.线程数是Runtime.getRuntime().availableProcessors(). 其中Supplier
是一个函数式接口(函数描述符为 () -> T). 结果执行完成时的处理
public CompletableFuture<T> whenComplete( <? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
BiConsumer的函数描述符为(T,U) -> void ,无返回值.方法以Async结尾,意味着Action使用相同的线程执行(如果使用相同的线程池,也可能会被同一个线程选中执行).
a.结果转换(thenApply):输入是上一个阶段计算后的结果,返回值是经过转化后结果 .Function函数描述符: T -> R
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
b.消费结果(thenAccept):只是针对结果进行消费,入参是Consumer,没有返回值. Consumer函数描述符: T -> void
public CompletionStage<Void> thenAccept(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
c.结果合并(thenCombine):需要上一阶段的返回值,并且other代表的CompletionStage也要返回值之后,把这两个返回值,进行转换后返回指定类型的值
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
代码示例如下:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package completableFuture; import java.util.concurrent.CompletableFuture; public class Test { public static void main(String[] args) { testThenApply(); testThenCompose(); testThenCombine(); testThenAcceptBoth(); } private static void testThenApply() { // 任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值 CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> { System.out.println(resultA + " resultB"); return resultA + " resultB"; }); } private static void testThenCompose() { // CompletableFuture.supplyAsync(() -> "resultA").thenCompose(resultA -> { return CompletableFuture.supplyAsync(() -> resultA + "resultB"); }); } private static void testThenCombine() { // thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "A"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "B"); CompletableFuture<String> result = future1.thenCombine(future2, (f1, f2) -> f1 + f2); System.out.println(result.join()); } private static void testThenAcceptBoth() { // 接受任务的处理结果,并消费处理,有返回结果 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "A"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "B"); future1.thenAcceptBoth(future2, (f1, f2) -> { System.out.println("accept:" + f1 + f2); }); } }