1.Excutor
源码非常简单,只有一个execute(Runnable command)回调接口
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the <tt>Executor</tt> implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution.
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
执行已提交的 Runnable
任务对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。例如,可能会使用以下方法,而不是为一组任务中的每个任务调用 new Thread(new(RunnableTask())).start():
Executor executor = anExecutor; executor.execute(new RunnableTask1()); executor.execute(new RunnableTask2()); ...
不过,Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用方的线程中立即运行已提交的任务:
class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } }
更常见的是,任务是在某个不是调用方线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。
class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
许多 Executor 实现都对调度任务的方式和时间强加了某种限制。以下执行程序使任务提交与第二个执行程序保持连续,这说明了一个复合执行程序。
class SerialExecutor implements Executor { final Queue<Runnable> tasks = new LinkedBlockingQueue<Runnable>(); final Executor executor; Runnable active; SerialExecutor(Executor executor) { this.executor = executor; } public synchronized void execute(final Runnable r) { tasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (active == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) { executor.execute(active); } } }
2.ExcutorService接口
* @since 1.5 * @author Doug Lea */ public interface ExecutorService extends Executor { /** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effect if already shut down. * * @throws SecurityException if a security manager exists and * shutting down this ExecutorService may manipulate * threads that the caller is not permitted to modify * because it does not hold {@link * java.lang.RuntimePermission}<tt>("modifyThread")</tt>, * or the security manager's <tt>checkAccess</tt> method * denies access. */ void shutdown(); /** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks that were * awaiting execution. * * <p>There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. For example, typical * implementations will cancel via {@link Thread#interrupt}, so any * task that fails to respond to interrupts may never terminate. * * @return list of tasks that never commenced execution * @throws SecurityException if a security manager exists and * shutting down this ExecutorService may manipulate * threads that the caller is not permitted to modify * because it does not hold {@link * java.lang.RuntimePermission}<tt>("modifyThread")</tt>, * or the security manager's <tt>checkAccess</tt> method * denies access. */ List<Runnable> shutdownNow(); /** * Returns <tt>true</tt> if this executor has been shut down. * * @return <tt>true</tt> if this executor has been shut down */ boolean isShutdown(); /** * Returns <tt>true</tt> if all tasks have completed following shut down. * Note that <tt>isTerminated</tt> is never <tt>true</tt> unless * either <tt>shutdown</tt> or <tt>shutdownNow</tt> was called first. * * @return <tt>true</tt> if all tasks have completed following shut down */ boolean isTerminated(); /** * Blocks until all tasks have completed execution after a shutdown * request, or the timeout occurs, or the current thread is * interrupted, whichever happens first. * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return <tt>true</tt> if this executor terminated and * <tt>false</tt> if the timeout elapsed before termination * @throws InterruptedException if interrupted while waiting */ boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; /** * Submits a value-returning task for execution and returns a * Future representing the pending results of the task. The * Future's <tt>get</tt> method will return the task's result upon * successful completion. * * <p> * If you would like to immediately block waiting * for a task, you can use constructions of the form * <tt>result = exec.submit(aCallable).get();</tt> * * <p> Note: The {@link Executors} class includes a set of methods * that can convert some other common closure-like objects, * for example, {@link java.security.PrivilegedAction} to * {@link Callable} form so they can be submitted. * * @param task the task to submit * @return a Future representing pending completion of the task * @throws RejectedExecutionException if the task cannot be * scheduled for execution * @throws NullPointerException if the task is null */ <T> Future<T> submit(Callable<T> task); /** * Submits a Runnable task for execution and returns a Future * representing that task. The Future's <tt>get</tt> method will * return the given result upon successful completion. * * @param task the task to submit * @param result the result to return * @return a Future representing pending completion of the task * @throws RejectedExecutionException if the task cannot be * scheduled for execution * @throws NullPointerException if the task is null */ <T> Future<T> submit(Runnable task, T result); /** * Submits a Runnable task for execution and returns a Future * representing that task. The Future's <tt>get</tt> method will * return <tt>null</tt> upon <em>successful</em> completion. * * @param task the task to submit * @return a Future representing pending completion of the task * @throws RejectedExecutionException if the task cannot be * scheduled for execution * @throws NullPointerException if the task is null */ Future<?> submit(Runnable task); /** * Executes the given tasks, returning a list of Futures holding * their status and results when all complete. * {@link Future#isDone} is <tt>true</tt> for each * element of the returned list. * Note that a <em>completed</em> task could have * terminated either normally or by throwing an exception. * The results of this method are undefined if the given * collection is modified while this operation is in progress. * * @param tasks the collection of tasks * @return A list of Futures representing the tasks, in the same * sequential order as produced by the iterator for the * given task list, each of which has completed. * @throws InterruptedException if interrupted while waiting, in * which case unfinished tasks are cancelled. * @throws NullPointerException if tasks or any of its elements are <tt>null</tt> * @throws RejectedExecutionException if any task cannot be * scheduled for execution */ <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; /** * Executes the given tasks, returning a list of Futures holding * their status and results * when all complete or the timeout expires, whichever happens first. * {@link Future#isDone} is <tt>true</tt> for each * element of the returned list. * Upon return, tasks that have not completed are cancelled. * Note that a <em>completed</em> task could have * terminated either normally or by throwing an exception. * The results of this method are undefined if the given * collection is modified while this operation is in progress. * * @param tasks the collection of tasks * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return a list of Futures representing the tasks, in the same * sequential order as produced by the iterator for the * given task list. If the operation did not time out, * each task will have completed. If it did time out, some * of these tasks will not have completed. * @throws InterruptedException if interrupted while waiting, in * which case unfinished tasks are cancelled * @throws NullPointerException if tasks, any of its elements, or * unit are <tt>null</tt> * @throws RejectedExecutionException if any task cannot be scheduled * for execution */ <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; /** * Executes the given tasks, returning the result * of one that has completed successfully (i.e., without throwing * an exception), if any do. Upon normal or exceptional return, * tasks that have not completed are cancelled. * The results of this method are undefined if the given * collection is modified while this operation is in progress. * * @param tasks the collection of tasks * @return the result returned by one of the tasks * @throws InterruptedException if interrupted while waiting * @throws NullPointerException if tasks or any of its elements * are <tt>null</tt> * @throws IllegalArgumentException if tasks is empty * @throws ExecutionException if no task successfully completes * @throws RejectedExecutionException if tasks cannot be scheduled * for execution */ <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; /** * Executes the given tasks, returning the result * of one that has completed successfully (i.e., without throwing * an exception), if any do before the given timeout elapses. * Upon normal or exceptional return, tasks that have not * completed are cancelled. * The results of this method are undefined if the given * collection is modified while this operation is in progress. * * @param tasks the collection of tasks * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return the result returned by one of the tasks. * @throws InterruptedException if interrupted while waiting * @throws NullPointerException if tasks, any of its elements, or * unit are <tt>null</tt> * @throws TimeoutException if the given timeout elapses before * any task successfully completes * @throws ExecutionException if no task successfully completes * @throws RejectedExecutionException if tasks cannot be scheduled * for execution */ <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
ExecutorService提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以关闭 ExecutorService,这将导致其拒绝新任务。提供两个方法来关闭 ExecutorService。 shutdown()方法在终止前允许执行以前提交的任务,而 shutdownNow() 方法阻止等待任务的启动并试图停止当前正在执行的任务。在终止后,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。应该关闭未使用的 ExecutorService以允许回收其资源。 通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法submit扩展了基本方法 Executor.execute(java.lang.Runnable)。 方法 invokeAny 和 invokeAll 是批量执行的最常用形式,它们执行任务 collection,然后等待至少一个, 或全部任务完成(可使用 ExecutorCompletionService类来编写这些方法的自定义变体)。 Executors类为创建ExecutorService提供了便捷的工厂方法。 注意1:它只有一个直接实现类ThreadPoolExecutor和间接实现类ScheduledThreadPoolExecutor。 关于ThreadPoolExecutor的更多内容请参考《ThreadPoolExecutor》 关于ScheduledThreadPoolExecutor的更多内容请参考《ScheduledThreadPoolExecutor》 用法示例 下面给出了一个网络服务的简单结构,这里线程池中的线程作为传入的请求。它使用了预先配置的 Executors.newFixedThreadPool(int) 工厂方法:
class NetworkService implements Runnable {
private final ServerSocket serverSocket;
private final ExecutorService pool;
public NetworkService(int port, int poolSize)
throws IOException {
serverSocket = new ServerSocket(port);
pool = Executors.newFixedThreadPool(poolSize);
}
public void run() { // run the service
try {
for (;;) {
pool.execute(new Handler(serverSocket.accept()));
}
} catch (IOException ex) {
pool.shutdown();
}
}
}
class Handler implements Runnable {
private final Socket socket;
Handler(Socket socket) { this.socket = socket; }
public void run() {
// read and service request on socket
}
}
下列方法分两个阶段关闭 ExecutorService。第一阶段调用 shutdown 拒绝传入任务,然后等60秒后,任务还没执行完成,就调用 shutdownNow(如有必要)取消所有遗留的任务:
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
内存一致性效果:线程中向 ExecutorService 提交 Runnable 或 Callable 任务之前的操作 happen-before 由该任务所提取的所有操作,后者依次 happen-before 通过 Future.get() 获取的结果。
主要函数:
void shutdown()
启动一个关闭命令,不再接受新任务,当所有已提交任务执行完后,就关闭。如果已经关闭,则调用没有其他作用。
抛出:
SecurityException - 如果安全管理器存在并且关闭,此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有保持 RuntimePermission("modifyThread")),或者安全管理器的 checkAccess 方法拒绝访问。
List<Runnable> shutdownNow()
试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
无法保证能够停止正在处理的活动执行任务,但是会尽力尝试。例如,通过 Thread.interrupt() 来取消典型的实现,所以任何任务无法响应中断都可能永远无法终止。
返回:
从未开始执行的任务的列表
抛出:
SecurityException - 如果安全管理器存在并且关闭,
此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有保持 RuntimePermission("modifyThread")),
或者安全管理器的 checkAccess 方法拒绝访问。
注意1: 它会返回等待执行的任务列表。
注意2: 无法保证能够停止正在处理的活动执行任务,但是会尽力尝试。例如,通过 Thread.interrupt() 来取消,
所以任何任务无法响应中断都可能永远无法终止。
boolean isShutdown()
如果此执行程序已关闭,则返回 true。
返回:
如果此执行程序已关闭,则返回 true
boolean isTerminated()
如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。
返回:
如果关闭后所有任务都已完成,则返回 true
boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException
等待(阻塞)直到关闭或最长等待时间或发生中断
参数:
timeout - 最长等待时间
unit - timeout 参数的时间单位
返回:
如果此执行程序终止,则返回 true;如果终止前超时期满,则返回 false
抛出:
InterruptedException - 如果等待时发生中断
注意1:如果此执行程序终止(关闭),则返回 true;如果终止前超时期满,则返回 false
<T> Future<T> submit(Callable<T> task)
提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。
如果想立即阻塞任务的等待,则可以使用 result = exec.submit(aCallable).get(); 形式的构造。
注:Executors 类包括了一组方法,可以转换某些其他常见的类似于闭包的对象,
例如,将 PrivilegedAction 转换为 Callable 形式,这样就可以提交它们了。
参数:
task - 要提交的任务
返回:
表示任务等待完成的 Future
抛出:
RejectedExecutionException - 如果任务无法安排执行
NullPointerException - 如果该任务为 null
注意:关于submit的使用和Callable可以参阅《使用Callable返回结果》
提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。
参数:
task - 要提交的任务
result - 返回的结果
返回:
表示任务等待完成的 Future
抛出:
RejectedExecutionException - 如果任务无法安排执行
NullPointerException - 如果该任务为 null
注意:关于submit的使用可以参阅《Callable》
Future<?> submit(Runnable task)
提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功 完成时将会返回 null。
参数:
task - 要提交的任务
返回:
表示任务等待完成的 Future
抛出:
RejectedExecutionException - 如果任务无法安排执行
NullPointerException - 如果该任务为 null
注意:关于submit的使用可以参阅《使用Callable返回结果》
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。
注意,可以正常地或通过抛出异常来终止已完成 任务。如果正在进行此操作时修改了给定的 collection,则此方法的结果是不确定的。
参数:
tasks - 任务 collection
返回:
表示任务的 Future 列表,列表顺序与给定任务列表的迭代器所生成的顺序相同,每个任务都已完成。
抛出:
InterruptedException - 如果等待时发生中断,在这种情况下取消尚未完成的任务。
NullPointerException - 如果任务或其任意元素为 null
RejectedExecutionException - 如果所有任务都无法安排执行
注意1:该方法会一直阻塞直到所有任务完成。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout,
TimeUnit unit)
throws InterruptedException
执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。一旦返回后,即取消尚未完成的任务。注意,可以正常地或通过抛出异常来终止已完成 任务。如果此操作正在进行时修改了给定的 collection,则此方法的结果是不确定的。
参数:
tasks - 任务 collection
timeout - 最长等待时间
unit - timeout 参数的时间单位
返回:
表示任务的 Future 列表,列表顺序与给定任务列表的迭代器所生成的顺序相同。
如果操作未超时,则已完成所有任务。如果确实超时了,则某些任务尚未完成。
抛出:
InterruptedException - 如果等待时发生中断,在这种情况下取消尚未完成的任务
NullPointerException - 如果任务或其任意元素或 unit 为 null
RejectedExecutionException - 如果所有任务都无法安排执行
注意1:该方法会一直阻塞直到所有任务完成或超时。
注意2:如果确实超时了,则某些任务尚未完成。【那么这些尚未完成的任务应该被系统取消】。
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException,
ExecutionException
执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。一旦正常或异常返回后,则取消尚未完成的任务。
如果此操作正在进行时修改了给定的 collection,则此方法的结果是不确定的。
参数:
tasks - 任务 collection
返回:
某个任务返回的结果
抛出:
InterruptedException - 如果等待时发生中断
NullPointerException - 如果任务或其任意元素为 null
IllegalArgumentException - 如果任务为空
ExecutionException - 如果没有任务成功完成
RejectedExecutionException - 如果任务无法安排执行
注意1:该方法会一直阻塞直到有一个任务完成。
注意2:一旦正常或异常返回后,则取消尚未完成的任务
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout,
TimeUnit unit)
throws InterruptedException,
ExecutionException,
TimeoutException
执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。一旦正常或异常返回后,则取消尚未完成的任务。如果此操作正在进行时修改了给定的 collection,则此方法的结果是不确定的。
参数:
tasks - 任务 collection
timeout - 最长等待时间
unit - timeout 参数的时间单位
返回:
某个任务返回的结果
抛出:
InterruptedException - 如果等待时发生中断
NullPointerException - 如果任务或其任意元素或 unit 为 null
TimeoutException - 如果在所有任务成功完成之前给定的超时期满
ExecutionException - 如果没有任务成功完成
RejectedExecutionException - 如果任务无法安排执行
注意1:该方法会一直阻塞直到有一个任务完成。
注意2:一旦正常或异常返回后,则取消尚未完成的任务
线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。
每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。
但是,强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和 Executors.newSingleThreadExecutor()(单个后台线程),
它们均为大多数使用场景预定义了设置。否则,在手动配置和调整此类时,使用以下指导:
核心和最大池大小
ThreadPoolExecutor将根据corePoolSize(参见 getCorePoolSize())和 maximumPoolSize(参见 getMaximumPoolSize())
设置的边界自动调整池大小。当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize, 则创建新线程来处理请求,即使有线程是空闲的。
如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。
如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。
如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。
在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。
注意1:在新任务被提交时,如果运行的core线程少于corePoolSize,才创建新core线程。并不是一开始就创建corePoolSize个core线程。
注意2:"如果运行的线程多于corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程"
按需构造
核心线程最初只是在新任务到达时才被ThreadPoolExecutor创建和启动的,
但是也可以手动调用方法 prestartCoreThread() 或 prestartAllCoreThreads()来的提前启动核心线程。
如果构造带有非空队列的池,这时则可能希望预先启动线程。
注意1:核心线程即core线程,只有当前线程数小于等于corePoolSize时,这时的线程才叫核心线程。
创建新线程
使用ThreadFactory创建新线程。如果没有另外说明,则使用 Executors.defaultThreadFactory() 创建线程,他们在同一个ThreadGroup中
并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。
通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。
如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。
注意1:可以指定创建线程的ThreadFactory,默认的是使用Executors.defaultThreadFactory()来创建线程,所有的线程都在一个ThreadGroup中。
保持活动时间
如果池中当前有多于corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止
(参见 getKeepAliveTime(java.util.concurrent.TimeUnit))。这提供了当池处于非活动状态时减少资源消耗的方法。
如果池后来变得更为活动,则可以创建新的线程。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 动态地更改此参数。
如果把值设为Long.MAX_VALUE TimeUnit.NANOSECONDS 的话,空闲线程不会被回收直到ThreadPoolExecutor为Terminate。
默认情况下,保持活动策略只在有多于corePoolSizeThreads 的线程时应用。
但是只要 keepAliveTime 值非 0,allowCoreThreadTimeOut(boolean) 方法也可将此超时策略应用于核心线程。
注意1:setKeepAliveTime(long, java.util.concurrent.TimeUnit)用于设置空闲线程最长的活动时间,
即如果空闲时间超过设定值,就停掉该线程,对该线程进行回收。
该策略默认只对非内核线程有用(即当前线程数大于corePoolSize),
可以调用allowCoreThreadTimeOut(boolean)方法将此超时策略扩大到核心线程
注意2:如果把值设为Long.MAX_VALUE TimeUnit.NANOSECONDS的话,空闲线程不会被回收直到ThreadPoolExecutor为Terminate。
排队
所有 BlockingQueue 都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:
* 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
* 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
* 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
排队有三种通用策略:
1. 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。
在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。
此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。
直接提交通常要求无界maximumPoolSizes以避免拒绝新提交的任务。
当命令以超过队列所能处理的平均数连续到达时,此策略允许线程无界的增长。
注意1:此策略允许线程无界的增长。
2. 无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。
这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)
当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。
这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许队列无限的增长。
注意1:此策略允许队列无限的增长。
3. 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。
队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,
但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。
使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
被拒绝的任务
当 Executor 已经关闭,或Executor将有限边界用于最大线程和工作队列容量,且已经饱和时,
在方法 execute(java.lang.Runnable) 中提交的新任务将被拒绝。
在以上两种情况下,execute 方法都将调用其
RejectedExecutionHandler的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。
下面提供了四种预定义的处理程序策略:
1. 在默认的 ThreadPoolExecutor.AbortPolicy 中,处理程序遭到拒绝将抛出运行时RejectedExecutionException。
2. 在 ThreadPoolExecutor.CallerRunsPolicy中,线程调用运行该任务的 execute 本身。
此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
3. 在ThreadPoolExecutor.DiscardPolicy中,不能执行的任务将被删除。
4. 在ThreadPoolExecutor.DiscardOldestPolicy 中,如果执行程序尚未关闭,
则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
定义和使用其他种类的RejectedExecutionHandler类也是可能的,但这样做需要非常小心,尤其是当策略仅用于特定容量或排队策略时
注意1:AbortPolicy,CallerRunsPolicy,DiscardPolicy和DiscardOldestPolicy都是rejectedExecution的一种实现。
当然也可以自己定义个rejectedExecution实现。
钩子 (hook) 方法
此类提供 protected 可重写的 beforeExecute(java.lang.Thread, java.lang.Runnable)
和 afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,这两种方法分别在执行每个任务之前和之后调用。
它们可用于操纵执行环境;例如,重新初始化 ThreadLocal、搜集统计信息或添加日志条目。
此外,还可以重写方法 terminated() 来执行 Executor 完全终止后需要完成的所有特殊处理。
如果钩子 (hook) 或回调方法抛出异常,则ThreadPoolExecutor的所有线程将依次失败并突然终止。
队列维护
方法 getQueue() 允许出于监控和调试目的而访问工作队列。强烈反对出于其他任何目的而使用此方法。
remove(java.lang.Runnable) 和 purge() 这两种方法可用于在取消大量已排队任务时帮助进行存储回收。
注意1:如果任务取消,ThreadPoolExecutor应该自己是可以进行存储回收的。
取消的任务不会再次执行,但是它们可能在工作队列中累积,直到worker线程主动将其移除
外部使用remove(java.lang.Runnable)和purge()可以把它们立即从队列中移除。
终止
如果ThreadPoolExecutor在程序中没有任何引用且没有任何活动线程,它也不会自动 shutdown。
如果希望确保回收线程(即使用户忘记调用 shutdown()),则必须安排未使用的线程最终终止:
设置适当保持活动时间,使用0核心线程的下边界和/或设置 allowCoreThreadTimeOut(boolean)。
扩展示例。此类的大多数扩展可以重写一个或多个受保护的钩子 (hook) 方法。例如,下面是一个添加了简单的暂停/恢复功能的子类:
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...) { super(...);
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}}
关于它的使用请参考《ExecutorService》
Nested Classes | |||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|
class | ThreadPoolExecutor.AbortPolicy | A handler for rejected tasks that throws a RejectedExecutionException . |
|||||||||
class | ThreadPoolExecutor.CallerRunsPolicy | A handler for rejected tasks that runs the rejected task directly in the calling thread of the execute method, unless the executor has been shut down, in which case the task is discarded. |
|||||||||
class | ThreadPoolExecutor.DiscardOldestPolicy | A handler for rejected tasks that discards the oldest unhandled request and then retries execute , unless the executor is shut down, in which case the task is discarded. |
|||||||||
class | ThreadPoolExecutor.DiscardPolicy | A handler for rejected tasks that silently discards the rejected task. |
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
用给定的初始参数和默认的线程工厂及被拒绝的执行处理程序创建新的 ThreadPoolExecutor。
使用 Executors 工厂方法之一比使用此通用构造方法方便得多。
参数:
corePoolSize - 池中所保存的线程数,包括空闲线程。
maximumPoolSize - 池中允许的最大线程数。
keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit - keepAliveTime 参数的时间单位。
workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
抛出:
IllegalArgumentException - 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0,
或者 corePoolSize 大于 maximumPoolSize。
NullPointerException - 如果 workQueue 为 null
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
用给定的初始参数和默认被拒绝的执行处理程序创建新的 ThreadPoolExecutor。
参数:
corePoolSize - 池中所保存的线程数,包括空闲线程。
maximumPoolSize - 池中允许的最大线程数。
keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit - keepAliveTime 参数的时间单位。
workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
threadFactory - 执行程序创建新线程时使用的工厂。
抛出:
IllegalArgumentException - 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0,或者 corePoolSize 大于 maximumPoolSize。
NullPointerException - 如果 workQueue 或 threadFactory 为 null。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor。
参数:
corePoolSize - 池中所保存的线程数,包括空闲线程。
maximumPoolSize - 池中允许的最大线程数。
keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit - keepAliveTime 参数的时间单位。
workQueue - 执行前用于保持任务的队列。此队列仅由保持 execute 方法提交的 Runnable 任务。
handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
抛出:
IllegalArgumentException - 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0,
或者 corePoolSize 大于 maximumPoolSize。
public void execute(Runnable command)
在将来某个时间执行给定任务。可以在新线程中或者在现有池线程中执行该任务。 如果无法将任务提交执行,或者因为此执行程序已关闭,或者因为已达到其容量,则该任务由当前 RejectedExecutionHandler 处理。
参数:
command - 要执行的任务。
抛出:
RejectedExecutionException - 如果无法接收要执行的任务,则由 RejectedExecutionHandler 决定是否抛出 RejectedExecutionException
NullPointerException - 如果命令为 null
public void shutdown()
按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。如果已经关闭,则调用没有其他作用。
抛出:
SecurityException - 如果安全管理器存在并且关闭此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有 RuntimePermission("modifyThread")),或者安全管理器的 checkAccess 方法拒绝访问。
public List<Runnable> shutdownNow()
尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。在从此方法返回的任务队列中排空(移除)这些任务。
并不保证能够停止正在处理的活动执行任务,但是会尽力尝试。 此实现通过 Thread.interrupt() 取消任务,所以无法响应中断的任何任务可能永远无法终止。
返回:
从未开始执行的任务的列表。
抛出:
SecurityException - 如果安全管理器存在并且关闭此 ExecutorService
可能操作某些不允许调用者修改的线程(因为它没有 RuntimePermission("modifyThread")),
或者安全管理器的 checkAccess 方法拒绝访问。
public int prestartAllCoreThreads()
启动所有核心线程,使其处于等待工作的空闲状态。仅当执行新任务时,此操作才重写默认的启动核心线程策略。
返回:
已启动的线程数
public boolean allowsCoreThreadTimeOut()
如果此池允许核心线程超时和终止,如果在 keepAlive 时间内没有任务到达,新任务到达时正在替换(如果需要),则返回 true。当返回 true 时,适用于非核心线程的相同的保持活动策略也同样适用于核心线程。当返回 false(默认值)时,由于没有传入任务,核心线程不会终止。
返回:
如果允许核心线程超时,则返回 true;否则返回 false
public void allowCoreThreadTimeOut(boolean value)
如果在保持活动时间内没有任务到达,新任务到达时正在替换(如果需要),则设置控制核心线程是超时还是终止的策略。当为 false(默认值)时,由于没有传入任务,核心线程将永远不会中止。当为 true 时,适用于非核心线程的相同的保持活动策略也同样适用于核心线程。为了避免连续线程替换,保持活动时间在设置为 true 时必须大于 0。通常应该在主动使用该池前调用此方法。
参数:
value - 如果应该超时,则为 true;否则为 false
抛出:
IllegalArgumentException - 如果 value 为 true 并且当前保持活动时间不大于 0。
public boolean remove(Runnable task)
从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则让其不再运行。
此方法可用作取消方案的一部分。它可能无法移除在放置到内部队列之前已经转换为其他形式的任务。
例如,使用 submit 输入的任务可能被转换为维护 Future 状态的形式。但是,在此情况下,purge() 方法可用于移除那些已被取消的 Future。
参数:
task - 要移除的任务
返回:
如果已经移除任务,则返回 true
public void purge()
尝试从工作队列移除所有已取消的 Future 任务。此方法可用作存储回收操作,它对功能没有任何影响。
取消的任务不会再次执行,但是它们可能在工作队列中累积,直到worker线程主动将其移除。
调用此方法将试图立即移除它们。但是,如果出现其他线程的干预,那么此方法移除任务将失败。
abstract <T> Future<T> | submit(Runnable task, T result)
Submits a Runnable task for execution and returns a Future representing that task.
如果执行成功就返回T
result
|
abstract <T> Future<T> | submit(Callable<T> task)
Submits a value-returning task for execution and returns a Future representing the pending results of the task.
|
abstract Future<?> | submit(Runnable task)
Submits
a Runnable task for execution and returns a Future representing that task.
|
http://blog.csdn.net/linghu_java/article/details/17123057