zoukankan      html  css  js  c++  java
  • Future、 CompletableFuture、ThreadPoolTaskExecutor简单实践

    一 Future(jdk5引入)

         简介: Future接口是Java多线程Future模式的实现,可以来进行异步计算。

                  可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,也可以使用cancel方法停止任务的执行。

       简单测试 -  主题 : Future模式可以理解成:我有一个任务,提交给了Future,Future替我完成这个任务,期间程序可以处理其他任务。

                                  一段时间之后,主线程可以从Future那儿取出结果

    public class ThreadPoolTest {
    
        private static  Logger logger= LoggerFactory.getLogger(ThreadPoolTest.class);
    
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newSingleThreadExecutor();//创建单一线程
            Future<String> future = threadPool.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    Thread.sleep(2000);
                    logger.info("out2-----");
                    return "Hello world";
                }
            });
    
            try {
                try {
                    BigDecimal bigDecimal=new BigDecimal(0);
                    logger.info("out1:"+bigDecimal.toString());
                    logger.info(future.get(3000,TimeUnit.MILLISECONDS));//Hello world
                } catch (TimeoutException e) {
                    logger.error("timeout-exception",e);
                }
            } catch (InterruptedException e) {
                logger.error("interrupted-exception",e);
            } catch (ExecutionException e) {
                logger.error("execution-exception",e);
            }finally {
                threadPool.shutdown();
            }
    }
    View Code

      console输出:

    11:17:17.558 [main] INFO tk.mybatis.springboot.util.thread.ThreadPoolTest - out1:0
    Disconnected from the target VM, address: '127.0.0.1:54966', transport: 'socket'
    11:17:19.556 [pool-1-thread-1] INFO tk.mybatis.springboot.util.thread.ThreadPoolTest - out2-----
    11:17:19.557 [main] INFO tk.mybatis.springboot.util.thread.ThreadPoolTest - Hello world
    View Code

    超时会报错:

    SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
    10:44:07.413 [main] ERROR tk.mybatis.springboot.util.thread.ThreadPoolTest - timeout-exception
    java.util.concurrent.TimeoutException: null
        at java.util.concurrent.FutureTask.get(FutureTask.java:205)
        at tk.mybatis.springboot.util.thread.ThreadPoolTest.main(ThreadPoolTest.java:30)
    
    Process finished with exit code 0

    总结:1  比起future.get(),推荐使用get (long timeout, TimeUnit unit) 方法,

              设置了超时时间可以防止程序无限制的等待future的结果,可以进行异常处理逻辑。

             2  虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,

                只能通过阻塞或者轮询的方式得到任务的结果

                阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,

                为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

     Future的接口很简单,只有五个方法。jdk8中源码如下:

    package java.util.concurrent;
    
    /**
     * A {@code Future} represents the result of an asynchronous
     * computation.  Methods are provided to check if the computation is
     * complete, to wait for its completion, and to retrieve the result of
     * the computation.  The result can only be retrieved using method
     * {@code get} when the computation has completed, blocking if
     * necessary until it is ready.  Cancellation is performed by the
     * {@code cancel} method.  Additional methods are provided to
     * determine if the task completed normally or was cancelled. Once a
     * computation has completed, the computation cannot be cancelled.
     * If you would like to use a {@code Future} for the sake
     * of cancellability but not provide a usable result, you can
     * declare types of the form {@code Future<?>} and
     * return {@code null} as a result of the underlying task.
     *
     * <p>
     * <b>Sample Usage</b> (Note that the following classes are all
     * made-up.)
     * <pre> {@code
     * interface ArchiveSearcher { String search(String target); }
     * class App {
     *   ExecutorService executor = ...
     *   ArchiveSearcher searcher = ...
     *   void showSearch(final String target)
     *       throws InterruptedException {
     *     Future<String> future
     *       = executor.submit(new Callable<String>() {
     *         public String call() {
     *             return searcher.search(target);
     *         }});
     *     displayOtherThings(); // do other things while searching
     *     try {
     *       displayText(future.get()); // use future
     *     } catch (ExecutionException ex) { cleanup(); return; }
     *   }
     * }}</pre>
     *
     * The {@link FutureTask} class is an implementation of {@code Future} that
     * implements {@code Runnable}, and so may be executed by an {@code Executor}.
     * For example, the above construction with {@code submit} could be replaced by:
     *  <pre> {@code
     * FutureTask<String> future =
     *   new FutureTask<String>(new Callable<String>() {
     *     public String call() {
     *       return searcher.search(target);
     *   }});
     * executor.execute(future);}</pre>
     *
     * <p>Memory consistency effects: Actions taken by the asynchronous computation
     * <a href="package-summary.html#MemoryVisibility"> <i>happen-before</i></a>
     * actions following the corresponding {@code Future.get()} in another thread.
     *
     * @see FutureTask
     * @see Executor
     * @since 1.5
     * @author Doug Lea
     * @param <V> The result type returned by this Future's {@code get} method
     */
    public interface Future<V> {
    
        /**
         * Attempts to cancel execution of this task.  This attempt will
         * fail if the task has already completed, has already been cancelled,
         * or could not be cancelled for some other reason. If successful,
         * and this task has not started when {@code cancel} is called,
         * this task should never run.  If the task has already started,
         * then the {@code mayInterruptIfRunning} parameter determines
         * whether the thread executing this task should be interrupted in
         * an attempt to stop the task.
         *
         * <p>After this method returns, subsequent calls to {@link #isDone} will
         * always return {@code true}.  Subsequent calls to {@link #isCancelled}
         * will always return {@code true} if this method returned {@code true}.
         *
         * @param mayInterruptIfRunning {@code true} if the thread executing this
         * task should be interrupted; otherwise, in-progress tasks are allowed
         * to complete
         * @return {@code false} if the task could not be cancelled,
         * typically because it has already completed normally;
         * {@code true} otherwise
         */
        boolean cancel(boolean mayInterruptIfRunning);
    
        /**
         * Returns {@code true} if this task was cancelled before it completed
         * normally.
         *
         * @return {@code true} if this task was cancelled before it completed
         */
        boolean isCancelled();
    
        /**
         * Returns {@code true} if this task completed.
         *
         * Completion may be due to normal termination, an exception, or
         * cancellation -- in all of these cases, this method will return
         * {@code true}.
         *
         * @return {@code true} if this task completed
         */
        boolean isDone();
    
        /**
         * Waits if necessary for the computation to complete, and then
         * retrieves its result.
         *
         * @return the computed result
         * @throws CancellationException if the computation was cancelled
         * @throws ExecutionException if the computation threw an
         * exception
         * @throws InterruptedException if the current thread was interrupted
         * while waiting
         */
        V get() throws InterruptedException, ExecutionException;
    
        /**
         * Waits if necessary for at most the given time for the computation
         * to complete, and then retrieves its result, if available.
         *
         * @param timeout the maximum time to wait
         * @param unit the time unit of the timeout argument
         * @return the computed result
         * @throws CancellationException if the computation was cancelled
         * @throws ExecutionException if the computation threw an
         * exception
         * @throws InterruptedException if the current thread was interrupted
         * while waiting
         * @throws TimeoutException if the wait timed out
         */
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    View Code

    看懂英文还是最直接的。

    二. CompletableFuture介绍

    2.1 Future模式的缺点

    • Future虽然可以实现获取异步执行结果的需求,但是它没有提供通知的机制,我们无法得知Future什么时候完成。

    • 要么使用阻塞,在future.get()的地方等待future返回的结果,这时又变成同步操作。要么使用isDone()轮询地判断Future是否完成,这样会耗费CPU的资源。

    2.2 CompletableFuture介绍

     public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
     }

         Netty、Guava分别扩展了Java 的 Future 接口,方便异步编程。

        在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,

        可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,

        并且提供了转换和组合CompletableFuture的方法


    1 )主动完成计算
    CompletableFuture类实现了CompletionStageFuture接口,所以你还是可以像以前一样通过阻塞或者轮询的方式获得结果,尽管这种方式不推荐使用。
    public T     get()
    public T     get(long timeout, TimeUnit unit)
    public T     getNow(T valueIfAbsent) //如果结果已经计算完则返回结果或者抛出异常,否则返回给定的valueIfAbsent
    public T join()

     join返回计算的结果或者抛出一个unchecked异常(CompletionException),它和get对抛出的异常的处理有些细微的区别

    @Test
        public void test(){
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
               int i=1/0;
               return 100;
            });
            logger.info(String.format("join=%d", future.join()));
            try {
                logger.info(String.format("get=%s", future.get()));
            } catch (InterruptedException e) {
               logger.error("error=%s",e);
            } catch (ExecutionException e) {
                logger.error("error=%s",e);
            }
        }

        尽管Future可以代表在另外的线程中执行的一段异步代码,但是你还是可以在本身线程中执行: 

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class BasicMain {
        private static Logger logger = LoggerFactory.getLogger(BasicMain.class);
       
    public static CompletableFuture<Integer> compute() { final CompletableFuture<Integer> future = new CompletableFuture<>(); return future; } public static void main(String[] args) throws Exception { final CompletableFuture<Integer> f = compute(); class Client extends Thread { CompletableFuture<Integer> f; Client(String threadName, CompletableFuture<Integer> f) { super(threadName); this.f = f; } @Override public void run() { try { logger.info(this.getName() + ": " + f.get()); } catch (InterruptedException e) { logger.error(e.getMessage()); } catch (ExecutionException e) { logger.error(e.getMessage()); } } } new Client("Client1", f).start(); new Client("Client2", f).start(); logger.info("waiting...."); f.complete(100); f.obtrudeValue(200);
    //logger Client1: 200 Client2: 200 或者 Client1: 100 Client1: 100 或者
    Client1: 200 Client2: 100 ,都有可能

    // f.completeExceptionally(new Exception());
        }
    }

     说明:

             可以看到我们并没有把f.complete(100) 放在另外的线程中去执行,但是在大部分情况下我们可能会用一个线程池去执行这些异步任 务。CompletableFuture.complete()CompletableFuture.completeExceptionally只能被调用一次。

            但是我们有两个后门方法可以重设这个值:obtrudeValueobtrudeException,但是使用的时候要小心,

            因为complete已经触发了客户端,有可能导致客户端会得到不期望的结果

    2)创建CompletableFuture对象。

         CompletableFuture的静态工厂方法

    方法名描述
    runAsync(Runnable runnable) 使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。
    runAsync(Runnable runnable, Executor executor) 使用指定的thread pool执行异步代码。
    supplyAsync(Supplier<U> supplier) 使用ForkJoinPool.commonPool()作为它的线程池执行异步代码,异步操作有返回值
    supplyAsync(Supplier<U> supplier, Executor executor) 使用指定的thread pool执行异步代码,异步操作有返回值

        eg: runAsync 和 supplyAsync 方法比较

           区别是:

             runAsync返回的CompletableFuture是没有返回值的

        原因:

          Runnable函数式接口类型为参数,所以CompletableFuture的计算结果为空

        共同点:

           因为方法的参数类型都是函数式接口,所以可以使用lambda表达式实现异步任务,比如:

     CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                return "Hello";
            });
    
            try {
               logger.info("out = "+ future.get());//out = Hello
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
    View Code
          CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                System.out.println("Hello");
            });
    
            try {
               logger.info("out = "+ future.get());//out = null
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
    View Code

    简单试验 - 主题:在两个线程里并行执行任务A和任务B,只要有一个任务完成了,就执行任务C

    package tk.mybatis.springboot.util.thread;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.time.LocalTime;
    import java.util.Random;
    import java.util.concurrent.*;
    
    /**
     * 在两个线程里并行执行任务A和任务B,只要有一个任务完成了,就执行任务C
     *
     * 两种方法useFuture和useCompletableFuture相比:
     *
     * 首先,useCompletableFuture 比 useFuture 的代码简单。
     *  在useFuture 里,既要自己照顾线程池的创建和销毁,还要负责对任务A和任务B的监控。
     *  而useCompletableFuture,只需要用CompletableFuture的runAfterEither就完成了任务A、任务B和任务C之间的依赖关系的定义。
     *
     */
    public class CompletableFutureTest {
    
        private static Logger logger = LoggerFactory.getLogger(CompletableFutureTest.class);
        private static Random random = new Random();
    
        /**
         * useFuture test
         * @throws InterruptedException
         * @throws ExecutionException
         */
        private static void useFuture() throws InterruptedException, ExecutionException {
            logger.info("useFuture");
            ExecutorService exector = Executors.newFixedThreadPool(3);
            Future<Void> futureA = exector.submit(() -> work("A1"));
            Future<Void> futureB = exector.submit(() -> work("B1"));
            while (true) {
                try {
                    futureA.get(1, TimeUnit.SECONDS);
                    break;
                } catch (TimeoutException e) {
                }
                try {
                    futureB.get(1, TimeUnit.SECONDS);
                    break;
                } catch (TimeoutException e) {
                }
            }
            exector.submit(() -> work("C1")).get();
            exector.shutdown();
        }
    
        private static void useCompletableFuture() throws InterruptedException, ExecutionException {
            logger.info("useCompletableFuture");
            CompletableFuture<Void> futureA = CompletableFuture.runAsync(() -> work("A2"));
            CompletableFuture<Void> futureB = CompletableFuture.runAsync(() -> work("B2"));
            logger.info("get="+futureA.runAfterEither(futureB, () -> work("C2")).get());
    
    //     或者
    //        CompletableFuture.runAsync(() -> work("A2"))
    //                .runAfterEither(CompletableFuture.runAsync(() -> work("B2"))
    //                        , () -> work("C2"))
    //                .get();
        }
    
    
        /**
         * logger 输出
         * @param name
         * @return
         */
        public static Void work(String name) {
            logger.info(name + " starts at " + LocalTime.now());
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(10));
            } catch (InterruptedException e) {
            }
            logger.info(name + " ends at " + LocalTime.now());
            return null;
        }
    
        /**
         * 从useFuture的输出可以看出,
         *  任务C1的开始并不是紧随着任务A1的完成,差了0.001秒,
         *  原因是在方法1里,是对任务A1和任务B1都用get(1,TimeUnit.SECONDS)来询问他们的状态,
         *  当其中一个任务先完成时,主线程可能正阻塞在另一个未完成任务的get上
         *
         * 而从useCompletableFuture完全不存在这样的问题,
         *      任务C2的开始于任务A1的结束之间没有任何的时间差
         *
         * @param args
         * @throws InterruptedException
         * @throws ExecutionException
         */
        public static void main(String[] args) throws InterruptedException, ExecutionException {
    //        useFuture();
            //logger 输出
            // B1 starts at 10:28:09.562
    //         A1 starts at 10:28:09.562
    //         B1 ends at 10:28:17.566
    //         C1 starts at 10:28:17.567
    //         A1 ends at 10:28:18.563
    //         C1 ends at 10:28:20.570
    
    //        TimeUnit.SECONDS.sleep(10);
            useCompletableFuture();
            //logger 输出
    //        A2 starts at 10:43:46.867
    //        B2 starts at 10:43:46.867
    //        B2 ends at 10:43:51.871
    //        A2 ends at 10:43:51.871
    //        C2 starts at 10:43:51.871
    //        C2 ends at 10:44:00.874
            TimeUnit.SECONDS.sleep(10);//避免打印出的start - end 不全
    
        }
    
    }
    View Code

     3)计算结果完成时的处理

     当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方法:

       public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
            return uniWhenCompleteStage(null, action);
        }
    
        public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
            return uniWhenCompleteStage(asyncPool, action);
        }
    
        public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {
            return uniWhenCompleteStage(screenExecutor(executor), action);
        }
       public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
            return uniExceptionallyStage(fn);
        }

           可以看到Action的类型是BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。
           方法不以Async结尾,意味着Action使用相同的线程执行

           Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。

           注意这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。

    package tk.mybatis.springboot.util.thread;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;
    
    public class MainTest {
        private static Logger logger = LoggerFactory.getLogger(MainTest.class);
    
        private static Random random = new Random();
        private static long t = System.currentTimeMillis();
    
        static Map<String,String> getMoreData() {
            logger.info("start");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            logger.info("end ");
            Map map=new HashMap();
            map.put(random.nextInt(1000),random.nextInt(1000));
            return map;
        }
    
        public static void main(String[] args) throws Exception {
            CompletableFuture<Map<String,String>> future = CompletableFuture.supplyAsync(MainTest::getMoreData);
            Future<Map<String,String>> f = future.whenComplete((v, e) -> {
                logger.info("v="+v);
                logger.info("e="+e);
            });
            logger.info("get="+f.get());
        }
    }

         exceptionally方法返回一个新的CompletableFuture

             当原始的CompletableFuture抛出异常的时候,就会触发这个CompletableFuture的计算,调用function计算值,

             否则如果原始的CompletableFuture正常计算完后,这个新的CompletableFuture也计算完成,它的值和原始的CompletableFuture的计算的值相同(??)。

             也就是这个exceptionally方法用来处理异常的情况。

      

            CompletableFuture<Map<String,String>> futureException = future.exceptionally(new Function<Throwable, Map<String, String>>() {
                @Override
                public Map<String, String> apply(Throwable throwable) {
                    logger.error("error="+throwable.getMessage());
                    Map map=new HashMap();
                    map.put(random.nextInt(1000),random.nextInt(1000));
                    return map;
                }
            });
    
            logger.info("get.exception="+futureException.get());

          下面一组方法虽然也返回CompletableFuture对象,但是对象的值和原来的CompletableFuture计算的值不同。

          当原先的CompletableFuture的值计算完成或者抛出异常的时候,会触发这个CompletableFuture对象的计算,结果由BiFunction参数计算而得。

          因此这组方法兼有whenComplete和转换的两个功能。

    public <U> CompletableFuture<U>     handle(BiFunction<? super T,Throwable,? extends U> fn)
    public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
    public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

         同样,不以Async结尾的方法由原来的线程计算,以Async结尾的方法由默认的线程池ForkJoinPool.commonPool()或者指定的线程池executor运行。

    4)转换

        CompletableFuture可以作为monad(单子)和functor。由于回调风格的实现,我们不必因为等待一个计算完成而阻塞着调用线程,

           而是告诉CompletableFuture当计算完成的时候请执行某个function。而且我们还可以将这些操作串联起来,或者将CompletableFuture组合起来。

    三 ThreadPoolTaskExecutor

          包路径:org.springframework.scheduling.concurrent

  • 相关阅读:
    前端学PHP之错误处理
    mysql数据库学习目录
    前端学数据库之存储
    前端学数据库之函数
    用shell脚本监控进程是否存在 不存在则启动的实例
    在notepad++里面使用正则表达式替换掉所有行逗号前面内容
    mysql合并 两个count语句一次性输出结果的方法
    硬件中断和DPC一直占40-52%左右 解决方法
    解决secureCRT 数据库里没有找到防火墙 '无' 此会话降尝试不通过防火墙进行连接。
    Java eclipse下 Ant build.xml实例详解 附完整项目源码
  • 原文地址:https://www.cnblogs.com/xingzc/p/8940873.html
Copyright © 2011-2022 走看看