ThreadPoolExecutor点滴
线程池应该也是面试绕不开的一个点,平时大家也没少用,但其实也有一些小Tips还是值得记录一下。
Constructor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize
-线程池中保留的线程数量——尽管空闲(如果allowCoreThreadTimeOut
被调用或者设置,在达到keepAliveTime
后会销毁)maximumPoolSize
-线程池中允许的最大线程数量keepAliveTime
-线程数量超过核心线程数,超出的空闲线程最大存活时间unit
-存活时间单位workQueue
-任务队列,只接受通过execute
方法提交的任务,最好自定义队列为有界队列,否则默认是Integer.MAX_VALUE
(2147483647),内存会被挤压的任务压爆threadFactory
-线程创建工厂(一个接口),默认是Executors.defaultThreadFactory()
,可以设置线程的名字(比如设置更有业务含义的名字,方便定位问题),是否是守护线程,设置优先级等handler
-拒绝策略handler,默认的有4个:1. 直接拒绝抛出异常;2. 交给提交线程来执行;3. 丢弃最旧提交但是没有执行的任务;4. 直接丢弃
一个任务提交到线程池的执行流程如下:
- 当前活动线程小于核心线程数,直接新起线程运行
- 当前活动线程大于核心线程数,入阻塞任务队列
- 当前活动线程大于核心线程数,阻塞任务队列已满,检查活动线程是否达到最大线程数,没有则新开一个线程,否则执行拒绝策略
线程池的状态
注意是线程池的状态而非池中处理任务线程的状态,代码中用一个int来存放线程池的状态(高三位)和活动线程数(低29位)。
RUNNING
(-1<<29):可以接受新任务和处理队列中的任务SHUTDOWN
(0<<29):不接受新的任务,但会继续处理队列中的任务STOP
(1<<29): 不接受新的任务,也不继续处理队列中的任务,且会打断正在处理中的任务TIDYING
(2<<29):所有的任务都terminated,worker数是0,线程转移到该状态会运行terminated()
钩子方法TERMINATED
(3<<29):terminated()
执行完毕
状态之间的转移:
RUNNING
->SHUTDOWN
,shutdown()
被显示调用- (
RUNNING
ORSHUTDOWN
) ->STOP
,shutdownNow()
被显示调用 SHUTDOWN
->TIDYING
, 队列和池都空STOP
->TIDYING
, 池空TIDYING
->TERMINATED
,terminated()
执行完毕
// 主线程池控制状态(control state)ctl,把workerCount和runState聚到到一个变量中,runStateOf/workerCountOf得到当前线程的状态和运行数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int CAPACITY = (1 << 29) - 1;
// c就是ctl
// 得到高3位
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 得到低29位
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
几个容易被忽略的API
在面试中被问到过一次这样的问题:想让线程池在初始化时就干活,而不是等到第一次提交任务时才创建线程,该怎么做?
这个问题在实际中应该是存在的,毕竟线程的创建是有开销的,既然创建了线程池肯定是为了用,也就不存在需要延迟创建之类的需求,不如在系统启动时就将线程池中的线程创建好,这样来了任务就可以直接交由线程处理。
翻看线程池的doc,可以看到有这样两个方法
/**
* Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new tasks are executed. This method will return false if all core threads have already been started.
*/
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
/**
Starts all core threads, causing them to idly wait for work. This overrides the default policy of starting core threads only when new tasks are executed.
*/
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
从源代码可以发现两个方法默认的策略是只有第一个任务到达时才会创建线程,可以覆盖这两个方法,然后在创建线程池时调用,就可以达到目的。
@Override
public boolean prestartCoreThread() {
execute(() -> System.out.println("start a core thread"));
return true;
}
@Override
public int prestartAllCoreThreads() {
int n = 0;
while (++n < coreSize) {
execute(() -> System.out.println("start a core thread"));
}
return n;
}
此外还有两个hook方法:beforeExecute
和afterExecute
,也可以通过覆盖这两个方法,在一个task执行的前后做一些事情。
/**
t - the thread that will run task r
r - the task that will be executed
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("---before log---");
super.beforeExecute(t, r);
}
/**
r - the runnable that has completed
t - the exception that caused termination, or null if execution completed normally
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
System.out.println("---after log---");
}
END
当然线程池还有其他很多知识点,鉴于大多数的文章都会涉及到我也不在赘述了,这两个扩展点平时中用到的比较少,所以权当笔记和扩展一点自己的知识点,无论是面试还是实践,感觉按照场景来都有些许用处。