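  • Future, CompletableFuture, and ThreadPoolTaskExecutor: simple practice

    1. Future (introduced in JDK 5)

         Introduction: the Future interface is Java's implementation of the Future pattern for multithreading and is used for asynchronous computation.


       Simple test - topic: the Future pattern can be understood like this: I have a task and submit it to an executor, which hands back a Future; the task runs on another thread while my program is free to do other work, and I collect the result from the Future later.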


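    import java.math.BigDecimal;
    import java.util.concurrent.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    public class ThreadPoolTest {
        private static final Logger logger = LoggerFactory.getLogger(ThreadPoolTest.class);
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newSingleThreadExecutor(); // single worker thread
            Future<String> future = threadPool.submit(new Callable<String>() {
                public String call() throws Exception {
                    Thread.sleep(2000); // assumed delay, inferred from the log timestamps
                    logger.info("out2-----");
                    return "Hello world";
                }
            });
            try {
                BigDecimal bigDecimal = new BigDecimal(0);
                logger.info("out1:" + bigDecimal); // main keeps working while the task runs
                logger.info(future.get(3000, TimeUnit.MILLISECONDS)); // Hello world
            } catch (TimeoutException e) {
                logger.error("timeout-exception", e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            } catch (ExecutionException e) {
                logger.error("task failed", e);
            } finally {
                threadPool.shutdown(); // otherwise the worker thread keeps the JVM alive
            }
        }
    }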

    Output when the task finishes within the timeout:

    11:17:17.558 [main] INFO tk.mybatis.springboot.util.thread.ThreadPoolTest - out1:0
    11:17:19.556 [pool-1-thread-1] INFO tk.mybatis.springboot.util.thread.ThreadPoolTest - out2-----
    11:17:19.557 [main] INFO tk.mybatis.springboot.util.thread.ThreadPoolTest - Hello world

    Output when the timed get expires before the task finishes:

    10:44:07.413 [main] ERROR tk.mybatis.springboot.util.thread.ThreadPoolTest - timeout-exception
    java.util.concurrent.TimeoutException: null
        at java.util.concurrent.FutureTask.get(FutureTask.java:205)
        at tk.mybatis.springboot.util.thread.ThreadPoolTest.main(ThreadPoolTest.java:30)

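    Summary: 1  Prefer get(long timeout, TimeUnit unit) over future.get(): the untimed call blocks indefinitely if the task never finishes, whereas the timed call gives up with a TimeoutException.


             2  Although Future and its related methods make it possible to run tasks asynchronously, retrieving the result is inconvenient: the caller must either block or poll.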
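
    The first point can be sketched as follows. This is a minimal illustration, not the author's code: the class TimedGetDemo and the helper getOrFallback are hypothetical names introduced here. It waits for a bounded time and cancels the task if the wait expires.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGetDemo {

    // Hypothetical helper: waits at most timeoutMillis for the task, else returns the fallback.
    static String getOrFallback(Callable<String> task, long timeoutMillis, String fallback) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> future = pool.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker so it does not keep running
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return fallback;
        } catch (ExecutionException e) {
            return fallback; // the task itself threw an exception
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Fast task: the result arrives well within the timeout.
        System.out.println(getOrFallback(() -> "Hello world", 3000, "fallback"));
        // Slow task: the 200 ms wait expires first, so the fallback is returned.
        System.out.println(getOrFallback(() -> {
            Thread.sleep(2000);
            return "too late";
        }, 200, "fallback"));
    }
}
```

    Cancelling on timeout is a design choice of this sketch; whether an abandoned task should be interrupted depends on the application.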





    package java.util.concurrent;

    /**
     * A {@code Future} represents the result of an asynchronous
     * computation.  Methods are provided to check if the computation is
     * complete, to wait for its completion, and to retrieve the result of
     * the computation.  The result can only be retrieved using method
     * {@code get} when the computation has completed, blocking if
     * necessary until it is ready.  Cancellation is performed by the
     * {@code cancel} method.  Additional methods are provided to
     * determine if the task completed normally or was cancelled. Once a
     * computation has completed, the computation cannot be cancelled.
     * If you would like to use a {@code Future} for the sake
     * of cancellability but not provide a usable result, you can
     * declare types of the form {@code Future<?>} and
     * return {@code null} as a result of the underlying task.
     *
     * <p>
     * <b>Sample Usage</b> (Note that the following classes are all
     * made-up.)
     * <pre> {@code
     * interface ArchiveSearcher { String search(String target); }
     * class App {
     *   ExecutorService executor = ...
     *   ArchiveSearcher searcher = ...
     *   void showSearch(final String target)
     *       throws InterruptedException {
     *     Future<String> future
     *       = executor.submit(new Callable<String>() {
     *         public String call() {
     *             return searcher.search(target);
     *         }});
     *     displayOtherThings(); // do other things while searching
     *     try {
     *       displayText(future.get()); // use future
     *     } catch (ExecutionException ex) { cleanup(); return; }
     *   }
     * }}</pre>
     *
     * The {@link FutureTask} class is an implementation of {@code Future} that
     * implements {@code Runnable}, and so may be executed by an {@code Executor}.
     * For example, the above construction with {@code submit} could be replaced by:
     *  <pre> {@code
     * FutureTask<String> future =
     *   new FutureTask<String>(new Callable<String>() {
     *     public String call() {
     *       return searcher.search(target);
     *   }});
     * executor.execute(future);}</pre>
     *
     * <p>Memory consistency effects: Actions taken by the asynchronous computation
     * <a href="package-summary.html#MemoryVisibility"> <i>happen-before</i></a>
     * actions following the corresponding {@code Future.get()} in another thread.
     *
     * @see FutureTask
     * @see Executor
     * @since 1.5
     * @author Doug Lea
     * @param <V> The result type returned by this Future's {@code get} method
     */
    public interface Future<V> {
        /**
         * Attempts to cancel execution of this task.  This attempt will
         * fail if the task has already completed, has already been cancelled,
         * or could not be cancelled for some other reason. If successful,
         * and this task has not started when {@code cancel} is called,
         * this task should never run.  If the task has already started,
         * then the {@code mayInterruptIfRunning} parameter determines
         * whether the thread executing this task should be interrupted in
         * an attempt to stop the task.
         *
         * <p>After this method returns, subsequent calls to {@link #isDone} will
         * always return {@code true}.  Subsequent calls to {@link #isCancelled}
         * will always return {@code true} if this method returned {@code true}.
         *
         * @param mayInterruptIfRunning {@code true} if the thread executing this
         * task should be interrupted; otherwise, in-progress tasks are allowed
         * to complete
         * @return {@code false} if the task could not be cancelled,
         * typically because it has already completed normally;
         * {@code true} otherwise
         */
        boolean cancel(boolean mayInterruptIfRunning);

        /**
         * Returns {@code true} if this task was cancelled before it completed
         * normally.
         *
         * @return {@code true} if this task was cancelled before it completed
         */
        boolean isCancelled();

        /**
         * Returns {@code true} if this task completed.
         *
         * Completion may be due to normal termination, an exception, or
         * cancellation -- in all of these cases, this method will return
         * {@code true}.
         *
         * @return {@code true} if this task completed
         */
        boolean isDone();

        /**
         * Waits if necessary for the computation to complete, and then
         * retrieves its result.
         *
         * @return the computed result
         * @throws CancellationException if the computation was cancelled
         * @throws ExecutionException if the computation threw an
         * exception
         * @throws InterruptedException if the current thread was interrupted
         * while waiting
         */
        V get() throws InterruptedException, ExecutionException;

        /**
         * Waits if necessary for at most the given time for the computation
         * to complete, and then retrieves its result, if available.
         *
         * @param timeout the maximum time to wait
         * @param unit the time unit of the timeout argument
         * @return the computed result
         * @throws CancellationException if the computation was cancelled
         * @throws ExecutionException if the computation threw an
         * exception
         * @throws InterruptedException if the current thread was interrupted
         * while waiting
         * @throws TimeoutException if the wait timed out
         */
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
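
    The cancellation contract described in the Javadoc above can be observed with a small experiment. This is only a sketch: the class CancelDemo and its method are made-up names, and a latch is used to keep the single worker busy so that the second task is still queued when it is cancelled.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CancelDemo {

    // Cancels a task that has not started yet and reports what the Future says afterwards.
    static String cancelQueuedTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupies the only worker thread so the next task stays in the queue.
            pool.submit(() -> { release.await(); return null; });
            Future<String> queued = pool.submit(() -> "never runs");

            boolean cancelled = queued.cancel(false);
            String outcome;
            try {
                outcome = queued.get();
            } catch (CancellationException e) {
                outcome = "CancellationException"; // get() on a cancelled task throws this
            } catch (InterruptedException | ExecutionException e) {
                outcome = e.toString();
            }
            return cancelled + "," + queued.isCancelled() + "," + queued.isDone() + "," + outcome;
        } finally {
            release.countDown(); // let the blocking task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelQueuedTask()); // true,true,true,CancellationException
    }
}
```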
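
    2. Introduction to CompletableFuture

    2.1 Drawbacks of the Future pattern

    • Although Future lets us obtain the result of an asynchronous computation, it offers no notification mechanism, so we cannot be told when a Future completes.

    • We either block at future.get() until the result arrives, which turns the call back into a synchronous operation, or poll isDone() in a loop to check whether the Future has finished, which wastes CPU.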
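
    The difference between polling and being notified can be sketched as follows. The class and method names (WaitingStylesDemo, pollUntilDone, notifyWhenDone) are illustrative assumptions and do not appear in the original article.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

public class WaitingStylesDemo {

    static String slowTask() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "done";
    }

    // Polling: the caller spins on isDone() and burns CPU until the task finishes.
    static long pollUntilDone(Future<String> future) {
        long polls = 0;
        while (!future.isDone()) {
            polls++;
        }
        return polls;
    }

    // Callback: CompletableFuture pushes the result into the sink; nobody blocks or polls.
    static CompletableFuture<Void> notifyWhenDone(AtomicReference<String> sink) {
        return CompletableFuture.supplyAsync(WaitingStylesDemo::slowTask)
                .thenAccept(sink::set);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<String> task = WaitingStylesDemo::slowTask;
        Future<String> future = pool.submit(task);
        System.out.println("isDone() checks before completion: " + pollUntilDone(future));
        pool.shutdown();

        AtomicReference<String> sink = new AtomicReference<>();
        notifyWhenDone(sink).join(); // join only so the demo does not exit before the callback runs
        System.out.println("callback received: " + sink.get());
    }
}
```

    The number of isDone() checks varies from run to run; it is printed only to show how much work the polling loop does while waiting.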

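    2.2 CompletableFuture overview

     public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

         Netty and Guava each extend Java's Future interface to make asynchronous programming easier.

        Java 8 added CompletableFuture, a class with roughly 50 methods that greatly extends what Future can do, including callbacks that run on completion and methods for transforming and combining futures.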



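    1) Completing the computation explicitly
    public T     get()
    public T     get(long timeout, TimeUnit unit)
    public T     getNow(T valueIfAbsent) // if the computation has already finished, returns the result or throws its exception; otherwise returns the given valueIfAbsent
    public T     join()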
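
    Of the four methods above, getNow is the only one that never waits. A minimal sketch, assuming -1 as an arbitrary placeholder for "not ready yet" (the class GetNowDemo and the helper peek are hypothetical):

```java
import java.util.concurrent.CompletableFuture;

public class GetNowDemo {

    // Returns the current value without blocking; -1 stands in for "not completed yet".
    static int peek(CompletableFuture<Integer> future) {
        return future.getNow(-1);
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        System.out.println(peek(future)); // -1, nothing has completed the future yet
        future.complete(100);
        System.out.println(peek(future)); // 100
        System.out.println(future.join()); // 100, returns immediately because the value is present
    }
}
```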


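        public void test() {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                int i = 1 / 0; // fails with ArithmeticException
                return 100;
            });
            try {
                logger.info(String.format("join=%d", future.join()));
            } catch (CompletionException e) { // join() wraps the failure in an unchecked exception
                logger.error("join failed", e);
            }
            try {
                logger.info(String.format("get=%s", future.get()));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) { // get() wraps it in a checked exception
                logger.error("get failed", e);
            }
        }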


    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class BasicMain {
        private static Logger logger = LoggerFactory.getLogger(BasicMain.class);

        public static CompletableFuture<Integer> compute() {
            final CompletableFuture<Integer> future = new CompletableFuture<>();
            return future;
        }

        public static void main(String[] args) throws Exception {
            final CompletableFuture<Integer> f = compute();

            class Client extends Thread {
                CompletableFuture<Integer> f;

                Client(String threadName, CompletableFuture<Integer> f) {
                    super(threadName);
                    this.f = f;
                }

                @Override
                public void run() {
                    try {
                        logger.info(this.getName() + ": " + f.get()); // blocks until f is completed
                    } catch (InterruptedException e) {
                        logger.error(e.getMessage());
                    } catch (ExecutionException e) {
                        logger.error(e.getMessage());
                    }
                }
            }

            new Client("Client1", f).start();
            new Client("Client2", f).start();
            logger.info("waiting....");
            f.complete(100);
            f.obtrudeValue(200); // forcibly overwrites the value, racing with the waiting clients
            // Possible log output: "Client1: 200, Client2: 200", or "Client1: 100, Client2: 100",
            // or "Client1: 200, Client2: 100"; each client may see either value.

            // f.completeExceptionally(new Exception());
        }
    }


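             Note that f.complete(100) is not run on a separate thread here, although in most cases we would use a thread pool to run such asynchronous tasks. Only the first call to CompletableFuture.complete() or CompletableFuture.completeExceptionally() takes effect; later calls return false and leave the result unchanged. obtrudeValue() is the exception: it forcibly overwrites the stored value.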
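
    The "first call wins" rule can be checked directly. The following is a small sketch with a made-up class name, CompleteOnceDemo; it uses only the return values of complete and completeExceptionally.

```java
import java.util.concurrent.CompletableFuture;

public class CompleteOnceDemo {

    static String describe() {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        boolean first = f.complete(100);  // true: this call completes the future
        boolean second = f.complete(200); // false: already completed, value stays 100
        boolean third = f.completeExceptionally(new IllegalStateException("ignored")); // false
        int afterComplete = f.join();
        f.obtrudeValue(300);              // forcibly replaces the stored value
        return first + "," + second + "," + third + "," + afterComplete + "," + f.join();
    }

    public static void main(String[] args) {
        System.out.println(describe()); // true,false,false,100,300
    }
}
```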





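    runAsync(Runnable runnable): runs the code asynchronously on ForkJoinPool.commonPool().
    runAsync(Runnable runnable, Executor executor): runs the code asynchronously on the given thread pool.
    supplyAsync(Supplier<U> supplier): runs the code asynchronously on ForkJoinPool.commonPool(); the asynchronous operation returns a value.
    supplyAsync(Supplier<U> supplier, Executor executor): runs the code asynchronously on the given thread pool; the asynchronous operation returns a value.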
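
    The effect of the Executor overloads can be seen by reporting which thread ran the task. This is a sketch only; the class AsyncExecutorDemo and the thread name "demo-pool-1" are assumptions chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncExecutorDemo {

    // Returns the name of the thread that ran the supplier on a caller-supplied pool.
    static String threadNameOnCustomPool() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-pool-1"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Default overload: typically a worker of ForkJoinPool.commonPool().
        String defaultThread = CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
        System.out.println("default: " + defaultThread);

        // Executor overload: the task runs on the pool passed in.
        System.out.println("custom:  " + threadNameOnCustomPool());

        // runAsync produces no value, so join() yields null.
        System.out.println(CompletableFuture.runAsync(() -> { }).join());
    }
}
```

    Passing an explicit executor keeps blocking or long-running tasks off the shared common pool, which is one common reason to prefer the two-argument overloads.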

        e.g.: comparing runAsync and supplyAsync







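        // supplyAsync: the task returns a value
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        });
        try {
            logger.info("out = " + future.get()); // out = Hello
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            logger.error("supplyAsync task failed", e);
        }

        // runAsync: the task returns nothing, so get() yields null
        CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> {
            // task body is missing from the source; any Runnable code goes here
        });
        try {
            logger.info("out = " + voidFuture.get()); // out = null
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            logger.error("runAsync task failed", e);
        }
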
    Simple experiment - topic: run task A and task B in parallel on two threads; as soon as either one completes, run task C.

    package tk.mybatis.springboot.util.thread;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.time.LocalTime;
    import java.util.Random;
    import java.util.concurrent.*;

    /**
     * Run task A and task B in parallel on two threads; as soon as either one completes, run task C.
     * Comparing the two approaches, useFuture and useCompletableFuture:
     * First, useCompletableFuture is simpler than useFuture.
     *  useFuture has to create and shut down the thread pool itself and also monitor tasks A and B.
     *  useCompletableFuture only needs CompletableFuture's runAfterEither to define the dependency between tasks A, B, and C.
     */
    public class CompletableFutureTest {
        private static Logger logger = LoggerFactory.getLogger(CompletableFutureTest.class);
        private static Random random = new Random();

        /**
         * useFuture test
         * @throws InterruptedException
         * @throws ExecutionException
         */
        private static void useFuture() throws InterruptedException, ExecutionException {
            ExecutorService executor = Executors.newFixedThreadPool(3);
            Future<Void> futureA = executor.submit(() -> work("A1"));
            Future<Void> futureB = executor.submit(() -> work("B1"));
            // Alternate 1-second timed waits on A and B until one of them finishes
            while (true) {
                try {
                    futureA.get(1, TimeUnit.SECONDS);
                    break;
                } catch (TimeoutException e) {
                    // A has not finished yet; check B
                }
                try {
                    futureB.get(1, TimeUnit.SECONDS);
                    break;
                } catch (TimeoutException e) {
                    // B has not finished yet; check A again
                }
            }
            executor.submit(() -> work("C1")).get();
            executor.shutdown();
        }

        private static void useCompletableFuture() throws InterruptedException, ExecutionException {
            CompletableFuture<Void> futureA = CompletableFuture.runAsync(() -> work("A2"));
            CompletableFuture<Void> futureB = CompletableFuture.runAsync(() -> work("B2"));
            logger.info("get=" + futureA.runAfterEither(futureB, () -> work("C2")).get());
    //     Alternatively:
    //        CompletableFuture.runAsync(() -> work("A2"))
    //                .runAfterEither(CompletableFuture.runAsync(() -> work("B2"))
    //                        , () -> work("C2"))
    //                .get();
        }

        /**
         * Logs the start and end of a task.
         * @param name task name
         * @return always null
         */
        public static Void work(String name) {
            logger.info(name + " starts at " + LocalTime.now());
            try {
                // Assumed pause of 0-9 s: the original sleep call is missing from the source
                TimeUnit.SECONDS.sleep(random.nextInt(10));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            logger.info(name + " ends at " + LocalTime.now());
            return null;
        }

        /**
         * The useFuture output shows that
         *  task C1 does not start immediately after task B1 completes; it starts 0.001 s later.
         *  The reason is that useFuture checks A1 and B1 in turn with get(1, TimeUnit.SECONDS),
         *  so when one task finishes first, the main thread may still be blocked in get on the other, unfinished task.
         * useCompletableFuture has no such problem:
         *      the timestamps show no gap between the end of task A2 and the start of task C2.
         * @param args
         * @throws InterruptedException
         * @throws ExecutionException
         */
        public static void main(String[] args) throws InterruptedException, ExecutionException {
    //        useFuture();
            // log output
            // B1 starts at 10:28:09.562
    //         A1 starts at 10:28:09.562
    //         B1 ends at 10:28:17.566
    //         C1 starts at 10:28:17.567
    //         A1 ends at 10:28:18.563
    //         C1 ends at 10:28:20.570
    //        TimeUnit.SECONDS.sleep(10);
            useCompletableFuture();
            // log output
    //        A2 starts at 10:43:46.867
    //        B2 starts at 10:43:46.867
    //        B2 ends at 10:43:51.871
    //        A2 ends at 10:43:51.871
    //        C2 starts at 10:43:51.871
    //        C2 ends at 10:44:00.874
            TimeUnit.SECONDS.sleep(10); // keep the JVM alive so every start/end line is printed
        }
    }

    CompletableFuture declares the following completion callbacks:

        public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
            return uniWhenCompleteStage(null, action);
        }
        public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
            return uniWhenCompleteStage(asyncPool, action);
        }
        public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {
            return uniWhenCompleteStage(screenExecutor(executor), action);
        }
        public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
            return uniExceptionallyStage(fn);
        }

           As shown, the action has type BiConsumer<? super T, ? super Throwable>, so it can handle either a normal result or an exception.
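
    How the two arguments of the action are populated, and how exceptionally supplies a replacement value, can be sketched as below. The class WhenCompleteDemo, the fallback value -1, and the event strings are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class WhenCompleteDemo {

    // Completes a future normally or exceptionally and records what each callback observed.
    static List<String> run(boolean fail) {
        List<String> events = new ArrayList<>();
        CompletableFuture<Integer> source = new CompletableFuture<>();

        CompletableFuture<Integer> recovered = source
                // Exactly one of value/error is meaningful: error is null on success.
                .whenComplete((value, error) ->
                        events.add(error == null ? "value=" + value : "error=" + error.getMessage()))
                // Runs only on failure and substitutes a fallback result.
                .exceptionally(error -> -1);

        if (fail) {
            source.completeExceptionally(new IllegalStateException("boom"));
        } else {
            source.complete(42);
        }
        events.add("final=" + recovered.join());
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [value=42, final=42]
        System.out.println(run(true));  // [error=boom, final=-1]
    }
}
```

    whenComplete only observes the outcome and passes it on unchanged, which is why the failure still reaches exceptionally in the second run.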



    package tk.mybatis.springboot.util.thread;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;
    import java.util.function.Function;

    public class MainTest {
        private static Logger logger = LoggerFactory.getLogger(MainTest.class);
        private static Random random = new Random();
        private static long t = System.currentTimeMillis();

        static Map<String, String> getMoreData() {
            try {
                Thread.sleep(1000); // assumed pause: the original sleep call is missing from the source
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            logger.info("end ");
            Map<String, String> map = new HashMap<>();
            return map;
        }

        public static void main(String[] args) throws Exception {
            CompletableFuture<Map<String, String>> future = CompletableFuture.supplyAsync(MainTest::getMoreData);
            Future<Map<String, String>> f = future.whenComplete((v, e) -> {
                // Callback body is missing from the source; logging both arguments is an assumption
                logger.info("result=" + v + ", exception=" + e);
            });
            f.get(); // block until the callback has run

            CompletableFuture<Map<String, String>> futureException = future.exceptionally(new Function<Throwable, Map<String, String>>() {
                public Map<String, String> apply(Throwable throwable) {
                    // Fallback used only if the computation failed: an empty map
                    Map<String, String> map = new HashMap<>();
                    return map;
                }
            });
        }
    }




    public <U> CompletableFuture<U>     handle(BiFunction<? super T,Throwable,? extends U> fn)
    public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
    public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
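
    Unlike whenComplete, the handle variants above receive both the result and the exception and return a new value, so they can turn a failure into a normal result of a different type. A minimal sketch, assuming a hypothetical HandleDemo class that parses an integer asynchronously:

```java
import java.util.concurrent.CompletableFuture;

public class HandleDemo {

    // Parses the input asynchronously and maps success or failure to a message.
    static String parse(String input) {
        return CompletableFuture
                .supplyAsync(() -> Integer.parseInt(input))
                // error is null on success; value is null on failure
                .handle((value, error) -> error == null ? "parsed " + value : "invalid input")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(parse("42"));  // parsed 42
        System.out.println(parse("abc")); // invalid input
    }
}
```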





    3. ThreadPoolTaskExecutor


