zoukankan      html  css  js  c++  java
  • JDK源码分析之concurrent包(三) -- Future方式的实现

      上一篇我们基于JDK的源码对线程池ThreadPoolExecutor的实现做了分析,本篇来对Executor框架中另一种典型用法Future方式做源码解读。我们知道Future方式实现了带有返回值的程序的异步调用,关于异步调用的场景大家可以自行脑补Ajax的应用(获取返回结果的方式不同,Future是主动询问获取,Ajax是回调函数),这里不做过多说明。


    在进入源码前,首先来看下Future方式相关的API:

    • 接口Callable:有返回结果并且可能抛出异常的任务;
    • 接口Future:表示异步执行的结果;
    • 类FutureTask:实现Future、Runnable等接口,是一个异步执行的任务。可以直接执行,或包装成Callable执行;
    • 接口CompletionService:将生产新的异步任务与使用已完成任务的结果分离开来的服务,用来执行Callable或Runnable,并异步获取执行结果;
    • 类ExecutorCompletionService:实现CompletionService接口,使用构造时传入的Executor来执行Callable或Runnable,

    接下来通过一个常规的使用实例来展示这些API之间的关系:

     1 ExecutorService executor = Executors.newFixedThreadPool(3);
     2 CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
     3 Future<String> future = completionService.submit(new Callable<String>() {
     4 
     5     public String call() throws Exception {
     6         // do something...
     7         return "success";
     8     }
     9 });
    10 
    11 // 其它的程序逻辑。。。
    12 
    13 // 异步的获取执行结果
    14 String result = future.get();

    常规调用主要是通过CompletionService.submit()方法,那我们就从这个方法开始进入JDK源码,以下是ExecutorCompletionService类的源码:

     1 public Future<V> submit(Callable<V> task) {
     2     if (task == null) throw new NullPointerException();
     3     RunnableFuture<V> f = newTaskFor(task);
     4     executor.execute(new QueueingFuture(f));
     5     return f;
     6 }
     7 
     8 public Future<V> submit(Runnable task, V result) {
     9     if (task == null) throw new NullPointerException();
    10     RunnableFuture<V> f = newTaskFor(task, result);
    11     executor.execute(new QueueingFuture(f));
    12     return f;
    13 }

    sunmit()方法有两个重载,这里我们只对参数为Callable的方法解读,因为另一个也是间接的封装成了Callable最后调用的。

    上述代码的第3行可以看到,Callable转换成了RunnableFuture来交给executor执行,下面来看newTaskFor(task)方法的源码:

    1 private RunnableFuture<V> newTaskFor(Callable<V> task) {
    2     if (aes == null)
    3         return new FutureTask<V>(task);
    4     else
    5         return aes.newTaskFor(task);
    6 }

    代码第3行第5行的结果是一样的,继续来看这个FutureTask如何构造的:

    1 public FutureTask(Callable<V> callable) {
    2     if (callable == null)
    3         throw new NullPointerException();
    4     sync = new Sync(callable);
    5 }

    还是没什么用,然后我们详细的来看下Sync的源码:

     1 /**
     2  * Synchronization control for FutureTask. Note that this must be
     3  * a non-static inner class in order to invoke the protected
     4  * <tt>done</tt> method. For clarity, all inner class support
     5  * methods are same as outer, prefixed with "inner".
     6  *
     7  * Uses AQS sync state to represent run status
     8  */
     9 private final class Sync extends AbstractQueuedSynchronizer {
    10 
    11     /** The underlying callable */
    12     private final Callable<V> callable;
    13     /** The result to return from get() */
    14     private V result;
    15     /** The exception to throw from get() */
    16     private Throwable exception;
    17 
    18     Sync(Callable<V> callable) {
    19         this.callable = callable;
    20     }
    21 
    22     V innerGet() throws InterruptedException, ExecutionException {
    23         acquireSharedInterruptibly(0);
    24         if (getState() == CANCELLED)
    25             throw new CancellationException();
    26         if (exception != null)
    27             throw new ExecutionException(exception);
    28         return result;
    29     }
    30 
    31     void innerSet(V v) {
    32       for (;;) {
    33         int s = getState();
    34         if (s == RAN)
    35           return;
    36         if (s == CANCELLED) {
    37           // aggressively release to set runner to null,
    38           // in case we are racing with a cancel request
    39           // that will try to interrupt runner
    40           releaseShared(0);
    41           return;
    42         }
    43         if (compareAndSetState(s, RAN)) {
    44           result = v;
    45           releaseShared(0);
    46           done();
    47           return;
    48         }
    49       }
    50     }
    51 
    52     void innerSetException(Throwable t) {
    53       for (;;) {
    54         int s = getState();
    55         if (s == RAN)
    56           return;
    57         if (s == CANCELLED) {
    58             // aggressively release to set runner to null,
    59             // in case we are racing with a cancel request
    60             // that will try to interrupt runner
    61             releaseShared(0);
    62             return;
    63         }
    64         if (compareAndSetState(s, RAN)) {
    65           exception = t;
    66           result = null;
    67           releaseShared(0);
    68           done();
    69           return;
    70         }
    71       }
    72     }
    73 
    74     void innerRun() {
    75         if (!compareAndSetState(0, RUNNING))
    76             return;
    77         try {
    78             runner = Thread.currentThread();
    79             if (getState() == RUNNING) // recheck after setting thread
    80                 innerSet(callable.call());
    81             else
    82                 releaseShared(0); // cancel
    83         } catch (Throwable ex) {
    84             innerSetException(ex);
    85         }
    86     }
    87     
    88     // others codes
    89 }

    上述代码列出了几个重要的方法,可以大概的看出,Future方式的玄机,基本都在这个内部类里了。下面就对这个内部类中的几个方法少做解释:

    首先,类的注释主要说明了:内部类必须是非静态的,是为了调用外部类的done()方法(这个我们以后再说)。还有内部类的方法都是以“前缀inner+外部类方法名”来命名的。

    其次,通过查看外部类的源码可得知:外部类的所有方法都是通过内部类中同名的inner方法来调用的(源码很简单这里没有列出)。

    然后,我们来看这个类中的其中3个成员变量及其注释,就可以大概猜到:callable是传入的执行过程,result用来存储callable的返回值,exception存储callable抛出的异常(如果有)。

    最后,我们来分别看这个类中的几个关键方法:

    上述代码第74行的innerRun()方法:注意外部类是FutureTask,实现了Runnable接口,事实上就是最开始所说的submit()方法中,最终要执行的Runnable任务,此时执行的其实是内部类的innerRun(),通过代码的第80行可以看出,是调用了callable.call()并把返回值通过innerSet(V v)赋值给了成员变量result。如果callable.call()有异常,则通过innerSetException(Throwable t)赋值给成员变量exception。

    通过innserGet()方法差不多就知道了异步获取执行结果的原理了,第23行的acquireSharedInterruptibly(0)方法的意义在于:要等Runnable任务执行完或被中断才能执行后面的代码。(注:这块实现虽然被一笔带过,但其实逻辑还是有点复杂,其实现主要是用死循环检查执行Callable线程的状态,类似自旋锁的概念

    最后,在回过头来看CompletionService.submit()方法:

    1 public Future<V> submit(Callable<V> task) {
    2     if (task == null) throw new NullPointerException();
    3     RunnableFuture<V> f = newTaskFor(task);
    4     executor.execute(new QueueingFuture(f));
    5     return f;
    6 }

    代码的第4行并非执行的是我们上面说的FutureTask,而是将这个FutureTask由封装成了QueueingFuture才交给executor执行,当我们看了QueueingFuture的源码就会了解到

     1 /**
     2  * FutureTask extension to enqueue upon completion
     3  */
     4 private class QueueingFuture extends FutureTask<Void> {
     5     QueueingFuture(RunnableFuture<V> task) {
     6         super(task, null);
     7         this.task = task;
     8     }
     9     protected void done() { completionQueue.add(task); }
    10     private final Future<V> task;
    11 }

    这个封装的意义无非是想在callable.call()执行完后调用第9行的completionQueue.add(task),done()方法是不是很眼熟?

    注释其实已经说明了:是为了让完成的任务入列到completionQueue中,以实现本文最开始罗列出来的,CompletionService接口的“将新的异步任务与完成的任务分离开来”的特性。

    总结

    本文通过ExecutorCompletionService类与FutureTask类及其内部类中部分关键处源码的解读,介绍了Java5中Future方式的原理。其实可以概括为一句话:

    • 在某线程中执行Callable时,将执行结果或抛出的异常存放在临时变量中,其它线程在Callable执行完或中断前,阻塞的获取执行结果。

    关于Executor框架的源码就解读到这,下篇文章开始一些工具类的源码解析。

     

  • 相关阅读:
    Vasya and Endless Credits CodeForces
    Dreamoon and Strings CodeForces
    Online Meeting CodeForces
    数塔取数 基础dp
    1001 数组中和等于K的数对 1090 3个数和为0
    1091 线段的重叠
    51nod 最小周长
    走格子 51nod
    1289 大鱼吃小鱼
    POJ 1979 Red and Black
  • 原文地址:https://www.cnblogs.com/hanmou/p/4624673.html
Copyright © 2011-2022 走看看