前言
线程池的作用就是将线程的管理、创建、销毁等操作与线程需要执行的任务隔离开来,从而避免线程频繁的创建与销毁,以及大量的线程的上下文切换造成的资源损耗。关于Java并发包中的线程池部分,我把它们分为两部分,即线程池和Fork/Join框架。
线程池
通常线程池的时候大家都是从工具类Executors开始的,例如创建线程池,其实它内部的线程池实现才是最应该引起我们注意的。Java线程池的类结构图如下(忽略了非并发包以及静态的、私有的、Final的内部类实现):
上图画出了Java并发包中关于线程池的类结构图(Note: 绿色表示接口,蓝色表示实例类,黄色表示抽象类)它主要包含三个接口,一个抽象类,三个实现类和一个工具类。另外Callable是创建线程运行时代码的接口, ThreadFactory用于创建线程的工厂。
Executor
作为顶层接口,它只提供了一个接口方法void execute(Runnable command),用于接受并执行通过参数提交的可执行任务, 它可以将任务提交与每个任务的运行机制(包括线程使用、调度等细节)分离开来。它提供了这一的内存一致性:
在向Executor提交可执行任务对象之前的线程中的所有操作happen-before该Executor真正开始执行之前的其他操作,这些操作可能在另一个线程中。
ExecutorService
相比Executor接口,ExecutorService是一个更广泛的接口,它在Executor接口的基础上增加了一些管理线程池对象的关闭(或者叫终止)的方法,以及一些可以返回跟踪一个甚至多个异步任务进度及结果的Future对象的方法。同时,它还将可执行任务类型从Runnable扩展到也支持Callable类型的任务。
一个ExecutorService可以通过两个方法被终止(被终止的ExecutorService将不再接受新任务):
一个是shutdown()方法,该方法在终止前允许执行以前提交的任务,另一个是shutdownNow() ,该方法不但会阻止还未被执行的任务被调度执行,还会试图立即停止正在执行的任务。
这两个方法通常都会让人产生疑惑,其实shutdown()其实可以看着是一种宣告,该ExecutorService不再接新任务了,但是那些之前提交的任务还是会被调度执行,但并不会等待他们真正执行结束。而shutdownNow()则显得很迫切的想要立刻终止所有任务,不管是正在执行的还是根本就还没被调度执行的,但通常这种终止那些正在运行的任务的方法是通过Thread.interrupt来实现的,若该任务逻辑中不存在那样可以被打断的操作,它这种企图终止正在执行的任务的愿望将会落空,所以任何未能响应中断的任务都可能永远不会终止。任何未使用的ExecutorService都应该及时的关闭以允许JVM回收其资源。
ExecutorService提供的awaitTermination接口方法可以在调用完shutdown之后,在给定的时间内等待所有任务都执行完成,并返回true。所以该方法一般是在调用完shutdown之后才使用。
通过那些返回Future对象的接口方法可以更好的处理异步任务,方法 invokeAny 和 invokeAll 是批量执行的最常用形式,它们执行一个任务集合,然后等待至少一个或全部任务完成。
isTerminated方法可以返回是否所有任务都已经执行结束的标识,true或false,但除非先了调用shutdown或shutdown now,否则isTerminated永远不会为真。
下面是它的所有接口方法:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 //启动关闭,在此之前提交的任务将被执行,但不再接受新任务。如果已经关闭,调用不会产生额外的效果。此方法不等待以前提交的任务执行完成,使用一个awaitTermination来做这件事。 2 void shutdown(); 3 4 5 //试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。 6 //此方法不等待正在执行的活动线程执行完成,使用一个 awaitTermination 来做这件事。 7 //除了尽最大努力停止处理正在执行的活动任务外,没有任何保证。例如,典型的实现将通过Thread.interrupt取消,因此任何未能响应中断的任务都可能永远不会终止。 8 List<Runnable> shutdownNow(); 9 10 11 //如果此executor已关闭,则返回 true。 12 boolean isShutdown(); 13 14 //如果关闭后所有任务都已完成,则返回 true。注意,除非先了调用shutdown或shutdown now,否则isTerminated永远不会为真。 15 boolean isTerminated(); 16 17 18 //阻塞直到在shutdown请求之后所有任务都执行完成,或者超时,或者当前线程被中断,若发生了多种情况,以最先发生的情况为准。 19 //返回时若该Executor被终止了返回true,如果超时在终止之前结束,则为false 20 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; 21 22 //提交一个有返回值的Callable类型任务,并立即返回一个表示该任务未来结果的Future对象,Future的get方法将在成功完成任务后返回任务的结果。 23 <T> Future<T> submit(Callable<T> task); 24 25 //提交一个有返回值的Runnable类型任务,并立即返回一个表示该任务未来结果的Future对象,Future的get方法将在成功完成任务后返回任务的结果。 26 Future<?> submit(Runnable task); 27 28 //提交一个有返回值的Runnable类型任务,并立即返回一个表示该任务未来结果的Future对象,Future的get方法将在成功完成任务后返回任务的结果。此方法多了一个用于携带返回结果的result参数。 29 <T> Future<T> submit(Runnable task, T result); 30 31 /*********************批量任务执行********************/ 32 //执行给定的任务集合,立即返回一个这些任务完成之后保存这些任务状态和结果的Future列表,对于返回集合中的每一个Future其isDone方法都返回true。注意,这里的任务完成可以是正常结束也可以是异常结束。 33 //不允许在这些任务执行期间修改指定的集合参数。 34 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; 35 36 37 //上面方法的超时版本,注意超时返回时,所有未完成的任务将被取消。 38 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; 39 40 //执行给定的任务集合,立即返回一个仅表示一个正常完成(即,没有抛异常)的任务的Future对象,如果有的话。不论是正常返回还是异常返回,在返回时其他未完成的都将被取消。 41 //不允许在这些任务执行期间修改指定的集合参数。 42 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; 43 44 //上面方法的超时版本, 45 <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
内存一致性:
向 ExecutorService 提交 Runnable 或 Callable 任务之前的线程中的所有操作 happen-before 由该任务执行的所有操作,后者依次 happen-before 通过 Future.get() 获取的结果。
ScheduledExecutorService
一种可以延迟或者周期性的执行给定任务的ExecutorService ,schedule方法创建一个延迟任务并返回一个能够用于取消或检测任务状态的Future对象,它就是ScheduledFuture(见下面Fork/Join类图)。
scheduleAtFixedRate和scheduleWithFixedDelay方法则创建并执行某些在给给定初始延迟时间之后,在取消之前前一直周期性运行的任务,但在任务的周期性执行过程中如果遇到异常,则会停止后续的周期执行。
具体的说,scheduleAtFixedRate顾名思义是一种固定频率的周期性任务,也就是说如果执行将在推迟initialDelay之后开始,然后是initialDelay+周期时长,然后是initialDelay+ 2 *周期时长,以此类推。但如果此任务的某次执行时间超过其周期,则后续执行可能会延迟开始,但不会并发执行,也就是说当周期到达时发现上一个周期还没结束,那么本次执行将可能会被延迟而不是并发执行,然后其再后续的执行依然按照initialDelay + n * 周期时长的周期性执行。
scheduleWithFixedDelay则是一种固定延迟的周期性任务,也就是两次任务执行期间始终保持着一个给定的延迟,若前一个周期执行的时间耗时比较久,下一个周期始终会等待上一个周期结束之后再间隔一个固定的延迟才会开始执行,任意两次任务执行之间间隔给定的延迟时长。
用Executor.execute(Runnable)和ExecutorService的submit方法提交的任务看做是0延迟的任务执行,schedule 方法中允许出现 0 和负数延迟(但不是周期),并将这些视为一种立即执行的请求。
使用示例,下面创建了一个第一次延迟10秒之后在一个小时之内每10秒钟执行一次(打印deep)的周期性任务:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 import static java.util.concurrent.TimeUnit.*; 2 3 4 class BeeperControl { 5 //创建一个ScheduledExecutorService实例 6 private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); 7 8 public void beepForAnHour() { 9 10 //具体任务,这里是打印deep 11 final Runnable beeper = new Runnable() { 12 public void run() { System.out.println("beep"); } 13 }; 14 15 //创建一个固定频率的周期任务 16 final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS); 17 18 //在一个小时之后取消该周期任务 19 scheduler.schedule(new Runnable() { 20 public void run() { beeperHandle.cancel(true); } 21 }, 60 * 60, SECONDS); 22 } 23 }
AbstractExecutorService
该类提供了 ExecutorService 的默认实现,它使用了一个名为newTaskFor,返回值为RunnableFuture的方法实现了其他的submit、invokeAny 和 invokeAll 方法,具体来说使用了RunnableFuture的实现类FutureTask封装了原始的Runnable,Callable任务并作为获取异步任务结果的返回对象,但子类可以重写 newTaskFor 方法,以返回 FutureTask 之外的 RunnableFuture 实现类,如下面的示例:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 public class CustomThreadPoolExecutor extends ThreadPoolExecutor { 2 3 //自定义RunnableFuture的实现类 4 static class CustomTask implements RunnableFuture {...} 5 6 //重写newTaskFor方法 7 protected RunnableFuture newTaskFor(Callable c) { 8 return new CustomTask(c); 9 } 10 //重写newTaskFor方法 11 protected RunnableFuture newTaskFor(Runnable r, V v) { 12 return new CustomTask(r, v); 13 } 14 // ... add constructors, etc. 15 }}
newTaskFor的源码如下,其他submit、invokeAny和invokeAll方法都借助了newTaskFor方法:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 public Future<?> submit(Runnable task) { 2 if (task == null) throw new NullPointerException(); 3 RunnableFuture<Void> ftask = newTaskFor(task, null); 4 execute(ftask); 5 return ftask; 6 } 7 8 public <T> Future<T> submit(Callable<T> task) { 9 if (task == null) throw new NullPointerException(); 10 RunnableFuture<T> ftask = newTaskFor(task); 11 execute(ftask); 12 return ftask; 13 } 14 15 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 16 throws InterruptedException { 17 if (tasks == null) 18 throw new NullPointerException(); 19 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 20 boolean done = false; 21 try { 22 //循环提交所有任务 23 for (Callable<T> t : tasks) { 24 RunnableFuture<T> f = newTaskFor(t); 25 futures.add(f); 26 execute(f); 27 } 28 //循环等待每一个任务结束 29 for (int i = 0, size = futures.size(); i < size; i++) { 30 Future<T> f = futures.get(i); 31 if (!f.isDone()) { 32 try { 33 f.get(); //若还没结束就阻塞等待,这里忽略了CancellationException,ExecutionException,但是其余异常会抛出去 34 } catch (CancellationException ignore) { 35 } catch (ExecutionException ignore) { 36 } 37 } 38 } 39 done = true; //表示所有任务正常结束 40 return futures; 41 } finally { 42 //若有任务异常结束则取消所有任务 43 if (!done) 44 for (int i = 0, size = futures.size(); i < size; i++) 45 futures.get(i).cancel(true); 46 } 47 } 48
它们都依赖未实现的execute(Runnable command)接口方法去真正执行任务 ,这是留给子类去实现的部分。invokeAll方法中如果有任务出现了异常将取消所有任务并抛出异常。
invokeAny的实现由doInvokeAny实现,它由ExecutorCompletionService去执行提交的任务,但是ExecutorCompletionService的submit方法内部其实还是先后调用了当前ExecutorService实例的newTaskFor方法和execute(Runnable command)方法。
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 //调用doInvokeAny实现 2 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 3 throws InterruptedException, ExecutionException { 4 try { 5 return doInvokeAny(tasks, false, 0); 6 } catch (TimeoutException cannotHappen) { 7 assert false; 8 return null; 9 } 10 } 11 12 //调用doInvokeAny实现 13 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, 14 long timeout, TimeUnit unit) 15 throws InterruptedException, ExecutionException, TimeoutException { 16 return doInvokeAny(tasks, true, unit.toNanos(timeout)); 17 } 18 19 /** 20 * invokeAny的主要机制。 21 */ 22 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, 23 boolean timed, long nanos) 24 throws InterruptedException, ExecutionException, TimeoutException { 25 if (tasks == null) 26 throw new NullPointerException(); 27 int ntasks = tasks.size(); 28 if (ntasks == 0) 29 throw new IllegalArgumentException(); 30 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); 31 32 //创建了一个ExecutorCompletionService真正用于提交任务 33 ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this); 34 35 //为了提高效率,特别是在并行度有限的执行器中,在提交更多任务之前,请检查以前提交的任务是否已经完成。这种交错和异常机制解释了主循环的混乱。 36 37 try { 38 39 ExecutionException ee = null; //记录异常,这样如果我们不能获得任何结果,我们就可以抛出我们得到的最后一个异常。 40 41 final long deadline = timed ? System.nanoTime() + nanos : 0L; 42 Iterator<? extends Callable<T>> it = tasks.iterator(); 43 44 futures.add(ecs.submit(it.next())); 一定要先开始一项任务; 45 --ntasks; 46 int active = 1; 47 48 for (;;) { //自旋 49 Future<T> f = ecs.poll(); //取走已完成的任务的Future,若不存在则为null 50 if (f == null) { //不存在一个已经完成的任务 51 if (ntasks > 0) { 52 --ntasks; 53 futures.add(ecs.submit(it.next())); //若还有任务,再提交一个 54 ++active; 55 } 56 else if (active == 0) 57 break; //没有正在进行的活动任务了,退出循环 58 else if (timed) { 59 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); 60 if (f == null) //超时时间到了依然没有一个已完成的任务,抛出超时异常,结束循环 61 throw new TimeoutException(); 62 nanos = deadline - System.nanoTime(); 63 } 64 else 65 //若存在还在进行的任务,并且没有超时,就继续尝试取走已完成的任务的Future,若现在不存在则阻塞等待 66 f = ecs.take(); 67 } 68 if (f != null) { //发现一个已经完成的任务了 69 --active; 70 try { 71 return f.get(); //返回其返回值,若异常记录异常 72 } catch (ExecutionException eex) { 73 ee = eex; 74 } catch (RuntimeException rex) { 75 ee = new ExecutionException(rex); 76 } 77 } 78 } 79 80 81 if (ee == null) //出现了未捕捉的异常,创建一个异常返回 82 ee = new ExecutionException(); 83 throw ee; //抛出超时异常,结束 84 85 } finally { 86 //返回之前,取消其他所有任务 87 for (int i = 0, size = futures.size(); i < size; i++) 88 futures.get(i).cancel(true); 89 } 90 }
doInvokeAny的实现就是每提交一个任务都会看看有没有任务已经结束(可以是异常结束),若没有就继续提交一个任务,直到所有的任务都提交完了若依然没有完成的任务就阻塞等待。若提交过程中(或所有任务提交完成之后)发现一个完成的任务则在返回该任务结果(若是异常结束则抛出异常)之前取消所有其他任务。
ThreadFactory
顾名思义就是用来创建线程的工厂,它也只有一个接口方法Thread newThread(Runnable r),使用该方法来创建线程可以按照统一的规则设定线程的前缀名、优先级、守护进程状态、线程组等属性,也能够消除业务使用者对线程对象的引用。
Future以及Fork/Join框架
下面的类结构图包含Future和ForkJoin框架两部分,因为它们在类层次结构上有关联所以把它们一起画出来,Future并不陌生它用于获取异步任务结果,fork/join框架主要包含ForkJoinTask及其三个抽象子类、ForkJoinWorkerThread、ForkJoinPool几个类,它是为了帮助你更好地利用多处理器带来的好处,为那些能够被递归地拆解成子任务的工作类型量身设计的,其目的在于能够使用所有可用的运算能力来提升你的应用的性能。Java并发包中关于这一部分的类结构图如下(忽略了非并发包以及静态的、私有的、Final的内部类实现):
上图画出了Java并发包中关于Future和Fork/Join框架的类结构图(Note: 绿色表示接口,蓝色表示实例类,黄色表示抽象类),它的接口比较多,其中最主要的是Future和RunnableScheduledFuture接口,另外只有三个实体类,CompletableFuture,FutureTask和ExecutorCompletionService。其中Runnable、Delayed接口这里就不用再做描述了,前面已经在Java同步数据结构之DelayQueue/DelayedWorkQueue中对Delayed讨论过了。
Future
表示异步计算的结果,它提供了检查计算是否完成、等待计算的完成,并获取计算的结果的方法。其get方法用来获取结果,在未完成之前调用该方法将会阻塞直到完成。取消任务的继续执行则用cancel方法。还提供了一些其他方法,以确定任务是正常完成还是被取消了。一旦任务完成,就不能再取消了。如果仅仅是将Future用于可取消的用途,而对任务执行结果不关心的话,可以声明为Future<?>的形式,并且直接返回 null 作为底层任务的执行结果。Future<V>形式中的V就是get方法的返回类型。
以下是一个用法示例:
1 interface ArchiveSearcher { 2 String search(String target); 3 } 4 5 class App { 6 ExecutorService executor = ... 7 ArchiveSearcher searcher = ... 8 void showSearch(final String target) throws InterruptedException { 9 10 //异步执行搜索任务 11 Future future = executor.submit(new Callable() { 12 public String call() { 13 return searcher.search(target); 14 }}); 15 displayOtherThings(); // 搜索时做其他事情 16 try { 17 displayText(future.get()); // 获取搜索结果 18 } catch (ExecutionException ex) { 19 cleanup(); //异常清理工作 20 return; 21 } 22 } 23 }
FutureTask是一个既实现了Runnable又实现了Future的实现类,所以可以提交给Executor执行,所以示例中的任务提交也可以换成这样:
1 //将搜索构造成FutureTask 2 FutureTask<String> future = new FutureTask(new Callable() { 3 public String call() { 4 return searcher.search(target); 5 }}); 6 executor.execute(future); //提交FutureTask执行
内存一致性:异步计算的所有操作 happen-before 另一个线程调用Future.get() 之后的操作。
RunnableFuture
直译过来就是可执行的Future,继承了Runnable和Future两个接口的接口,使其可以直接提交给Executor执行并通过get获取返回结果,例如上例中的FutureTask就是其实现类。该接口只有一个重写了Runnable接口的run()接口方法,实现该就是在任务未被取消时将任务执行的结果设置到Future,以便后面调用Future.get()可以拿到这个任务执行结果。
ScheduledFuture
一种可被取消的延迟产生结果的Future,它继承了Delayed和Future两个接口的接口,它没有继承Runnable接口,故不能作为任务提交而仅表示任务的结果,而且通常它是用于作为通过ScheduledExecutorService安排任务的结果。
RunnableScheduledFuture
就是一种可执行的ScheduledFuture,我们知道ScheduledFuture没有继承Runnable接口,只能用于接收任务执行结果而不能像RunnableFuture那样既可作为任务提交又可以用于获取任务结果,那么RunnableScheduledFuture接口就是为了弥补这样的缺陷而生的,它在继承了ScheduledFuture的同时还继承了RunnableFuture,所以它也成了一种可以作为任务提交的Future。
该接口仅仅只有一个接口方法:boolean isPeriodic(); 该方法表示当前任务如果是一个周期任务则返回true。
ForkJoinTask
ForkJoinTask是通过ForkJoinPool运行的任务的抽象基类,类似一个线程实体,但是比普通线程更加轻量,也是一种轻量级的Future。通常我们不会直接实现ForkJoinTask,而是实现其三个抽象子类,RecursiveAction 用于大多数不返回结果的计算, RecursiveTask 用于返回结果的计算, CountedCompleter 用于那些操作完成之后触发其他操作的操作。
这一部分和其它接口与类后面详细讨论。
FutureTask
作为Future和RunnableFuture的实现类,该类是实现获取异步任务执行结果的基本实现,也是后面要介绍的线程池类都会用到的类,所以在开始线程池学习之前,先将这个类搞清楚吧。它是一种将Runnable,Callable进一步封装成可以直接被线程池执行的Future,以实现可以取消、查询任务是否完成和同步获取异步任务执行结果的功能。它支持仅在任务完成时才能获取结果;如果尚未完成,则get 方法将被阻塞。一旦任务完成,就不能再重新开始或取消。
FutureTask在实现的时候,使用了一个int类型的state跟踪任务的执行状态,一个Object类型的outcome记录任务的执行结果或者导致异常结束的异常信息,FutureTask的任务有七种状态,按大小顺序依次是:NEW, COMPLETING(中间状态), NORMAL, EXCEPTIONAL, CANCELLED, INTERRUPTING(中间状态), INTERRUPTED。
FutureTask被实例化之后,其初始状态是NEW,该状态表示还没有开始执行,或者至少还没有完成,COMPLETING是一种中间状态,当任务执行完成之后,在设置其执行结果outcome之前先要将其状态设置成COMPLETING,然后设置outcome,最后才将状态设置成NORMAL或者EXCEPTIONAL。先看其成员属性和一个内部类WaitNode:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 //状态字段 2 private volatile int state; 3 4 //七种状态 5 private static final int NEW = 0; 6 private static final int COMPLETING = 1; 7 private static final int NORMAL = 2; 8 private static final int EXCEPTIONAL = 3; 9 private static final int CANCELLED = 4; 10 private static final int INTERRUPTING = 5; 11 private static final int INTERRUPTED = 6; 12 13 14 /** 需要被执行的任务; 开始运行后为null */ 15 private Callable<V> callable; 16 17 /** 通过get方法要返回的结果或者要抛出的异常*/ 18 private Object outcome; 19 20 /** 执行该任务的线程; 在run方法中通过CAS设置*/ 21 private volatile Thread runner; 22 23 /** 等待线程的Treiber堆栈 */ 24 private volatile WaitNode waiters; 25 26 27 //用来记录Treiber堆栈中等待的线程的链表节点。有关更详细的说明,请参阅其他类,如Phaser和SynchronousQueue。 28 static final class WaitNode { 29 volatile Thread thread; 30 volatile WaitNode next; //下一个节点 31 WaitNode() { thread = Thread.currentThread(); } 32 }
这七种状态一般有如下这几种转换:
NEW -> COMPLETING -> NORMAL 初始状态 -> 正在完成 -> 正常结束了
NEW -> COMPLETING -> EXCEPTIONAL 初始状态 -> 正在完成 -> 异常了
NEW -> CANCELLED 初始状态 -> 被取消了
NEW -> INTERRUPTING -> INTERRUPTED 初始状态 -> 正在中断 -> 被中断了
取消和中断状态的转换后面再说,成员属性中除了state,callable是要执行的任务,FutureTask统一将Callable、Runnable的任务用Callable的形式保存。runner表示正在执行该任务的线程。waiters表示由于调用get方法获取任务执行结果而被阻塞的线程被包装之后的链表的头节点,FutureTask内部使用一个很简单的链表的结构将那些因等待执行结果而阻塞的线程保存起来,以便将来任务结束唤醒它们。
构造方法
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 //根据Callable构造FutureTask 2 public FutureTask(Callable<V> callable) { 3 if (callable == null) 4 throw new NullPointerException(); 5 this.callable = callable; 6 this.state = NEW; // ensure visibility of callable 7 } 8 9 //根据Runnable构造FutureTask,并在成功完成之后返回给定的结果 10 //如果不关心结果,result可以为null 11 public FutureTask(Runnable runnable, V result) { 12 // //将Runnable包装成Callable 13 this.callable = Executors.callable(runnable, result); 14 this.state = NEW; // ensure visibility of callable 15 }
支持以Callable、Runnable的实现来实例化一个FutureTask,但是其内部会将Runnable封装成Callable的任务。可以看到任务的初始状态是NEW,FutureTask可以构造一个指定任务执行结束之后返回固定结果的实例,FutureTask(Runnable runnable, V result);不论任务执行结果是什么(不包括异常结束,被取消),get方法的返回值都是此处指定的result。
任务的执行块---run
FutureTask是一种会被线程调度执行的任务,而任务的具体执行方法就是其run方法,该方法再执行构造方法传入的Callable或者Runnable的任务。
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 public void run() { 2 //任务不是NEW状态,或者已经有线程在处理了,返回 3 if (state != NEW || 4 !UNSAFE.compareAndSwapObject(this, runnerOffset, 5 null, Thread.currentThread())) 6 return; 7 8 //是NEW状态,并且没有其他线程在处理,开始干活 9 try { 10 Callable<V> c = callable; 11 if (c != null && state == NEW) { //状态二次确认 12 V result; 13 boolean ran; 14 try { 15 result = c.call(); //执行任务 16 ran = true; //记录任务正常结束 17 } catch (Throwable ex) { //注意这里将任务执行的异常捕获了 18 result = null; 19 ran = false; 20 setException(ex); //设置任务异常结束的异常及状态 21 } 22 if (ran) 23 set(result); //设置任务正常结束的结果及状态 24 } 25 } finally { 26 //必须先设置任务的状态,清除线程引用,防止任务被其他线程调用 27 runner = null; 28 // 必须重读任务状态,避免遗漏了中断信息 29 int s = state; 30 if (s >= INTERRUPTING) //若任务正在或者已经被中断 31 handlePossibleCancellationInterrupt(s); //等待中断完成 32 } 33 } 34 35 protected void setException(Throwable t) { 36 //这里先将任务状态设置成了中间状态COMPLETING 37 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //确保任务没有被取消或中断 38 outcome = t; //设置异常 39 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 更新任务的最终状态为异常结束 40 finishCompletion(); 41 } 42 } 43 44 protected void set(V v) { 45 //这里先将任务状态设置成了中间状态COMPLETING 46 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //确保任务没有被取消或中断 47 outcome = v; //设置正常结果 48 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 更新任务的最终状态正常结束 49 finishCompletion(); 50 } 51 } 52 53 //确保来至cancel(true)的任何中断只在运行run或runAndReset时传递给任务 54 private void handlePossibleCancellationInterrupt(int s) { 55 // It is possible for our interrupter to stall before getting a chance to interrupt us. Let's spin-wait patiently. 56 if (s == INTERRUPTING) 57 while (state == INTERRUPTING) 58 Thread.yield(); // 等待中断完成 59 60 // assert state == INTERRUPTED; 61 62 // We want to clear any interrupt we may have received from cancel(true). However, it is permissible to use interrupts as an independent mechanism for a task to communicate with its caller, and there is no way to clear only the cancellation interrupt. 63 64 // Thread.interrupted(); 65 } 66 67 //移除并唤醒所有的等待线程,执行done,并将任务引用置为null 68 private void finishCompletion() { 69 // assert state > COMPLETING; 70 for (WaitNode q; (q = waiters) != null;) { 71 //将等待链表置为null 72 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { 73 for (;;) { //将等待链表中的每一个线程唤醒 74 Thread t = q.thread; 75 if (t != null) { 76 q.thread = null; 77 LockSupport.unpark(t); //唤醒 78 } 79 WaitNode next = q.next; 80 if (next == null) 81 break; 82 q.next = null; // 解除链表的节点引用,帮助垃圾回收 83 q = next; 84 } 85 break; 86 } 87 } 88 89 //任务结束的钩子函数,包括正常结束或被取消或异常结束,只要isDone返回ture。 90 //重写该方法的时候可以同查询状态,知道任务是否是被取消的。 91 done(); 92 93 callable = null; // to reduce footprint 94 }
run方法的逻辑就是,如果任务还没有开始(状态是NEW),当然肯定也没有被取消(中断状态也是由于取消造成的),并且没有被其他线程(通过CAS设置runner线程)执行,则执行构造方法传入的Callable或者Runnable的任务。并将结果写回outcome,如果出现异常就将异常同样记录到outcome。set、setException先将状态设置成COMPLETING,然后设置outcome,最后才将状态设置成NORMAL或者EXCEPTIONAL。然后通过finishCompletion方法唤醒被get阻塞的等待执行结果的线程(当然还要将它们从等待链表中清除它们,帮助GC回收)。run方法是根据任务状态和runner线程是否被设置来避免任务被多个线程同时执行。
run方法中值得注意的是,执行Callable或者Runnable的任务的时候产生的异常将被记录但不会继续抛给调用FutureTask.run的线程。因此那些调度FutureTask的线程无论如何都不会从任务得执行中收到异常,在它们任务都正常结束了。
同步获取任务执行结果---get
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 public V get() throws InterruptedException, ExecutionException { 2 int s = state; 3 if (s <= COMPLETING) //若任务还未完成,等待完成 4 s = awaitDone(false, 0L); 5 return report(s); 6 } 7 8 等待任务完成(包括异常结束,被中断,被取消),或者超时,返回任务最终的状态 9 private int awaitDone(boolean timed, long nanos) 10 throws InterruptedException { 11 final long deadline = timed ? System.nanoTime() + nanos : 0L; //超时时间 12 WaitNode q = null; 13 boolean queued = false; 14 for (;;) { //自旋 15 if (Thread.interrupted()) { //若被中断, 16 removeWaiter(q); //从等待链表中移除该节点 17 throw new InterruptedException(); 18 } 19 20 int s = state; 21 if (s > COMPLETING) { //已经完成(包括异常结束,被取消,被中断) 22 if (q != null) 23 q.thread = null; 24 return s; 25 } 26 else if (s == COMPLETING) // 表示任务已经执行完,正在设置执行结果,稍等马上就好 27 Thread.yield(); 28 else if (q == null) 29 q = new WaitNode(); //构造等待节点,准备入队 30 else if (!queued) //还没入队 31 //头插法 入队 32 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, 33 q.next = waiters, q); 34 else if (timed) { //是超时等待 35 nanos = deadline - System.nanoTime(); 36 if (nanos <= 0L) { //已经超时, 从等待链表中移除该节点 37 removeWaiter(q); 38 return state; 39 } 40 LockSupport.parkNanos(this, nanos); //超时等待 41 } 42 else 43 LockSupport.park(this); //已经入队,并且不是超时等待,那就是无限期等待 44 } 45 } 46 47 //尝试断开超时或中断的等待节点的链接,以避免垃圾存留 48 private void removeWaiter(WaitNode node) { 49 if (node != null) { 50 node.thread = null; //将线程引用置为null 51 retry: 52 for (;;) { // 产生竞争时重试 53 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { 54 s = q.next; 55 if (q.thread != null) 56 pred = q; 57 //线程引用null的就是需要断开的节点 58 else if (pred != null) { 59 pred.next = s; //p的前驱指向p的后继,断开节点p的链接 60 if (pred.thread == null) //是否产生竞争检查 61 continue retry; // 产生竞争时重试 62 } 63 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, 64 q, s)) 65 continue retry; // 产生竞争时重试 66 } 67 break; 68 } 69 } 70 } 71 72 73 //为完成的任务返回结果,或者抛出异常结束的信息 74 private V report(int s) throws ExecutionException { 75 Object x = outcome; 76 if (s == NORMAL) 77 return (V)x; //正常结束,返回执行结果 78 if (s >= CANCELLED) 79 throw new CancellationException(); //被取消,抛出CancellationException 80 throw new ExecutionException((Throwable)x); //异常结束,抛出相关的异常 81 }
get的逻辑就是如果任务还没结束,调用awaitDone等待任务执行结束,将等待的线程使用头插法的方式插入到等待链表的头部,当任务有了结果,被唤醒之后,若是等待线程中断的直接抛出InterruptedException,否则返回任务的当前状态(是正常结束了,还是异常结束了,还是被取消了)。然后由report方法根据当前任务的状态返回结果,或者抛出相应的异常:正常结束返回执行结果,异常结束抛出由当时发生的异常封装的ExecutionException,被取消则抛出CancellationException。
任务的取消---cancel
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 //试图取消此任务的执行。如果任务已经完成,或者已经被取消,或者因为一些原因无法被取消次方法都会失败, 2 //如果成功,并且在调用cancel时此任务尚未启动,则永远不应该运行此任务. 3 //如果任务已经启动,那么mayInterruptIfRunning参数决定是否应该中断执行该任务的线程,以尝试停止该任务。 4 //在此方法返回之后,对isDone的后续调用将始终返回true。 5 //如果此方法返回true,则对isCancelled 的后续调用将始终返回true。 6 public boolean cancel(boolean mayInterruptIfRunning) { 7 8 //如果任务已经不是NEW了(表示还没被执行或者执行完成,也没有被取消或中断)则立即返回false 9 if (!(state == NEW && 10 UNSAFE.compareAndSwapInt(this, stateOffset, NEW, 11 mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) 12 return false; 13 14 //说明将任务从NEW状态修改成了INTERRUPTING 或者 CANCELLED 15 try { // in case call to interrupt throws exception 16 if (mayInterruptIfRunning) { 17 try { 18 Thread t = runner; 19 if (t != null) //不为空表示有线程正在执行此任务 20 t.interrupt(); //中断正在执行的任务 21 } finally { // final state 22 //如果中断了任务,将任务状态从INTERRUPTING修改到INTERRUPTED 23 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); 24 } 25 } 26 } finally { 27 finishCompletion(); //唤醒所有等待任务结果的线程,并调用钩子函数done() 28 } 29 return true; 30 }
FutureTask通过cancel方法可以取消那些还没被执行完成,甚至中断正在被执行的任务,如果任务已经完成,或者已经被取消了则无论如何也不能被取消。其参数mayInterruptIfRunning为ture的话就算任务已经开始执行,但是还没结束,就可以中断执行该任务的线程,试图阻止它们继续执行,但具体能不能被中断,还得看任务的执行过程中是否有可以响应中断的逻辑,否则根本停不下来。
当mayInterruptIfRunning为false时,若cancel成功,则还没开始执行的任务(至少还没执行完成,因为run方法中执行完任务之后,设置任务结果之前会先将状态设置成COMPLETING这个中间状态)的状态将由NEW 转换到CANCELLED状态。
当mayInterruptIfRunning为true时,若cancel成功,则还没开始执行的任务(至少还没执行完成,因为run方法中执行完任务之后,设置任务结果之前会先将状态设置成COMPLETING这个中间状态)的状态将由NEW先转换到INTERRUPTING ,然后执行完interrupt方法中断线程之后,再转换成INTERRUPTED 。一旦任务的状态处于INTERRUPTING 状态,那么它必将在不久后处于INTERRUPTED 状态,就像若处于COMPLETING的中间状态的话,也必将在不久后处于NORMAL或者EXCEPTIONAL状态。
由此可见,INTERRUPTING, INTERRUPTED这两种状态其实也是由于取消任务造成的,只不过它取消的是已经开始执行,但没有执行完的任务。也因此当状态是CANCELLED, INTERRUPTING, INTERRUPTED时isCancelled 方法都返回true。
isDone和isCancelled
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 isCancelled 如果此任务在正常完成之前被取消,则返回true。 2 public boolean isCancelled() { 3 return state >= CANCELLED; 状态是CANCELLED, INTERRUPTING, INTERRUPTED时 4 } 5 6 如果此任务已完成(包括正常结束、异常结束或被取消),则返回true。 7 public boolean isDone() { 8 return state != NEW; //只要不是初始状态NEW 9 }
如果此任务已完成(包括正常结束、异常结束或被取消),则isDone返回true。一旦cancel方法被调用并返回,不论是true和false,对isDone的后续调用将始终返回true。如果cancel方法返回true,则对isCancelled 的后续调用将始终返回true。只要状态不是NEW,NORMAL或者EXCEPTIONAL,isCancelled 都将返回true,因为其他几种状态(CANCELLED, INTERRUPTING, INTERRUPTED)都是cancel成功的标识。
不设置执行结果的任务执行方法---runAndReset
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 //在不设置执行结果的情况下执行任务,这是为本质上执行多次的任务而设计的。 2 protected boolean runAndReset() { 3 //任务不是NEW状态,或者已经有线程在处理了,返回 4 if (state != NEW || 5 !UNSAFE.compareAndSwapObject(this, runnerOffset, 6 null, Thread.currentThread())) 7 return false; 8 9 //是NEW状态,并且没有其他线程在处理,开始干活 10 boolean ran = false; 11 int s = state; 12 try { 13 Callable<V> c = callable; 14 if (c != null && s == NEW) { //状态二次确认 15 try { 16 c.call(); // 执行任务,不设置任务结果 17 ran = true; 18 } catch (Throwable ex) { 19 setException(ex); //若任务异常结束,依然会设置任务异常结束的异常及状态 20 } 21 } 22 } finally { 23 // runner must be non-null until state is settled to 24 // prevent concurrent calls to run() 25 runner = null; 26 // state must be re-read after nulling runner to prevent 27 // leaked interrupts 28 s = state; 29 if (s >= INTERRUPTING) //若任务正在或者已经被中断,等待中断完成 30 handlePossibleCancellationInterrupt(s); 31 } 32 return ran && s == NEW; //若任务正常结束,并且任务状态还是初始状态NEW返回true 33 }
该方法就是执行该任务,但是不将任务的执行结果设置到outcome,执行任务的过程中,甚至执行完成之后,该任务的状态依然还是NEW,相当于一种非侵入式的试运行。但是该方法有很多局限性:①必须要该任务还没有被其他线程执行或者取消,②任务的执行不能是异常结束,否则会将异常设置到outcome,并将状态转换成EXCEPTIONAL。该方法主要是被ScheduledThreadPoolExecutor使用。该方法只会在任务被正常执行结束,并且任务状态还是初始状态NEW时返回true。
可以被重写的done方法----实现任务完成的回调
在上面的源码中,只要任务有了结果不论是正常结束,异常结束,还是被取消了(不包括调用runAndReset方法正常结束的任务),都会调用finishCompletion,该方法在唤醒并从等待链表中移除通过调用get方法而阻塞的线程之后,调用了一个done方法,默认该方法什么也不做,子类完全可以重写这个方法实现任务完成之后的事件回调通知机制,从而代替Future默认的通过调用get同步获取任务结果机制。ExecutorCompletionService中实现将任务组的执行结果放入一个已完成的结果队列就是采用的重写done方法。
FutureTask总结
简单来说它主要是为了能够获取异步任务的执行状态与结果而对Runnable, Callable的进一步封装,它提供了可以取消任务执行,和查看任务是否执行结束或者被取消的功能。如果此任务已完成(包括正常结束、异常结束或被取消),则isDone返回true。还没有开始(当然也包括没有被取消)的任务(准确的说是包括压根还没执行,或者已经执行但还没有来得及将任务执行结果回写)才可以被取消,cancel的参数mayInterruptIfRunning为false时,如果任务已经在执行,但是还没执行结束(准确的说是还没有来得及将任务执行结果回写),则仅仅改变任务的状态为CANCELLED即可,而若mayInterruptIfRunning为true,则会尝试中断正在执行此任务的线程,最后任务的状态会是INTERRUPTED,这两种状态都是取消操作造成的。FutureTask使用链表存储那些调用get方法阻塞的线程,当任务有了结果不论是正常结束,异常结束还是被取消,都将唤醒那些线程,并从链表中清除相关存留。
值得注意的是,在FutureTask的run方法中执行Callable或者Runnable的任务的时候产生的异常被捕获并记录了但没有继续抛给调用FutureTask.run的线程。因此那些调度FutureTask的线程无论如何都不会从任务执行中收到异常,在它们看来FutureTask都正常结束了。在很多时候FutureTask都是作为一个基类被扩展成我们自定义的其他Future,因此一定要了解清楚它的原理与内部特性。