线程池
java.util.concurrent.ExecutorService 表述了异步执行的机制,并且可以让任务在后台执行。ExecutorService是一个接口,继承了Executor接口。Executor接口只包含了一个方法:void execute(Runnable command);
,该方法接收一个 Runable 实例,它用来执行一个任务,任务即一个实现了 Runnable 接口的类。jdk默认提供了四种线程池供我们使用。
大体的继承结构如下:
一、线程池的状态
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
状态名 | 高三位 | 接受新任务 | 处理阻塞队列任务 | 说明 |
---|---|---|---|---|
RUNNING | 111 | 是 | 是 | 线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。 |
SHUTDOWN | 000 | 否 | 是 | 不会接收新任务,但会处理阻塞队列剩余任务(调用shutdown()方法) |
STOP | 001 | 否 | 否 | 会中断正在执行的任务,并抛弃阻塞队列任务(调用shutdownNow()方法) |
TIDYING | 010 | - | - | 任务全执行完毕,活动线程为 0 即将进入终结 |
TERMINATED | 011 | - | - | 终结状态 |
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING,因为最高位表示符号位,这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值。
状态转换图如下:
二、ThreadPoolExecutor构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
-
corePoolSize: 核心线程数目 (最多保留的线程数),线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会 被销毁,除非设置了allowCoreThreadTimeOut。这里的最小线程数量即是corePoolSize。
-
maximumPoolSize :最大线程数目,一个任务被提交到线程池以后,首先会找有没有空闲存活线程,如果有则直接执行,如果没有则会缓存到工作队列中,如果工作队列满了,才会创建一个新线程(这就是救急线程),然后从工作队列的头部取出一个任务交由新线程来处理,而将刚提交的任务放入工作队列尾部。线程池不会无限制的去创建救急线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize的数量减去corePoolSize的数量来确定,最多能达到maximunPoolSize即最大线程池线程数量。
-
keepAliveTime :生存时间 - 针对救急线程
-
unit: 时间单位 - 针对救急线程
-
workQueue :新任务被提交后,如果没有空闲的核心线程就会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk中常见的任务队列:
①ArrayBlockingQueue
基于数组的有界阻塞队列,按FIFO排序。有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。
②LinkedBlockingQuene
基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。
③SynchronousQuene
一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。
④PriorityBlockingQueue
具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
-
threadFactory :线程工厂 - 可以为线程创建时起个好名字
-
handler: 拒绝策略
如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现:
-
AbortPolicy策略:抛出 RejectedExecutionException 异常,这是默认策略
-
CallerRunsPolicy策略: 在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,则直接抛弃任务。
-
DiscardPolicy策略: 放弃本次任务
-
DiscardOldestPolicy策略: 放弃队列中最早的任务,本任务取而代之
-
三、四种线程池
Executors提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。
-
public static ExecutorService newFixedThreadPool(int nThreads) : 创建固定数目线程的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
特点:
- 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间。
- 阻塞队列是无界的,可以放任意数量的任务
- 适用于任务量已知,相对耗时的任务
-
public static ExecutorService newCachedThreadPool():创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线 程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
特点:
-
核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着全部都是救急线程(60s 后可以回收)
-
队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
-
救急线程可以无限创建。
-
整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线
程。 适合任务数比较密集,但每个任务执行时间较短的情况
-
-
public static ExecutorService newSingleThreadExecutor(): 创建一个单线程化的Executor。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
特点:
-
自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
-
相较于其它三种线程池,newSingleThreadExecutor()创建的是FinalizableDelegatedExecutorService对象,其应用的是装饰者模式 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法。FinalizableDelegatedExecutorService继承自DelegatedExecutorService中的方法都是调用ExecutorService 接口的方法
-
适用于希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
-
-
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize): 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
ScheduledExecutorService接口的方法
/** *指定延迟时间后执行 */ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); /** *scheduleAtFixedRate ,是以上一个任务开始的时间计时,period时间过去后,检测上一个任务是否执行完毕,如果上一个任务执行完* 毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。 */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); /** *scheduleWithFixedDelay,是以上一个任务结束时开始计时,period时间过去后,立即执行。 */ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
四、ExecutorService的常用方法
// 执行任务
void execute(Runnable command);
// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务,会阻塞,必须等待所有的任务执行完成后统一返回结果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间,这里的超时时间是针对的所有tasks,而不是单个task的超时时间。同样会阻塞,会堵塞,必须等待所有的任务执行完成后统一返回。但是如果超时,会取消没有执行完的所有任务,并抛出超时异常
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,会阻塞,直到返回结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间,直到返回结果,取消没有执行完的所有任务,并抛出超时异常
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 不会中断当前运行的线程
- 此方法不会阻塞调用线程的执行
*/
void shutdown();
/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow()
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事
//情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;