Executor框架
在实际开发工作中,或多或少我们都用到过多线程。一般很少使用 new Thread(){run(){}}.start()这种创建线程。一方面影响代码规范性,另外一方面,也是不方便多线程管理。当大量任务时,线程的创建销毁都会占用很多计算资源。这也是为什么使用线程池的原因。
-
在日常工作中,我们一般使用线程池进行管理线程。
-
后面我们接触到的Fork/Join的线程管理ForkJoinPool也是基于Executor进行实现的。
综合以上,在学习更多多线程相关之前,有必要了解Executor框架的相关知识;
框架的使用示意图
- 主线程通过创建线程任务,在Executor框架容器执行,根据需要返回执行结果。
![](https://img2020.cnblogs.com/blog/1765333/202009/1765333-20200914115523891-742985367.png)
Executor框架构成
-
任务:要执行的任务。线程任务需要实现:Runnable或Callable接口。(这一块的内容已经了解过了,不需要深入分析)
-
任务执行:主要是Executor的继承几口ExecutorService的实现类完成(主要需要分析的内容)
-
异步实现:FutureTask类(前面了解过了FutureTask类的相关知识,不需要再深入)
源码分析:
任务执行相关已经在之前了解过,不做更多深入。主要源码分析的是任务执行的抽象类,和异步实现类。
![](https://img2020.cnblogs.com/blog/1765333/202009/1765333-20200914114701106-1934394229.png)
ExecutorService的实现类(AbstractExecutorService)
submit方法
- 方法提交一个线程任务,并返回一个任务的执行结果Future;
/**
* 提交任务
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);//生成一个新的任务
execute(ftask);//执行新的任务,这个由子类实现
return ftask;//返回执行结果
}
//创建一个新的任务
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
子类需要实现的方法
-
shutdown 方法:在此之前的任务都会执行,但是不会接收新的任务;
-
shutdownNow 方法:暂停所有激活的方法,并返回任务列表;
-
awaitTermination方法:出现终端,暂停,或者限时。阻塞等待现有任务完成
-
execute方法,任务的执行过程;
ThreadPoolExecutor
线程池执行过程:
-
接收到新的线程,如果小于核心线程数,则创建核心线程
-
当超过核心线程数,多的线程任务存放到队列中
-
只有当队列中的存满后,再判断是否小于最大线程数,如果小于最大线程数才创建新的线程
-
当线程任务超过最大线程数,则根据策略进行抛出异常,或者任务舍弃等操作
注意点:
-
核心线程,初始是在有新任务的时候创建;如果初始化有非空队列,核心线程需要提前启动,调用prestartAllCoreThreads方法;
-
ThreadFactory用于创建新的线程,也可以自定义设置线程名称等;
-
超过keep-alive的非核心线程会被回收;
源码分析
任务管理
1.状态管理
![](https://img2020.cnblogs.com/blog/1765333/202009/1765333-20200916141932189-1051930102.png)
- 1.状态间的转换
RUNNING -> SHUTDOWN //调用ShutDown方法后
(RUNNING or SHUTDOWN) -> STOP//调用ShuntDownNow方法
SHUTDOWN -> TIDYING //没有线程任务并且队列为空的时候
STOP -> TIDYING //线程池空了之后
TIDYING -> TERMINATED //当钩子方法(execute)结束后更改状态
- 2.状态对应的数据
/**
* 初始化用到的一些静态变量
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//线程池状态
private static final int COUNT_BITS = Integer.SIZE - 3;//29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;//00001111 11111111 111111111 11111111
private static final int RUNNING = -1 << COUNT_BITS;//11100000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS;//00000000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS;//00100000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS;//01000000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS;//01100000 00000000 00000000 00000000
private static int runStateOf(int c) { return c & ~CAPACITY; }//获取高位三位,获取线程状态
private static int workerCountOf(int c) { return c & CAPACITY; }//获取后29位,获取线程个数
private static int ctlOf(int rs, int wc) { return rs | wc; }//进行或运算
- 3.状态含义
RUNNING: 接受新任务并运行队列中的任务
SHUTDOWN: 不接受新的任务,但是能运行队列中的任务
STOP: 不接受新的任务,不运行队列中的任务 ,并且中断正在运行中的任务
TIDYING: 所有任务都完成了,存活的线程为0, 当前线程池活动线程为0,将要调用terminated方法
TERMINATED: 终止状态。terminated方法调用完成以后的状态
2.存储线程任务结构
- 重点看到的参数是workers,线程池存储线程是通过Set集合存储所有线程节点。所以下面我们要重点了解Worker类
//存储任务的队列
private final BlockingQueue<Runnable> workQueue;
//用于shutdown或者shuntdownNow关闭线程池的锁
private final ReentrantLock mainLock = new ReentrantLock();
//存放线程池中所有的线程节点
private final HashSet<Worker> workers = new HashSet<Worker>();
//支持条件队列
private final Condition termination = mainLock.newCondition();
//线程池最大容量
private int largestPoolSize;
//统计执行的任务数,当线程回收时才进行统计
private long completedTaskCount;
//线程创建工厂
private volatile ThreadFactory threadFactory;
//是否允许核心线程过时
private volatile boolean allowCoreThreadTimeOut;
//默认的拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
Worker内部子类
Worker类继承AbstractQueuedSynchroner并实现Runnable接口
-
内部变量Thread保存,根据ThreadFactory创建的当前线程;
-
firstTask保存的是要执行的线程任务;
-
run方法提供了节点任务执行的方法
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
//构造方法
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;//保存任务
this.thread = getThreadFactory().newThread(this);//当前线程
}
/**
* 获取锁,将状态0改成1
*/
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/**
* 执行当前节点的任务,后面会进行分析
*/
public void run() {
runWorker(this);
}
}
execute方法
主要是三个步骤:
-
1.少于核心线程数时,增加新的线程,如果增加新的线程节点,并执行线程任务(注意:初始,小于核心线程是没新增一个线程任务,才新增一个线程节点,并不是线程启动的时候一次性创建完毕)
-
runWorker方法两种情况,首先执行当前线程的任务task,执行完成后,循环遍历queue获取头节点任务;队列任务执行完后跳出队列;
-
判断线程池状态是否为STOP状态,跳出循环;
-
如果不为STOP状态,则执行任务,记录当前线程节点执行任务数量。执行完毕,遍历queue反复
-
跳出循环后,线程池workers线程集合,移除当前线程节点,统计线程池执行的总任务数,判断现有线程数量是否小于核心线程数,再增加新的空任务线程节点。(注意:所有线程节点执行完后,先移除,在判断是否需要生成新的节点)
-
-
2.如果大于核心线程数,再尝试增加到队列,如果增加到队列中成功。
-
2.1检查线程池是否正常运行,如果异常则移除队列中的线程任务,并执行拒绝策略
-
2.2检查线程池线程数量,线程池现有线程数量为0,则新建线程任务
-
-
3.判断线程数是否超过了最大的线程数,如果超过了,则直接执行拒绝策略
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();//获取线程池状态
if (workerCountOf(c) < corePoolSize) {//判断运行中的线程数量是否少于核心数量,创建新的线程数
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//增加到队列中
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//如果线程池不是运行状态,则移除当前线程任务
reject(command);//执行拒绝策略
else if (workerCountOf(recheck) == 0)//现有线程数量为0时,增加新的核心线程
addWorker(null, false);//增加一个空闲线程
}
else if (!addWorker(command, false))//判断增加的线程数是否超过了最大的线程数量,如果超过了,则执行拒绝策略
reject(command);
}
/**
* 作用:判断现有线程worker超过了核心线程数(或最大线程数),超过了返回false,否则增加新的线程worker
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();//获取当前状态
int rs = runStateOf(c);//获取状态高位3位
/**
* 1. 当前线程池状态为STOP,TIDYING,TERMINATED
* 2. 当前线程池状态为SHUTDOWN并且已经有了第一个任务
* 3. 当前线程池状态为SHUTDOWN并且任务队列为空
* 也就是说,在加入任务之前调用了SHUTDOWN方法,或者SHUTDOWNNOW方法后就不能再加入节点。
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
/**
* 判断现有线程数是否超过最大线程数,如果超过最大线程数返回false
*/
for (;;) {
int wc = workerCountOf(c);//获取当前线程数
//现有线程数超过最大的数量返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//线程个数增加1
break retry;
c = ctl.get(); // 重新获取执行数量
// 查看线程池状态是否变更,如果改变则重试
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
/**
* 1.获取独占锁;
* 2.给线程池增加新的线程任务;
*/
try {
w = new Worker(firstTask);//新键任务节点
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 增加线程任务,创建新的线程
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//这里会调用Work的run方法,下面我会具体分析
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker类的run方法(后续抽时间补充执行流程图)
主要作用是运行线程任务,具体步骤如下:
-
线程节点有任务,或队列中有任务(获取头节点任务)
-
返回空的几种情况:
- 1.现有线程数超过最大线程数时;
- 2.线程池时暂停状态
- 3.线程池时关闭状态并且队列为空
- 4.线程等待任务超时时
-
-
如果当前线程节点有线程任务,或者循环返回的队列头节点有任务继续执行任务,如果没有可执行任务,则调用processWorkerExit方法,统计线程执行的任务,并清除线程池中空闲的线程;
-
获取线程锁,并判断是否STOP状态,如果是STOP状态,说明调用了ShutdownNow方法,中断当前线程;调用processWorkerExit方法,进行任务统计及线程回收。
-
执行线程中task的线程任务。并记录当前线程任务执行数量增加1,释放锁。
-
最后清除线程中空闲的线程,并汇总线程任务执行总数
/**
* 作用:运行节点的任务;
* 1.获取到任务后,释放锁,成为可中断的状态;
* 2.获取到任务,或者获取队列中第一个任务不为空时;
* 3.判断线程池状态,如果正常则执行运行节点任务;
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;//
w.firstTask = null;
w.unlock(); //先释放锁,允许任务在执行之前被中断(如果调用ShutDown方法,会有锁竞争)
boolean completedAbruptly = true;
try {
/**
* 条件:当前节点的任务不为空,或者队列中的任务不为空;
* 如果任务为空,则获取头部节点的任务
*/
while (task != null || (task = getTask()) != null) {
w.lock();//获取锁,AQS中同步队列获取执行权
/**
* 如果线程池停止了,查看当前线程是否中断,
* 如果中断了,则清除当前线程中端状态。并中断当前线程中任务执行
* (这里时因为ShuntDownNow方法要立即停止所有线程任务,但是只用ShutDown的话,当前线程的任务还是会继续执行。)
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//这里是抽象方法,留出扩展的空间,如果抛出异常直接进行捕获
Throwable thrown = null;
try {
task.run();//执行线程的任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
/**
* 如果线程任务在执行过程中出现错误,在这里进行处理。
* 这里提供的是抽象方法,继承类可以进行修改,用于处理异常情况,线程任务处理
* 这里出现异常也可能会造成线程中断
*/
afterExecute(task, thrown);
}
} finally {
task = null;//执行完成,进行释放
w.completedTasks++;//完成任务++
w.unlock();
}
}
completedAbruptly = false;
} finally {
/**
* 作用:1.统计任务执行数量;2.多余线程节点回收
*/
processWorkerExit(w, completedAbruptly);
}
}
/** 有以下几种情况会返回null;
* 1.现有线程数超过最大线程数时;
* 2.线程池时暂停状态
* 3.线程池时关闭状态并且队列为空
* 4.线程等待任务超时时
* 主要作用: 除了以上非null的情况之外,从队列的头部获取到线程任务;
*/
private Runnable getTask() {
boolean timedOut = false;
for (;;) {//自旋
int c = ctl.get();
int rs = runStateOf(c);
// 检查队列是不是为空,为空则返回null;
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//线程池现有线程数
int wc = workerCountOf(c);
// 判断线程池现有线程数是否大于核心线程数,并且核心线程数是可过时的
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 满足以下两个条件
* 1.现有线程数大于最大线程数
* 2.或现有线程数大于1且超时时
* 减少线程数
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 获取头节点,如果头节点不为空,则返回线程任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
/**
* 1.记录完成的任务数,并清除完成任务的线程
* 2.判断是否要终止线程池;
* 3.节点回收
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) //如果是中断状态
decrementWorkerCount();//线程数量减1
final ReentrantLock mainLock = this.mainLock;//获取线程池主锁
mainLock.lock();
try {
completedTaskCount += w.completedTasks;//记录完成的任务数
/**
* workers中移除线程执行节点
* 不管是否小于核心线程数,线程节点都要先移除
*/
workers.remove(w);
} finally {
mainLock.unlock();//释放线程池主锁
}
tryTerminate();//判断是否需要更改线程池状态
int c = ctl.get();
/**
* 满足条件再增加一个空的线程节点;
* 1. 如果当前时SHUNTDOWM状态。当任务完成正常执行完毕后
* 2. 判断核心线程是否需要回收,并且核心线程数量是否达标
* 3. 如果核心线程数量满足则返回,否则,增加一个空任务的线程节点
* 这里也是节点循环执行任务的根本所在
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);//增加一个节点
}
}
/**
* 作用:清除空闲的线程节点,判断是否将线程池的状态变更成TERMINATED状态
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/**
* 运行状态,或者中断状态但是有任务要执行
* 还有任务执行时返回
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // 清除空闲的线程
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();//终止操作由子类进行操作
} finally {
ctl.set(ctlOf(TERMINATED, 0));//将终止状态变TERMINATED状态
termination.signalAll();//唤醒条件队列
}
return;
}
} finally {
mainLock.unlock();
}
}
}
shutDown方法
- 核心点就是,逐个中断节点时,如果节点还没中断,则先让该节点获取独占锁,然后再中断
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//检测是否有关闭线程池的权限
advanceRunState(SHUTDOWN);//变更线程池状态
/**
* 逐个中断线程节点
*(注意这里判断,线程节点如果没有中断,先让节点的独占锁,保证线程任务执行完之后才进行中断操作)
*/
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor(钩子方法,等待实现类实现)
} finally {
mainLock.unlock();
}
tryTerminate();//尝试变更线程池状态为TERMIATED,这个方法上面已经分析过了
}
//中断线程节点
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {//如果线程没有中断,先获取锁之后才中断,也即是等任务执行完毕后才中断
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
shutDownNow 方法
-
中断所有线程节点,不考虑节点有任务与否;
-
返回队列中剩余的任务数;
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//检查权限
advanceRunState(STOP);//更改线程任务为STOP状态
interruptWorkers();//直接中断线程
tasks = drainQueue();//获取队列中剩余的任务,并清空队列
} finally {
mainLock.unlock();
}
tryTerminate();//尝试变更线程池状态
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();//这里进行的操作是,只要没中断就进行中断,不会有获取锁的操作
} finally {
mainLock.unlock();
}
}
几种拒绝策略
- AbortPolicy 总是抛出异常(这种是默认的)
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
- CallerRunsPolicy 只要线程池没关闭就执行线程任务
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
- DiscardPolicy 静默处理,也就是啥都不干
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
- DiscardOldestPolicy 把队列中的第一个任务给抛掉,然后执行当前的这个任务
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
ThreadPoolExecutor方法扩展
-
上面源码分析中,我们也看到了ThreadPoolExecutor提供的几个钩子方法,供子类去扩展。具体如下面这个类:
-
一般我们对线程firstTask任务执行失败后的处理也可以用提供的钩子方法进行相关处理。
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...) { super(...); }
protected void beforeExecute(Thread t, Runnable r) {//在线程任务执行之前进行的操作;
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) unpaused.await();//加入到条件队列
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {//
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();//全部唤醒
} finally {
pauseLock.unlock();
}
}
}}
总结归纳
1.ThreadPoolExecutor中线程如何管理的?超过核心线程的线程是如何被调用的?
线程管理的主要是通过两个参数管理的,
一个是ctl的原子变量,标识的是现存线程的个数;
另一个参数是workers,HashSet进行存储的,具体的线程worker实例;
从前面代码分析可以看出:
1.未超过核心线程数,会创建一个新的worker,存储到workers集合中,并且执行worker中firstTask的任务;
2.超过线程池的任务被放入到queue;
3.worker执行完当前任务和队列中任务后,当队列中没有可执行任务,直接会把当前的worker移除,并统计当前worker执行的任务;
4.如果workers中的线程节点数少于核心线程数,那么会调用增加addWork的方法,新增一个worker,firstTask为空去执行queue中的任务;
5.循环3,4步骤,直到线程池调用shutdown方法或shutdownNow方法;
从上面也可以看出,执行完任务的worker节点在队列没有任务的时候都会从workers数组中remove掉,然后新增空任务worker去处理queue中的新任务。