线程池
之前一直有这个疑问:我们平时使用线程都是各种new Thread(),然后直接在run()方法里面执行我们要做的各种操作,使用完后需要做什么管理吗?线程池为什么能维持住核心线程不释放,一直接收任务进行处理呢?
线程
线程无他,主要有两个方法,我们先看看start()方法介绍:
/**
* Causes this thread to begin execution; the Java Virtual Machine
* calls the <code>run</code> method of this thread.
* <p>
* The result is that two threads are running concurrently: the
* current thread (which returns from the call to the
* <code>start</code> method) and the other thread (which executes its
* <code>run</code> method).
* <p>
* It is never legal to start a thread more than once.
* In particular, a thread may not be restarted once it has completed
* execution.
*
* @exception IllegalThreadStateException if the thread was already
* started.
* @see #run()
* @see #stop()
*/
public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
started = false;
try {
nativeCreate(this, stackSize, daemon);
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
从这个方法解释上看,start()这个方法,最终会交给VM 去执行run()方法,所以一般情况下,我们在随便一个线程上执行start(),里面的run()操作都会交给VM 去执行。
而且还说明,重复启用线程是不合法的,当一个线程完成的时候,may not be restarted once。
那么这种情况下,线程池是怎么做的?他为什么就能够重复执行各种任务呢?
--------------------------------------------------------------------------------
带着各种疑问,我们去看看线程池自己是怎么实现的。
线程池
线程池常用的创建方法有那么几种:
1. newFixedThreadPool()
2. newSingleThreadExecutor()
3. newCachedThreadPool()
4. newScheduledThreadPool()
这4个方法创建的线程池实例具体就不一一介绍,无非是创建线程的多少,以及回收等问题,因为其实这4个方法最后都会调用统一的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
具体来说只是这几个值的不同决定了4个线程池的作用:
1. corePoolSize 代表核心线程池的个数,当线程池当前的个数大于核心线程池的时候,线程池会回收多出来的线程
2. maximumPoolSize 代表最大的线程池个数,当线程池需要执行的任务大于核心线程池的时候,会创建更多的线程,但是最大不能超过这个数
3. keepAliveTime 代表空余的线程存活的时间,当多余的线程完成任务的时候,需要多长时间进行回收,时间单位是unit 去控制
4. workQueue 非常重要,这个工作队列会存放所有待执行的Runnable对象
@param workQueue the queue to use for holding tasks before they areexecuted. This queue will hold only the {@code Runnable} tasks submitted by the {@code execute} method.
我们平时在使用线程池的时候,都是直接 实例.execute(Runnable),一起跟进去,看看这个方法具体做了什么
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
//结合上文的注释,我们得知,第一次,先判断当前的核心线程数,
//如果小于初始化的值,马上创建;然后第二个if,将这个任务插入到工作线程,双重判断任务,
//假定如果前面不能直接加入到线程池Worker集合里,则加入到workQueue队列等待执行。
//里面的if else判断语句则是检查当前线程池的状态。如果线程池本身的状态是要关闭并清理了,
//我们则不能提交线程进去了。这里我们就要reject他们。
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
所以其实主要起作用的还是addWorker()方法,我们继续跟踪进去:
private boolean addWorker(Runnable firstTask, boolean core) {
···多余代码
try {
w = new Worker(firstTask); 1.重点
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); 2. 重点
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
我们看重点部分,其实最重要的是firstTask这个Runnable,我们一直跟踪这个对象就可以了,这个对象会new Worker(),那么这个wroker()就是一个包装类,里面带着我们实际需要执行的任务,后面进行一系列的判断就会执行t.start(); 这个t 就是包装类worker类里面的Thread,所以整个逻辑又转化进去Worker内部。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
...省略代码
}
这个Worker包装类,重要的属性两个,thread 就是刚才上面那个方法执行的start()对象,这个thread又是把这个worker对象本身作为一个Runnable对象构建出来的,那么当我们调用thread.start()方法时候,实际调用的就是Worker类的run()方法。现在又要追踪进去,看这个runWorker(this),做的是什么鬼东西
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
这个方法还是比较好懂的:
1. 一个大循环,判断条件是task != null || (task = getTask()) != null,task自然就是我们要执行的任务了,当task空而且getTask()取不到任务的时候,这个while()就会结束,循环体里面进行的就是task.run();
2.这里我们其实可以打个心眼,那基本八九不离十了,肯定是这个循环一直没有退出,所以才能维持着这一个线程不断运行,当有外部任务进来的时候,循环体就能getTask()并且执行。
3.下面最后放getTask()里面的代码,验证猜想
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
真相大白了,里面进行的也是一个死循环,主要看 Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
工作队列workQueue会一直去拿任务,属于核心线程的会一直卡在 workQueue.take()方法,直到拿到Runnable 然后返回,非核心线程会 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,如果超时还没有拿到,下一次循环判断compareAndDecrementWorkerCount就会返回null,Worker对象的run()方法循环体的判断为null,任务结束,然后线程被系统回收
从以上代码可以看出,getTask()的作用是
如果当前活动线程数大于核心线程数,当去缓存队列中取任务的时候,如果缓存队列中没任务了,则等待keepAliveTime的时长,此时还没任务就返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了。因此只要线程池中的线程数大于核心线程数就会这样一个一个地销毁这些多余的线程。
如果当前活动线程数小于等于核心线程数,同样也是去缓存队列中取任务,但当缓存队列中没任务了,就会进入阻塞状态,直到能取出任务为止,因此这个线程是处于阻塞状态的,并不会因为缓存队列中没有任务了而被销毁。这样就保证了线程池有N个线程是活的,可以随时处理任务,从而达到重复利用的目的。
小结
通过以上的分析,应该算是比较清楚地解答了“线程池中的核心线程是如何被重复利用的”这个问题,同时也对线程池的实现机制有了更进一步的理解:
当有新任务来的时候,先看看当前的线程数有没有超过核心线程数,如果没超过就直接新建一个线程来执行新的任务,如果超过了就看看缓存队列有没有满,没满就将新任务放进缓存队列中,满了就新建一个线程来执行新的任务,如果线程池中的线程数已经达到了指定的最大线程数了,那就根据相应的策略拒绝任务。
当缓存队列中的任务都执行完了的时候,线程池中的线程数如果大于核心线程数,就销毁多出来的线程,直到线程池中的线程数等于核心线程数。此时这些线程就不会被销毁了,它们一直处于阻塞状态,等待新的任务到来。
注意:
本文所说的“核心线程”、“非核心线程”是一个虚拟的概念,是为了方便描述而虚拟出来的概念,在代码中并没有哪个线程被标记为“核心线程”或“非核心线程”,所有线程都是一样的,只是当线程池中的线程多于指定的核心线程数量时,会将多出来的线程销毁掉,池中只保留指定个数的线程。那些被销毁的线程是随机的,可能是第一个创建的线程,也可能是最后一个创建的线程,或其它时候创建的线程。一开始我以为会有一些线程被标记为“核心线程”,而其它的则是“非核心线程”,在销毁多余线程的时候只销毁那些“非核心线程”,而“核心线程”不被销毁。这种理解是错误的。
另外还有一个重要的接口 BlockingQueue 值得去了解,它定义了一些入队出队同步操作的方法,还可以阻塞,作用很大。
总结
一句话可以概述了,线程池就是用一堆包装住Thread的Wroker类的集合,在里面有条件的进行着死循环,从而可以不断接受任务来进行。