线程池创建
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(
int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
)
线程池参数:
-
corePoolSize:线程池核心线1个数;
-
workQueue:用于保存等待执行的任务的阻塞队列;比如基于数组的有界
ArrayBlockingQueue
,基于链表的无界LinkedBlockingQueue
,最多只有一个元素的同步队列SynchronousQueue
,优先级队列PriorityBlockingQueue
等。 -
maximunPoolSize:线程池最大线程数量。
-
ThreadFactory:创建线程的工厂。
-
RejectedExecutionHandler:饱和策略,当队列满了并且线程个数达到
maximunPoolSize
后采取的策略,比如AbortPolicy
(抛出异常),CallerRunsPolicy
(使用调用者所在线程来运行任务),DiscardOldestPolicy
(调用 poll 丢弃一个任务,执行当前任务),DiscardPolicy
(默默丢弃,不抛出异常)。 -
keeyAliveTime:存活时间。如果当前线程池中的线程数量比核心线程数量要多,并且是闲置状态的话,这些闲置的线程能存活的最大时间。
-
TimeUnit:存活时间的时间单位。
线程池的状态
线程池的状态使用原子Integer变量来存储,高3位记录线程池的状态,剩下的29位记录线程的数量
//用来标记线程池状态(高3位),线程个数(低29位)
//默认是RUNNING状态,线程个数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程个数掩码位数,并不是所有平台int类型是32位,所以准确说是具体平台下Integer的二进制位数-3后的剩余位数才是线程的个数,
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程最大个数(低29位)00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
线程池状态:
//(高3位):11100000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
//(高3位):00000000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//(高3位):00100000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
//(高3位):01000000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
//(高3位):01100000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取高三位 运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取低29位 线程个数
private static int workerCountOf(int c) { return c & CAPACITY; }
//计算ctl新值,线程状态 与 线程个数
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池状态含义:
- RUNNING:接受新任务并且处理阻塞队列里的任务;
- SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务;
- STOP:拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务;
- TIDYING:所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为 0,将要调用
terminated
方法; - TERMINATED:终止状态,terminated方法调用完成以后的状态。
线程池状态转换:
1.RUNNING -> SHUTDOWN:显式调用 shutdown()
方法,或者隐式调用了 finalize()
,它里面调用了 shutdown()
方法。
2.RUNNING or SHUTDOWN -> STOP:显式调用 shutdownNow()
方法时候。
3.SHUTDOWN -> TIDYING:当线程池和任务队列都为空的时候。
4.STOP -> TIDYING:当线程池为空的时候。
5.TIDYING -> TERMINATED:当 terminated() hook
方法执行完成时候。
线程池执行流程
任务被提交到线程池,会先判断当前线程数量是否小于corePoolSize,如果小于则创建线程来执行提交的任务,否则将任务放入workQueue队列,如果workQueue满了,则判断当前线程数量是否小于maximumPoolSize,如果小于则创建线程执行任务,否则就会调用handler,以表示线程池拒绝接收任务。
队列里的任务什么时候执行?
worker中 runWorker() 一个任务完成后,会取下一个任务
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//getTask() 从队列中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行的是run()方法 而不是start()方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//是否是阻塞的队列 是使用take()获取任务 不是使用poll
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
线程池是如何复用的
在工作线程里直接执行run方法,而不是通过start执行创建线程再执行
try {
//执行的是run()方法 而不是start()方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}