线程池关系
Executors创建线程池
Executors
实现了几种常用的线程池。
-
newFixedThreadPool
固定线程数的线程池。例子:
ExecutorService executorService = Executors.newFixedThreadPool(2); for (int i = 0; i < 5; i++) { executorService.execute(() -> System.out.println(UUID.randomUUID())); }
运行结果:
f953f063-965d-4f3b-bc23-5d49c65a7d94 7de3f854-3415-4b05-8956-08f289db0817 ad595410-bed6-43f9-b3ad-70fa1a17698c cccc009e-b706-4c6c-8d87-1b45bf00c83f 563645ca-d29a-46fe-93c9-ec3164eddc59
-
newSingleThreadExecutor
单线程的线程池。ExecutorService executorService = Executors.newSingleThreadExecutor();
-
newCachedThreadPool
可以缓存线程的线程池。ExecutorService executorService = Executors.newCachedThreadPool();
-
newScheduledThreadPool
可以定时执行任务的线程池。例子:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); scheduledExecutorService.schedule(() -> System.out.println(UUID.randomUUID()), 1, TimeUnit.SECONDS);
结果:
7ca82ded-2253-4de2-bc37-ac83d9aa4b9f
schedule
方法是在创建任务一段时间后再执行的一次性方法。ScheduledExecutorService
接口中还提供了其他定时、周期执行的方法。
ThreadPoolExecutor
Executors
的各种实现方式本质是用 ThreadPoolExecutor
实现的,理解 ThreadPoolExecutor
才能准确的使用线程池。
参数
构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//忽略...
}
参数名 | 解释 |
---|---|
corePoolSize | 核心线程数,不会销毁,除非设置了 allowCoreThreadTimeOut |
maximumPoolSize | 线程池允许的最大线程数 |
keepAliveTime | 除核心线程以外的多余空闲线程等待新任务的最大时间,超过就会销毁 |
unit | keepAliveTime 的时间单位 |
workQueue | 存放未执行任务的队列,它只保存由 execute 方法提交的任务 |
threadFactory | 生产线程的工厂 |
handler | 被阻塞,也就是线程和队列都已满时的处理策略 |
核心线程数
-
corePoolSize
可以为零,但不能为负。默认情况下,核心线程也是在任务到达时才创建。 -
以核心线程数为界限,大于核心线程数执行器会优先把任务添加到队列;小于核心线程数执行器会优先创建线程执行任务。
-
线程数实际最大值为
(2^29)-1
,而不是Integer.MAX_VALUE
的(2^31)-1
。因为线程池中的实际线程数是和线程池运行状态共同存储在类型为AtomicInteger
名为ctl
的值中(它保存了线程池的状态),其中线程池运行状态有 5 种,因此二进制下需要最少 3 位才能保存下这 5 种状态,结果就是下面这样。
工作队列
ThreadPoolExecutor
源码中推荐使用 SynchronousQueue
、LinkedBlockingQueue
和 ArrayBlockingQueue
。Blocking
表示是阻塞的,Queue
表示是单向的。ps:Deque
表示是双向的。
SynchronousQueue
不持有队列。直接将任务分配给线程,如果没有空闲线程就创建线程,达到maximumPoolSize
时就会拒绝任务,所以需要设置maximumPoolSize
为很大的值。反过来,当处理任务的时间比新任务到达的时间慢就会导致线程数无限增加。它可以防止任务之前相互依赖的问题。LinkedBlockingQueue
无界队列。当核心线程已满且都在工作时新任务会被添加到队列中,因为是无界的,所以maximumPoolSize
参数没有意义。与SynchronousQueue
相同,如果处理任务的时间比新任务到达的时间慢就会导致队列无限增长。它适合相互之间独立的任务且平滑短暂的请求。ArrayBlockingQueue
有界队列。选择小队列大线程池可能遇到系统资源使用增加和上下文切换导致吞吐量减少的问题;选择大队列小线程池可能遇到任务被阻塞(I/O)导致线程调度时间增加的问题。它防止了使用有限的maximumPoolSize
时系统资源被耗尽的问题,比较均衡,所以更难调优。
拒接策略
新任务既不能添加到队列中,也不能创建线程去执行,就会使用设定好的拒绝策略处理它。ThreadPoolExecutor
提供了一下四种。
AbortPolicy
中止策略。拒绝时抛出RejectedExecutionException
异常。这是默认的拒绝策略。CallerRunsPolicy
运行方调用策略。调用提交者本身的线程执行该任务。DiscardPolicy
丢弃策略。无法执行的任务将直接丢弃。DiscardOldestPolicy
丢弃老任务策略。抛弃最新进入队列的任务,再添加新任务到队列中,如果失败则一直重试。
运行状态
状态名 | 说明 | 值 |
---|---|---|
RUNNING | 接收新任务并处理队列中的任务。 | -1 |
SHUTDOWN | 不接受新任务,但处理排队中的任务。 | 0 |
STOP | 不接受新任务,不处理队列中的任务,终止正在运行的任务。 | 1 |
TIDYING | 所有任务都已终止,线程池中没有线程 | 2 |
TERMINATED | terminated()已运行完 | 3 |
提交任务
提交任务有两种方式,一个是 execute
,有返回值,另一个是 submit
,没有返回值。
execute
源码如下:
public void execute(Runnable command) {
if (command == null)
//任务为空返回空指针
throw new NullPointerException();
//获取线程池状态
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//当前线程数小于核心线程数,创建新线程去执行这个任务
if (addWorker(command, true))
//成功了就结束方法
return;
//没成功,更新状态
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
//线程池运行状态是 RUNNING,添加任务到队列中,再次获取线程池状态
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//线程池的运行状态不是 RUNNING,移除该任务,使用拒绝策略拒绝任务,结束运行
reject(command);
else if (workerCountOf(recheck) == 0)
//没线程了,创建一个线程去执行队列中的任务
addWorker(null, false);
}
else if (!addWorker(command, false))
//线程池运行状态不是 RUNNING 或者既不能添加新线程,也不能添加到队列中,所以执行拒绝策略
reject(command);
}
其中 addWorker
方法通过检查线程池的运行状态和边界(corePoolSize
或 maximumPoolSize
)来添加新线程的。它有 firstTask
和 core
两个参数,类型分别为 Runnanle
和 boolean
。firstTask
是创建的新线程立即执行的任务,为空时不执行。core
为 true
时边界为 corePoolSize
,为 false
时边界为 maximumPoolSize
。
submit
是由AbstractExecutorService
提供的,它重载了 3 个方法,如下:
public Future<?> submit(Runnable task) {//...}
public <T> Future<T> submit(Runnable task, T result) {//...}
public <T> Future<T> submit(Callable<T> task) {//...}
大致就是把任务封装下,调用前面的 execute
方法执行,最后返回 Future
。
提交任务的流程如下图:
注意:进入第三个判断,也就是线程数小于最大数,创建的新线程是不立即执行任务。
执行任务
线程池中的线程在 ThreadPoolExecutor
中就是 Worker
,Worker
实现了 Runnable
接口的 run
方法,它在 run
方法中调用了 runWorker
方法执行任务,通过 getTask
方法获取阻塞或定时任务。