zoukankan      html  css  js  c++  java
  • Java并发编程-扩展可回调的Future

    前提

    最近在看JUC线程池java.util.concurrent.ThreadPoolExecutor的源码实现,其中了解到java.util.concurrent.Future的实现原理。从目前java.util.concurrent.Future的实现来看,虽然实现了异步提交任务,但是任务结果的获取过程需要主动调用Future#get()或者Future#get(long timeout, TimeUnit unit),而前者是阻塞的,后者在异步任务执行时间不确定的情况下有可能需要进行轮询,这两种情况和异步调用的初衷有点相违背。于是笔者想结合目前了解到的Future实现原理的前提下扩展出支持(监听)回调的Future,思路上参考了Guava增强的ListenableFuture。本文编写的时候使用的JDK是JDK11,其他版本可能不适合。

    简单分析Future的实现原理

    虚拟例子推演

    并发大师Doug Lea在设计JUC线程池的时候,提供了一个顶层执行器接口Executor

    public interface Executor {
    
        void execute(Runnable command);
    }    
    

    实际上,这里定义的方法Executor#execute()是整套线程池体系最核心的接口,也就是ThreadPoolExecutor定义的核心线程、额外创建的线程(线程池最大线程容量 - 核心线程数)都是在这个接口提交任务的时候懒创建的,也就是说ExecutorService接口扩展的功能都是基于Executor#execute()的基础进行扩展。Executor#execute()方法只是单纯地把任务实例Runnable对象投放到线程池中分配合适的线程执行,但是由于方法返回值是void类型,我们是无法感知任务什么时候执行完毕。这个时候就需要对Runnable任务实例进行包装(下面是伪代码 + 伪逻辑):

    // 下面这个Wrapper和Status类是笔者虚构出来
    @RequiredArgsConstructor
    class Wrapper implements Runnable{
    
        private final Runnable target;
        private Status status = Status.of("初始化");
    
        @Override
        public void run(){
            try{
               target.run();
               status = Status.of("执行成功");
            }catch(Throwable t){
               status = Status.of("执行异常"); 
            }
        }
    }
    

    我们只需要把new Wrapper(原始Runnable实例)投放到线程池执行,那么通过定义好的Status状态记录变量就能得知异步任务执行的状态,以及什么时候执行完毕(包括正常的执行完毕和异常的执行完毕)。这里仅仅解决了任务执行的状态获取,但是Executor#execute()方法法返回值是void类型的特点使得我们无法回调Runnable对象执行的结果。这个时候需要定义一个可以回调执行结果的接口,其实已经有现成的接口Callable

    @FunctionalInterface
    public interface Callable<V> {
    
        V call() throws Exception;
    }    
    

    这里遇到了一个问题:由于Executor#execute()只接收Runnable参数,我们需要把Callable接口适配到Runnable接口,这个时候,做一次简单的委托即可:

    @RequiredArgsConstructor
    class Wrapper implements Runnable{
    
        private final Callable callable;
        private Status status = Status.of("初始化");
        @Getter
        private Object outcome;
    
        @Override
        public void run(){
            try{
               outcome = callable.call();
               status = Status.of("执行成功");
            }catch(Throwable t){
               status = Status.of("执行异常"); 
               outcome = t;
            }
        }
    }
    

    这里把Callable实例直接委托给Wrapper,而Wrapper实现了Runnable接口,执行结果直接存放在定义好的Object类型的对象outcome中即可。当我们感知到执行状态已经结束,就可以从outcome中提取到执行结果。

    Future的实现

    上面一个小结仅仅对Future实现做一个相对合理的虚拟推演,实际上,RunnableFuture才是JUC中常用的复合接口,它同时实现了RunnableFuture

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        
        void run();
    }
    

    上一节提到的虚构出来的Wrapper类,在JUC中类似的实现是java.util.concurrent.FutureTask,它就是CallableRunnable的适配器,FutureTask实现了RunnableFuture接口:

    public class FutureTask<V> implements RunnableFuture<V> {
    
        private volatile int state;
        private static final int NEW          = 0;
        private static final int COMPLETING   = 1;
        private static final int NORMAL       = 2;
        private static final int EXCEPTIONAL  = 3;
        private static final int CANCELLED    = 4;
        private static final int INTERRUPTING = 5;
        private static final int INTERRUPTED  = 6;
    
        /** The underlying callable; nulled out after running */
        private Callable<V> callable;
        /** The result to return or exception to throw from get() */
        private Object outcome; // non-volatile, protected by state reads/writes
        /** The thread running the callable; CASed during run() */
        private volatile Thread runner;
        /** Treiber stack of waiting threads */
        private volatile WaitNode waiters;
        
        // 省略其他代码
    }    
    

    注意到核心属性state表示执行状态,outcome承载执行结果。接着看提交Callable类型任务的方法ExecutorService#submit()

    public interface ExecutorService extends Executor {
    
        // 省略其他接口方法
    
        <T> Future<T> submit(Callable<T> task);
    }    
    

    当我们通过上述ExecutorService#submit()方法提交Callable类型任务的时候,实际上做了如下的步骤:

    1. 检查入参task的存在性,如果为null抛出NullPointerException
    2. Callable类型的task包装为FutureTask实例。
    3. 把新建的FutureTask实例放到线程池中执行,也就是调用Executor#execute(FutureTask实例)
    4. 返回FutureTask实例的接口实例RunnableFuture(实际上是返回子接口Future实例)。

    如果我们需要获取结果,可以Future#get()或者Future#get(long timeout, TimeUnit unit)获取,调用这两个方法的时候参看FutureTask里面的方法实现,得知步骤如下:

    1. 如果状态state小于等于COMPLETING(1),说明任务还在执行中,获取结果的请求线程会放入WaitNode类型的队列中进行阻塞。
    2. 如果任务执行完毕,不管异常完毕还是正常完毕,除了会更新状态state和把结果赋值到outcome之外,还会唤醒所有阻塞获取结果的线程,然后调用钩子方法FutureTask#done()(具体见源码FutureTask#finishCompletion())。

    其实分析了这么多,笔者想指出的结论就是:Callable类型任务提交到线程池中执行完毕(包括正常执行完毕和异常执行完毕)之后,都会回调钩子方法FutureTask#done()。这个就是我们扩展可监听Future的理论依据。

    扩展可回调的Future

    先做一次编码实现,再简单测试其功能。

    编码实现

    先定义一个Future接口的子接口ListenableFuture,用于添加可监听的回调:

    public interface ListenableFuture<V> extends Future<V> {
    
        void addCallback(ListenableFutureCallback<V> callback, Executor executor);
    }
    

    ListenableFutureCallback是一个函数式回调接口:

    @FunctionalInterface
    public interface ListenableFutureCallback<V> {
    
        void callback(V value, Throwable throwable);
    }
    

    对于ListenableFutureCallback而言,回调的结果valuethrowable是互斥的。正常执行完毕的情况下value将会是执行结果值,throwablenull;异常执行完毕的情况下,value将会是nullthrowable将会是抛出的异常实例。如果更习惯于分开处理正常执行完毕的结果和异常执行完毕的结果,ListenableFutureCallback可以这样定义:

    public interface ListenableFutureCallback<V> {
    
        void onSuccess(V value);
    
        void onError(Throwable throwable);
    }
    

    接着定义ListenableExecutorService接口继承ExecutorService接口:

    public interface ListenableExecutorService extends ExecutorService {
    
        <T> ListenableFuture<T> listenableSubmit(Callable<T> callable);
    
        /**
         * 定义这个方法是因为有些时候由于任务执行时间非常短,有可能通过返回的ListenableFuture实例添加回调之前已经执行完毕,因此可以支持显式传入回调
         *
         * @param callable  callable
         * @param callbacks callbacks
         * @param executor  executor
         * @return ListenableFuture
         */
        <T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor);
    }
    

    然后添加一个执行单元适配器ListenableFutureCallbackRunnable,承载每次回调触发的调用(实现Runnable接口,从而支持异步执行):

    @RequiredArgsConstructor
    public class ListenableFutureCallbackRunnable<V> implements Runnable {
    
        private final ListenableFutureCallback<V> callback;
        private final V value;
        private final Throwable throwable;
    
        @Override
        public void run() {
            callback.callback(value, throwable);
        }
    }
    

    接着需要定义一个FutureTask的子类ListenableFutureTask,核心逻辑是覆盖FutureTask#done()方法触发回调:

    // ListenableFutureTask
    public class ListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {
    
        private final List<Execution<V>> executions = new ArrayList<>();
    
        public ListenableFutureTask(Callable<V> callable) {
            super(callable);
        }
    
        public ListenableFutureTask(Runnable runnable, V result) {
            super(runnable, result);
        }
    
        public static <V> ListenableFutureTask<V> newTaskFor(Callable<V> callable) {
            return new ListenableFutureTask<>(callable);
        }
    
        @Override
        protected void done() {
            Iterator<Execution<V>> iterator = executions.iterator();
            Throwable throwable = null;
            V value = null;
            try {
                value = get();
            } catch (Throwable t) {
                throwable = t;
            }
            while (iterator.hasNext()) {
                Execution<V> execution = iterator.next();
                ListenableFutureCallbackRunnable<V> callbackRunnable = new ListenableFutureCallbackRunnable<>(execution.getCallback(),
                        value, throwable);
                // 异步回调
                if (null != execution.getExecutor()) {
                    execution.getExecutor().execute(callbackRunnable);
                } else {
                    // 同步回调
                    callbackRunnable.run();
                }
            }
        }
    
        @Override
        public void addCallback(ListenableFutureCallback<V> callback, Executor executor) {
            Execution<V> execution = new Execution<>();
            execution.setCallback(callback);
            execution.setExecutor(executor);
            executions.add(execution);
        }
    }
    
    // Execution - 承载每个回调实例和对应的Executor,Executor实例为null则进行同步回调
    @Data
    public class Execution <V>{
    
        private Executor executor;
        private ListenableFutureCallback<V> callback;
    }
    

    最后一步就是编写线程池ListenableThreadPoolExecutor,继承自ThreadPoolExecutor并且实现ListenableExecutorService接口:

    public class ListenableThreadPoolExecutor extends ThreadPoolExecutor implements ListenableExecutorService {
    
        public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    
        public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }
    
        public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
        BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }
    
        public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
         BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    
        @Override
        public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable) {
            if (null == callable) {
                throw new IllegalArgumentException("callable");
            }
            ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable);
            execute(listenableFutureTask);
            return listenableFutureTask;
        }
    
        @Override
        public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor) {
            if (null == callable) {
                throw new IllegalArgumentException("callable");
            }
            if (null == callbacks) {
                throw new IllegalArgumentException("callbacks");
            }
            ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable);
            for (ListenableFutureCallback<T> callback : callbacks) {
                listenableFutureTask.addCallback(callback, executor);
            }
            execute(listenableFutureTask);
            return listenableFutureTask;
        }
    }
    

    测试

    引入junit,编写测试类如下:

    public class ListenableFutureTest {
    
        private static ListenableExecutorService EXECUTOR;
        private static Executor E;
    
        @BeforeClass
        public static void before() {
            EXECUTOR = new ListenableThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(10), new ThreadFactory() {
    
                private final AtomicInteger counter = new AtomicInteger();
    
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName(String.format("ListenableWorker-%d", counter.getAndIncrement()));
                    return thread;
                }
            });
            E = Executors.newFixedThreadPool(3);
        }
    
        @Test
        public void testListenableFuture1() throws Exception {
            ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
                Thread.sleep(1000);
                return "message";
            });
            future.addCallback((v, t) -> {
                System.out.println(String.format("Value = %s,Throwable = %s", v, t));
            }, null);
            Thread.sleep(2000);
        }
    
        @Test
        public void testListenableFuture2() throws Exception {
            ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
                Thread.sleep(1000);
                throw new RuntimeException("exception");
            });
            future.addCallback((v, t) -> {
                System.out.println(String.format("Value = %s,Throwable = %s", v, t));
            }, null);
            Thread.sleep(2000);
        }
    
        @Test
        public void testListenableFuture3() throws Exception {
            ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
                Thread.sleep(1000);
                return "message";
            });
            future.addCallback((v, t) -> {
                System.out.println(String.format("Value = %s,Throwable = %s", v, t));
            }, E);
            System.out.println("testListenableFuture3 end...");
            Thread.sleep(2000);
        }
    
        @Test
        public void testListenableFuture4() throws Exception {
            ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
                Thread.sleep(1000);
                throw new RuntimeException("exception");
            });
            future.addCallback((v, t) -> {
                System.out.println(String.format("Value = %s,Throwable = %s", v, t));
            }, E);
            System.out.println("testListenableFuture4 end...");
            Thread.sleep(2000);
        }
    }
    

    执行结果:

    // testListenableFuture1
    Value = message,Throwable = null
    
    // testListenableFuture2
    Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception
    
    // testListenableFuture3
    testListenableFuture3 end...
    Value = message,Throwable = null
    
    // testListenableFuture4
    testListenableFuture4 end...
    Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception
    

    和预期的结果一致,注意一下如果Callable执行抛出异常,异常被包装为ExecutionException,要调用Throwable#getCause()才能得到原始的异常实例。

    小结

    本文通过了解ThreadPoolExecutorFuture的实现原理做简单的扩展,使得异步提交任务变得更加优雅和简便。强化了动手能力的同时,也能加深对并发编程的一些认知。当然,本文只是提供一个十分简陋的实现,笔者其实还想到了如对回调处理的耗时做监控、回调打上分组标签执行等等更完善的功能,等到有需要的场景再进行实现。

    这里记录一下过程中的一些领悟:

    • Executor#execute()是线程池的核心接口,所有其他功能都是基于此接口做扩展,它的设计本身是无状态的。
    • 灵活使用适配器模式,可以在不改变已发布的接口的功能同时实现新的接口的功能适配。
    • 要善于发掘和使用JDK类库设计者留给开发者的扩展接口。

    个人博客

    (本文完 c-1-d e-a-20190702)

    技术公众号(《Throwable文摘》),不定期推送笔者原创技术文章(绝不抄袭或者转载):

    娱乐公众号(《天天沙雕》),甄选奇趣沙雕图文和视频不定期推送,缓解生活工作压力:

  • 相关阅读:
    linux的软连接和硬连接
    各种Python简单功能代码
    《财报就像一本故事书》刘顺仁(二) ——财务报表
    Atitit .h5文件上传 v3
    Atitti. 语法树AST、后缀表达式、DAG、三地址代码
    Atitit.在线充值功能的设计
    Atitit。数据库 安全性 重要敏感数据加密存储解决方案
    atitit.数据验证db数据库数据验证约束
    Atitit.提升电子商务安全性 在线充值功能安全方面的设计
    Atitit.antlr实现词法分析
  • 原文地址:https://www.cnblogs.com/throwable/p/12319626.html
Copyright © 2011-2022 走看看