之前使用线程池都是通过Executors.new...这种方式创建,因为Doug Lea已经给我们做了相对通用的设置,这么做的话简单又安全。但是有时候根据不同的场景可能需要进行一些自定义的操作。
比如,我需要一个初始情况下,使用10条核心线程运行任务,但是考虑到服务器的资源有限,我们希望限制在最多只能使用20条线程。我大概是这么定义的:
ThreadPoolExecutor service = new ThreadPoolExecutor(10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
在运行的时候发现活跃线程数(ActiveCount)最大值永远等于核心线程池数(CorePoolSize),于是翻了下代码:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
发现在实例化LinkedBlockingQueue的时候,这个队列的默认值是Integer的最大值,那这可以认为是一个无界队列,基本上是无法被填满的,那这就等于一个线程池数为10的固定线程池了。如果队列不满那么,就永远走不到新建工作线程的逻辑里面去。这也就解释了,为什么执行ActiveCount的永远为corePoolSize。所以为了使队列有界,重新定义队列长度:
ThreadPoolExecutor service = new ThreadPoolExecutor(10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(50));
执行一段时间后发现抛出异常:
java.util.concurrent.RejectedExecutionException
问题的原因是什么呢?在对队列设置了长度之后,当corePoolSize数量的线程都在运行状态会调用内部的addWorker创建非核心线程。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 当workQueue达到容量上限的时候,offer()方法会返回false,进而走到下面addWorker的这个逻辑分支。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
addWorker中有一条逻辑如下,当大于等于最大线程池数量的时候返回false:
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
如果返回false就会执行reject()方法,而reject()所在的拒绝策略是默认的AbortPolicy,所以会抛出异常:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
绝大多数的任务应该都不允许丢弃,所以我们还需要指定拒绝策略,比如JDK已经提供的实现CallerRunsPolicy。当然也可以根据具体的场景自定义拒绝策略,比如将任务阻塞插入工作队列中:
RejectedExecutionHandler handler = (r, executor1)-> {
try {
executor1.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
关于线程池的相关感悟就这些。