  • 并发编程(十二)—— Java 线程池 实现原理与源码深度解析 之 submit 方法 (二)

    在上一篇《并发编程(十一)—— Java 线程池 实现原理与源码深度解析(一)》中提到了线程池ThreadPoolExecutor的原理以及它的execute方法。这篇文章是接着上一篇文章写的,如果你没有阅读上一篇文章,建议你去读读。本文解析ThreadPoolExecutor#submit。




     1 public abstract class AbstractExecutorService implements ExecutorService {
     3     // RunnableFuture 是用于获取执行结果的,我们常用它的子类 FutureTask
     4     // 下面两个 newTaskFor 方法用于将我们的任务包装成 FutureTask 提交到线程池中执行
     5     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
     6         return new FutureTask<T>(runnable, value);
     7     }
     9     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    10         return new FutureTask<T>(callable);
    11     }
    13     // 提交任务
    14     public Future<?> submit(Runnable task) {
    15         if (task == null) throw new NullPointerException();
    16         // 1. 将任务包装成 FutureTask
    17         RunnableFuture<Void> ftask = newTaskFor(task, null);
    18         // 2. 交给执行器执行,execute 方法由具体的子类来实现
    19         // 前面也说了,FutureTask 间接实现了Runnable 接口。
    20         execute(ftask);
    21         return ftask;
    22     }
    24     public <T> Future<T> submit(Runnable task, T result) {
    25         if (task == null) throw new NullPointerException();
    26         // 1. 将任务包装成 FutureTask
    27         RunnableFuture<T> ftask = newTaskFor(task, result);
    28         // 2. 交给执行器执行
    29         execute(ftask);
    30         return ftask;
    31     }
    33     public <T> Future<T> submit(Callable<T> task) {
    34         if (task == null) throw new NullPointerException();
    35         // 1. 将任务包装成 FutureTask
    36         RunnableFuture<T> ftask = newTaskFor(task);
    37         // 2. 交给执行器执行
    38         execute(ftask);
    39         return ftask;
    40     }
    41 }

    尽管submit方法能提供线程执行的返回值,但只有实现了Callable才会有返回值,而实现Runnable的线程则是没有返回值的,也就是说在上面的3个方法中,submit(Callable<T> task)能获取到它的返回值,submit(Runnable task, T result)能通过传入的载体result间接获得线程的返回值或者准确来说交给线程处理一下,而最后一个方法submit(Runnable task)则是没有返回值的,就算获取它的返回值也是null。


    submit(Callable<T> task)

     1 /**
     2  * @author: ChenHao
     3  * @Date: Created in 14:54 2019/1/11
     4  */
     5 public class Test {
     6     public static void main(String[] args) throws ExecutionException, InterruptedException {
     7         Callable<String> callable = new Callable<String>() {
     8             public String call() throws Exception {
     9                 System.out.println("This is ThreadPoolExetor#submit(Callable<T> task) method.");
    10                 return "result";
    11             }
    12         };
    14         ExecutorService executor = Executors.newSingleThreadExecutor();
    15         Future<String> future = executor.submit(callable);
    16         executor.shutdown();
    17         System.out.println(future.get());
    18     }
    19 }


    submit(Runnable task, T result)

     1 /**
     2  * @author: ChenHao
     3  * @Date: Created in 14:54 2019/1/11
     4  */
     5 public class Test {
     6     public static void main(String[] args) throws ExecutionException, InterruptedException {
     8         ExecutorService executor = Executors.newSingleThreadExecutor();
     9         Data data = new Data();
    10         Future<Data> future = executor.submit(new Task(data), data);
    11         executor.shutdown();
    12         System.out.println(future.get().getName());
    13     }
    14 }
    15 class Data {
    16     String name;
    17     public String getName() {
    18         return name;
    19     }
    20     public void setName(String name) {
    21         this.name = name;
    22     }
    23 }
    25 class Task implements Runnable {
    26     Data data;
    27     public Task(Data data) {
    28         this.data = data;
    29     }
    30     @Override
    31     public void run() {
    32         System.out.println("This is ThreadPoolExetor#submit(Runnable task, T result) method.");
    33         data.setName("陈浩");
    34     }
    35 }


    submit(Runnable task)

     1 /**
     2  * @author: ChenHao
     3  * @Date: Created in 14:54 2019/1/11
     4  */
     5 public class Test {
     6     public static void main(String[] args) throws ExecutionException, InterruptedException {
     7         Runnable runnable = new Runnable() {
     8             @Override
     9             public void run() {
    10                 System.out.println("This is ThreadPoolExetor#submit(Runnable runnable) method.");
    11             }
    12         };
    14         ExecutorService executor = Executors.newSingleThreadExecutor();
    15         Future future = executor.submit(runnable);
    16         executor.shutdown();
    17         System.out.println(future.get());
    18     }
    19 }



    1 RunnableFuture<T> ftask = newTaskFor(task);
    2 execute(ftask);


    1 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    2     return new FutureTask<T>(runnable, value);
    3 }
    5 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    6     return new FutureTask<T>(callable);
    7 }


    这里我建议大家去看看我之前的一篇文章《Java 多线程(五)—— 线程池基础 之 FutureTask源码解析

    submit(Callable<T> task)


    1 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    2     return new FutureTask<T>(callable);
    3 }

     我们看看 FutureTask 的结构

     1 public class FutureTask<V> implements RunnableFuture<V> { 
     2     private volatile int state; 
     3     private static final int NEW = 0; //初始状态 
     4     private static final int COMPLETING = 1; //结果计算完成或响应中断到赋值给返回值之间的状态。 
     5     private static final int NORMAL = 2; //任务正常完成,结果被set 
     6     private static final int EXCEPTIONAL = 3; //任务抛出异常 
     7     private static final int CANCELLED = 4; //任务已被取消 
     8     private static final int INTERRUPTING = 5; //线程中断状态被设置ture,但线程未响应中断 
     9     private static final int INTERRUPTED = 6; //线程已被中断 
    11     //将要执行的任务 
    12     private Callable<V> callable; //用于get()返回的结果,也可能是用于get()方法抛出的异常 
    13     private Object outcome; // non-volatile, protected by state reads/writes //执行callable的线程,调用FutureTask.run()方法通过CAS设置 
    14     private volatile Thread runner; //栈结构的等待队列,该节点是栈中的最顶层节点。 
    15     private volatile WaitNode waiters; 
    17     public FutureTask(Callable<V> callable) {
    18         if (callable == null)
    19             throw new NullPointerException();
    20         this.callable = callable;
    21         this.state = NEW;       // ensure visibility of callable
    22     }
    23     ....
    24 }
    1 public interface RunnableFuture<V> extends Runnable, Future<V> {
    2     /**
    3      * Sets this Future to the result of its computation
    4      * unless it has been cancelled.
    5      */
    6     void run();
    7 }

     我们知道 FutureTask 继承了 Runnable,这里将 Callable<T> callable 的实例封装成 FutureTask 传给 execute(ftask);我们再来看看上一篇文章中线程运行的代码

     1 // 此方法由 worker 线程启动后调用,这里用一个 while 循环来不断地从等待队列中获取任务并执行
     2 // 前面说了,worker 在初始化的时候,可以指定 firstTask,那么第一个任务也就可以不需要从队列中获取
     3 final void runWorker(Worker w) {
     4     Thread wt = Thread.currentThread();
     5     // 该线程的第一个任务(如果有的话)
     6     Runnable task = w.firstTask;
     7     w.firstTask = null;
     8     w.unlock(); // allow interrupts
     9     boolean completedAbruptly = true;
    10     try {
    11         // 循环调用 getTask 获取任务
    12         while (task != null || (task = getTask()) != null) {
    13             w.lock();          
    14             // 如果线程池状态大于等于 STOP,那么意味着该线程也要中断
    15             if ((runStateAtLeast(ctl.get(), STOP) ||
    16                  (Thread.interrupted() &&
    17                   runStateAtLeast(ctl.get(), STOP))) &&
    18                 !wt.isInterrupted())
    19                 wt.interrupt();
    20             try {
    21                 beforeExecute(wt, task);
    22                 Throwable thrown = null;
    23                 try {
    24                     // 到这里终于可以执行任务了,这里是最重要的,task是什么?是Worker 中的firstTask属性
    26                     task.run();
    27                 } catch (RuntimeException x) {
    28                     thrown = x; throw x;
    29                 } catch (Error x) {
    30                     thrown = x; throw x;
    31                 } catch (Throwable x) {
    32                     thrown = x; throw new Error(x);
    33                 } finally {
    34                     afterExecute(task, thrown);
    35                 }
    36             } finally {
    37                 // 一个任务执行完了,这个线程还可以复用,接着去队列中拉取任务执行
    38                 // 置空 task,准备 getTask 获取下一个任务
    39                 task = null;
    40                 // 累加完成的任务数
    41                 w.completedTasks++;
    42                 // 释放掉 worker 的独占锁
    43                 w.unlock();
    44             }
    45         }
    46         completedAbruptly = false;
    47     } finally {
    48         // 如果到这里,需要执行线程关闭:
    49         // 说明 getTask 返回 null,也就是超过corePoolSize的线程过了超时时间还没有获取到任务,也就是说,这个 worker 的使命结束了,执行关闭
    50         processWorkerExit(w, completedAbruptly);
    51     }
    52 }

     由上面第6行代码 task 就是execute(ftask)传入的任务,第26行 task.run(); 实际上就是 new FutureTask<T>(callable).run(),我们看看FutureTask中的run()方法

     1 public void run() {
     2     //保证callable任务只被运行一次
     3     if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
     4         return;
     5     try {
     6         Callable < V > c = callable;
     7         if (c != null && state == NEW) {
     8             V result;
     9             boolean ran;
    10             try { 
    11                 //执行任务,上面的例子我们可以看出,call()里面可能是一个耗时的操作,不过这里是同步的
    12                 result = c.call();
    13                 //上面的call()是同步的,只有上面的result有了结果才会继续执行
    14                 ran = true;
    15             } catch (Throwable ex) {
    16                 result = null;
    17                 ran = false;
    18                 setException(ex);
    19             }
    20             if (ran)
    21                 //执行完了,设置result
    22                 set(result);
    23         }
    24     }
    25     finally {
    26         runner = null;
    27         int s = state;
    28         //判断该任务是否正在响应中断,如果中断没有完成,则等待中断操作完成
    29         if (s >= INTERRUPTING)
    30             handlePossibleCancellationInterrupt(s);
    31     }
    32 }

    在 FutureTask的构造方法中 this.callable = callable; ,因此我们可以知道上面run()方法中第6行 c 就是 代码示例中的 new Callable<String>(),c.call()就是调用 代码示例中new Callable 的call方法,并且这里可以取到返回结果,第22行处设置FutureTask 中 outcome 的值,这样线程就可以取到返回值了。

    1 protected void set(V v) {
    2     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    3         outcome = v;
    4         UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
    5         finishCompletion();
    6     }
    7 }


    submit(Runnable task, T result)

     1 public <T> Future<T> submit(Runnable task, T result) {
     2     if (task == null) throw new NullPointerException();
     3     RunnableFuture<T> ftask = newTaskFor(task, result);
     4     execute(ftask);
     5     return ftask;
     6 }
     8 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
     9     return new FutureTask<T>(runnable, value);
    10 }


    1 public FutureTask(Runnable runnable, V result) {
    2     this.callable = Executors.callable(runnable, result);
    3     this.state = NEW;       // ensure visibility of callable
    4 }
     1 public static <T> Callable<T> callable(Runnable task, T result) {
     2     if (task == null)
     3         throw new NullPointerException();
     4     return new RunnableAdapter<T>(task, result);
     5 }
     7 static final class RunnableAdapter<T> implements Callable<T> {
     8     final Runnable task;
     9     final T result;
    10     RunnableAdapter(Runnable task, T result) {
    11         this.task = task;
    12         this.result = result;
    13     }
    14     public T call() {
    15         task.run();
    16         return result;
    17     }
    18 }

    上面将 runnable, result 封装成了 RunnableAdapter 作为FutureTask的callable属性,这和上面的submit(Callable<T> task) 是不同的,submit(Callable<T> task)是直接将 Callable<T> task作为FutureTask的callable属性。我们看看FutureTask中的run()方法中第6行 c 就是FutureTask 构造方法中的new RunnableAdapter<T>(task, result) ,c.call()就是调用 RunnableAdapter<T>(task, result) 的call方法,call()中的task.run()就是上面代码示例中new Task(data) 中的 run(),run()方法中业务大代码改变了data对象的属性,callable(Runnable task, T result)中也是传的相同的对象data, 所以,result = c.call(); 就是把更改后的data返回,并且将data设置为设置FutureTask 中 outcome 的值,后面的逻辑就是一样的了。


    如果new FutureTask<T>(runnable, null),则result = c.call(); 返回的值也是null,最后从线程池中get的值也是null。

