简介
在Web开发中,如果要密集处理多个任务时,相对于每次都一个创建线程去执行任务,新建线程来执行任务相对来说是个更好的选择,体现在以下三点:
- 降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。 当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
下面从最常用的线程池ThreadPoolExecutor的源码分析如何实现线程池。
继承结构
Executor是最基础的执行接口,只提供了一个execute(Runnable command)提交任务方法;ExecutorService接口继承了Executor,在其上做了一些shutdown()、submit()的扩展,可以说是真正的线程池接口AbstractExecutorService抽象类实现了ExecutorService接口中的大部分方法;TheadPoolExecutor继承了AbstractExecutorService,是线程池的具体实现。
实现分析
ThreadPoolExecutor类属性
public class ThreadPoolExecutor extends AbstractExecutorService {
// 线程池的控制状态(用来表示线程池的运行状态(整形的高3位)和运行的worker数量(低29位))
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 偏移量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 最大工作线程数量(2^29 - 1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 线程运行状态,总共有5个状态,需要3位来表示(所以偏移量的29 = 32 - 3)
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 阻塞队列,存放提交给线程池的任务
private final BlockingQueue<Runnable> workQueue;
// 可重入锁
private final ReentrantLock mainLock = new ReentrantLock();
// 存放工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 终止条件
private final Condition termination = mainLock.newCondition();
// 最大线程池容量
private int largestPoolSize;
// 已完成任务数量
private long completedTaskCount;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 拒绝执行处理器
private volatile RejectedExecutionHandler handler;
// 线程等待运行时间
private volatile long keepAliveTime;
// 是否运行核心线程超时
private volatile boolean allowCoreThreadTimeOut;
// 核心池的大小
private volatile int corePoolSize;
// 最大线程池大小
private volatile int maximumPoolSize;
// 默认拒绝执行处理器
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
}
线程池状态
线程池本身有两个很重要的状态信息:线程池的运行状态和工作线程数,这两个状态信息都包含在变量ctl(int型,32位)中:ctl的高3位表示线程状态runState,低29位表示工作线程worker的数量workCount。线程状态信息如下:
- RUNNING:-1<<COUNT_BITS,即高3位为1,低29位为0,该状态的线程池会接收新任务,会处理在阻塞队列中等待处理的任务
- SHUTDOWN:0<<COUNT_BITS,即高3位为000,低29位为0,该状态的线程池不会再接收新任务,但还会处理已经提交到阻塞队列中等待处理的任务
- STOP:1<<COUNT_BITS,即高3位为001,低29位为0,该状态的线程池不会再接收新任务,不会处理在阻塞队列中等待的任务,而且还会中断正在运行的任务
- TIDYING:2<<COUNT_BITS,即高3位为010,低29位为0,所有任务都被终止了,workerCount为0,为此状态时还将调用terminated()方法
- TERMINATED:3<<COUNT_BITS,即高3位为011,低29位为0,terminated()方法调用完成后变成此状态
构造方法
核心参数含义如下:
- corePoolSize:核心线程数量
- maximumPoolSize:最大线程数量,可能大于corePoolSize,也可能等于
- workQueue: 必须是BlockingQueue阻塞队列。当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能
- keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime。
- threadFactory:它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
- handler:它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
execute(Runnable command)
execute方法是向线程池提交任务的,此时线程池的状态为RUNNING(其他状态不接收新提交的任务),主要判断:
- 如果运行的线程少于 corePoolSize,则创建新的工作线程来处理任务,即使线程池中的其他线程是空闲的;
- 如果线程池中的线程数量大于等于 corePoolSize,且阻塞队列未满,将任务加入阻塞队列workQueue;
- 如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当workQueue满时才创建新的线程去处理任务;
- 如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来拒绝任务提交;
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//ctl记录线程池状态信息和线程池线程数
int c = ctl.get();
//比较当前线程数是否小于corePoolSize,如果小于则新建一个线程放入线程池中
if (workerCountOf(c) < corePoolSize) {
//成功加入则返回
if (addWorker(command, true))
return;
//加入失败,重新获取ctl
c = ctl.get();
}
//如果当前线程数大于等于corePoolSize,判断线程池是否仍在运行,是的话加入阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次检查线程池是否仍在运行
if (! isRunning(recheck) && remove(command))
reject(command);
/** 线程池在运行但是工作线程数为0,此时可能阻塞队列有任务但线程池没有工作线程池,
* 如果配置了参数allowCoreThreadTimeOut(默认是false)为true可能因为核心线程执行
* 完任务且阻塞队列也没有线程等待获取任务,此时属于空闲线程,由于超时会回收核心线程
**/
else if (workerCountOf(recheck) == 0)
/** 传false将会在addWorker方法中判断线程池的工作线程数量和最大线程数量做比较
* 传一个空的任务,开启一个工作线程,但这个工作线程会发现当前的任务是空,然后会去队列中取任务
* 这样就避免了线程池的状态是running,而且队列中还有任务,但线程池却不执行队列中的任务
**/
addWorker(null, false);
}
/**
* 如果执行到这里,有两种情况:
* 1. 线程池已经不是RUNNING状态;
* 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
* 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为
* maximumPoolSize;如果失败则拒绝该任务
**/
else if (!addWorker(command, false))
reject(command);
}
addWorker(Runnable firstTask, boolean core)
addWorker方法用与创建工作线程,firstTask表示第一个任务,core为true那么线程数受corePoolSize制约,为false则受maximumPoolSize制约。执行流程:
- 检查线程池状态决定是否新建工作线程
- 新建Worker对象并加入到集合中
- 启动工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//运行状态
int rs = runStateOf(c);
/**
* 如果rs >= SHUTDOWN,则表示此时不再接收新任务
* 满足rs >= SHUTDOWN条件后接着判断以下3个条件,只要有1个不满足,则返回false:
* 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保
* 存的任务 2. firsTask为空 3. 阻塞队列不为空
**/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//当前工作线程数
int wc = workerCountOf(c);
//检查线程数量是否超出
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试CAS增加workerCount,如果成功,则跳出第一个for循环
if (compareAndIncrementWorkerCount(c))
break retry;
//CAS失败,重新获取ctl的值
c = ctl.get(); // Re-read ctl
// 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
if (runStateOf(c) != rs)
continue retry;
}
}
//CAS增加workCount成功,退出循环进入到这里
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根据firstTask来创建Worker对象
w = new Worker(firstTask);
// 每一个Worker对象都会创建一个线程
final Thread t = w.thread;
if (t != null) {
//上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING状态;
// 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
// 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
//将工作线程work加入到HashSet对象workers
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
内部类Worker
线程池的工作线程是通过包装成Worker对象,Worker类本身既实现了Runnable接口,又继承了同步器AQS,实现了一个简易的不可重入的互斥锁,通过同步状态state控制中断:
- 初始AQS状态为-1,此时不允许中断interrupt(),只有在worker线程启动了,执行了runWoker(),将state置为0,才能中断,不允许中断体现在:
- shutdown()线程池时,会对每个worker tryLock()上锁,而Worker类这个AQS的tryAcquire()方法是固定将state从0->1,故初始状态state==-1时tryLock()失败,无法interrupt()
- shutdownNow()线程池时,不用tryLock()上锁,但调用worker.interruptIfStarted()终止worker,interruptIfStarted()也有state>0才能interrupt的逻辑
- 为了防止某种情况下,在运行中的worker被中断,runWorker()每次运行任务时都会lock()上锁,而shutdown()这类可能会终止worker的操作需要先获取worker的锁,这样就防止了中断正在运行的线程
private final class Worker extends AbstractQueuedSynchronizerimplements Runnable{
private static final long serialVersionUID = 6138294804551838833L;
//工作线程
final Thread thread;
//新建Worker传入的任务command,可能为null
Runnable firstTask;
//执行完的任务数量
volatile long completedTasks;
//同步状态state为0代表为锁定,state为1代表锁定,state为-1代表初始状态
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//创建线程
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
}
runWorker(Worker w)
runWork是工作线程执行任务的方法,执行过程如下:
- 通过while循环步断获取任务
- 检查线程池运行状态,如果处于STOP及以上,中断线程;如果是RUNNING或SHUTDOWN,不中断工作线程
- task.run()执行任务
- 当取得的任务task为null退出循环,执行processWorkerExit方法,此时Work的工作线程run()方法执行完毕,线程销毁
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//同步状态state设置为0,允许中断
w.unlock(); // allow interrupts
//用于标识是否工作线程由于异常突然终止,在执行任务抛出异常或线程被中断两种情况为true
boolean completedAbruptly = true;
try {
//循环取任务执行
while (task != null || (task = getTask()) != null) {
//上锁,表示正在工作线程正在执行任务,不能响应中断
w.lock();
/**
* 确保在线程池状态在STOP及以上时,才会被设置中断标示,否则清除中断标示,判断以下两个条件:
* 1、如果线程池状态>=stop,且当前线程没有设置中断状态,wt.interrupt()
* 2、如果一开始判断线程池状态<stop,但Thread.interrupted()为true,即线程已经被中断,又
* 清除了中断标示,再次判断线程池状态是否>=stop(可能调用了shutdownNow关闭线程池)
**/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask()
当工作线程数达到corePoolSize,后续提交的任务就会放到阻塞队列workQueue中,工作线程通过getTask方法从阻塞队列取出任务,执行以下步骤:
- 检查线程池状态及阻塞队列是否为空
- 控制核心线程数(使工作线程数不超过corePoolSize)
- 从阻塞队列取任务
private Runnable getTask() {
// timeOut变量的值表示上次从阻塞队列中取任务时是否超时
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**
* 1.rs>SHUTDOWN 所以rs至少等于STOP,这时不再处理队列中的任务,不管workQueue是否为空都返回null
* 2.rs = SHUTDOWN 所以rs>=STOP肯定不成立,这时还需要处理队列中的任务除非workQueue为空
* 如果以上条件满足,则将workerCount减1并返回null。因为如果当前线程池状态的值是SHUTDOWN
* 或以上时,不允许再向阻塞队列中添加任务。
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/**
* timed表示工作线程是否需要剔除,为true
* allowCoreThreadTimeOut默认为false,表示核心线程不做超时控制
* wc > corePoolSize 超过核心线程数
* timed为true下面的if条件通过返回null,从而剔除掉超过corePoolSize数目的线程,使线程数
* 回复corePoolSize
**/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 条件1:
* wc > maximumPoolSize 检查是否超出maximumPoolSize,线程池可能重置了maximumPoolSize
* timed && timedOut 当前线程需要超时控制且上次取任务超时为true
* 条件2:如果线程数量大于1,或者阻塞队列是空的
* 两个条件都为true把workCount减一,返回null
**/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
//CAS失败重新循环
continue;
}
try {
/**
* 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在
* keepAliveTime时间内没有获取到任务,则返回null;
* 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。
**/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//如果r==null,说明是超时了
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkerExit(Worker w, boolean completedAbruptly)
当getTask返回null,会跳出runWork的while循环,此时工作线程的run方法执行完毕,线程会终止,同时会执行processWorkerExit方法,步骤如下:
- 根据completedAbruptly参数调整线程池的工作线程数
- 统计完成的任务数并从集合中移出Worker对象
- 根据线程池状态进行判断是否结束线程池
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果是突然终止,重新调整workCount
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//统计完成的任务数
completedTaskCount += w.completedTasks;
//从集合中移出Worker对象
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根据线程池状态进行判断是否结束线程池
tryTerminate();
int c = ctl.get();
//线程状态小于STOP,即线程池处于RUNNING或SHUTDOWN状态
if (runStateLessThan(c, STOP)) {
//检查是否异常终止
if (!completedAbruptly) {
//如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
//如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//突然终止,添加一个Worker
addWorker(null, false);
}
}
shutdown()
关闭线程池,线程池状态由RUNNING变为SHUTDOWN,只处理已有任务不再接收新提交的任务,中断空闲线程。
为什么要中断空闲线程:当线程池状态为RUNNING但是阻塞队列为空,allowCoreThreadTimeOut为默认值false(既不支持核心线程超时回收),那么工作线程必然堵塞在workQueue.take()方法上,而调用了shutdown()方法后线程池状态变为SHUTDOWN不接收新提交的任务,那么阻塞队列永远为空,所以需要通过中断让线程由阻塞状态返回null。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否有关闭线程池权限
checkShutdownAccess();
//把线程池运行状态切换为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
interruptIdleWorkers()
中断空闲线程。
private void interruptIdleWorkers() {
//false表明中断所有空闲线程
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// t.isInterrupted()检查线程是否已经中断过
// w.tryLock() runWork在执行任务会上锁,执行完解锁去阻塞队列获得任务,如果tryLock成功
//说明没有执行任务,是空闲线程。
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
tryTerminate()
根据线程池状态尝试关闭线程池。这里解释一下interruptIdleWorkers(ONLY_ONE):
当到达workerCountOf(c) != 0这个判断时,说明线程池处于SHUTDOWN状态,且阻塞队列已经为空,这是若判断成立,那么还有工作线程等待在线程池上,会中断一个空闲线程,这个被中断的空闲线程的Worker返回null又会调用tryTerminate,从而把线程池关闭的消息传给每个线程,回收空闲线程。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/*
* 当前线程池的状态为以下几种情况时,直接返回:
* 1. RUNNING,因为还在运行中,不能停止;
* 2. TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了;
* 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task;
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//工作线程数不为0
if (workerCountOf(c) != 0) { // Eligible to terminate
//中断一个空闲线程(等待在阻塞队列上获取任务的线程)
//中断的线程在回收Worker时还会调用tryTerminate方法,从而回收空闲线程
interruptIdleWorkers(ONLY_ONE);
return;
}
//到这里说明工作线程数workCount为0,线程池状态置为TIDYING
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
shutdownNow()
关闭线程池,运行状态修改为 STOP, 中断所有线程; 并返回未处理的任务
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//更改线程池状态
advanceRunState(STOP);
// 中断所有工作线程,无论是否空闲
interruptWorkers();
//取出阻塞队列中没有被执行的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
interruptWorkers()
不论线程是否空闲,中断所有线程。
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
/**
* getState() >= 0 同步状态state=-1线程还没启动,大于等于0说明线程以及启动,处于
* 执行任务或空闲状态。
* (t = thread) != null 线程不为null
* !t.isInterrupted() 检查线程是否被中断过。
**/
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
总结
本文分析了线程池ThreadPoolExecutor的实现,主要从向线程池提交任务和关闭线程池这两个方法分析的,了解了线程池复用线程资源减少线程创建和切换的开销背后的秘密。