1.线程池的状态:
在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:
volatile int runState;
static final int RUNNING=0;
static final int SHUTDOWN=1;
static final int STOP=2;
static final TERMINATED=3;
runState表示当前的线程池的状态,它是一个volatile变量用来保证线程之间的可见性;
下面的几个static final变量表示runState可能的几个取值。
当创建线程池后,初始时,线程池处于RUNNING状态;
如果调用了shutDown()方法,则线程处于STOP状态,此时线程池不能接受新的任务,它会等待所有任务执行完毕。
如果调用了shutDown()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会尝试终止正在执行的任务。
当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
2.任务的执行
ThreadPoolExecutor类中其他一些重要的成员变量:
- private final BlockingQueue<Runnable> workQueue; //任务缓存队列,用来存放等待执行的任务
- private final ReentrantLock mainLock = new ReentrantLock(); //线程池的主要状态锁,对线程池状态(比如线程池大小
- //、runState等)的改变都要使用这个锁
- private final HashSet<Worker> workers = new HashSet<Worker>(); //用来存放工作集
- private volatile long keepAliveTime; //线程存货时间
- private volatile boolean allowCoreThreadTimeOut; //是否允许为核心线程设置存活时间
- private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
- private volatile int maximumPoolSize; //线程池最大能容忍的线程数
- private volatile int poolSize; //线程池中当前的线程数
- private volatile RejectedExecutionHandler handler; //任务拒绝策略
- private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程
- private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数
- private long completedTaskCount; //用来记录已经执行完毕的任务个数
corePoolSize就是线程池的大小,maximumPoolSize是任务量突然过大的一种补救措施
largestPoolSize只是一个用来记录作用的变量,用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。
在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:
- public void execute(Runnable command){
- if(command==null){
- throw new NullPointerException();
- }
- if(poolSize>=corePoolSize||!addIfUnderCorePoolSize(command)){
- if(runState!=RUNNING||poolSize==0){
- ensureQueuedTaskHandled(command);
- }else if(!addIfUnderMaximumPoolSize(command)){
- reject(command);
- }
- }
- }
if(poolSize>=corePoolSize||!addIfUnderCorePoolSize(command))
由于是或条件运算符,所以先计算前半部分的值,如果当前线程数不小于核心池的大小,那么就会直接进入下面的IF语句块了。
如果线程池中当前线程数小于核心池大小,则执行后半部分,也就是执行addIfUnderCorePoolSize(command),如果执行完addIfUnderCorePoolSize这个方法返回false,则继续执行下面的if语句块,否则整个方法就直接执行完毕了。
如果执行完addIfUnderCorePoolSize这个方法返回false,然后接着判断:
if(runState==RUNNING&&workQueue.offer(command))
如果当前线程池处于RUNNING状态,则将任务放入缓存队列,如果当前线程池不处于RUNNING状态或者任务放入缓存队列失败,则执行:
addIfUnderMaximumPoolSize(command)
如果执行addIfUnderMaximumPoolSize方法失败,则执行reject()方法进行任务拒绝处理。
if(runState==RUNNING&&workQueue.offer(command))
这句的执行,如果说当前线程池处于RUNNING状态且将任务放入缓存队列成功,则继续进行判断:
if(runState!=RUNNING||poolSize==0)
这句判断是为了防止此任务添加进任务缓存队列的同时其他线程突然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施,如果这样就执行:
ensureQueuedTaskHandled(command)
进行应急处理,从名字可以看出是保证添加到任务队列中的任务得到处理。
在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:
- public void execute(Runnable command){
- if(command==null){
- throw new NullPointerException();
- }
- if(poolSize>=corePoolSize||!addIfUnderCorePoolSize(command)){
- if(runState==RUNNING||workQueue.offer(command)){
- if(runState!=RUNNING||poolSize==0){
- ensureQueuedTaskHandled(command);
- }
- }else if(!addIfUnderMaximumPoolSize(command)){
- reject(command);
- }
- }
- }
if(poolSize>=corePoolSize||!addIfUnderCorePoolSize(command))
由于是或条件运算符,所以先计算前半部分的值,如果当前线程数不小于核心池的大小,那么就会直接进入下面的IF语句块了。
如果线程池中当前线程数小于核心池大小,则执行后半部分,也就是执行addIfUnderCorePoolSize(command),如果执行完addIfUnderCorePoolSize这个方法返回false,则继续执行下面的if语句块,否则整个方法就直接执行完毕了。
如果执行完addIfUnderCorePoolSize这个方法返回false,然后接着判断:
if(runState==RUNNING&&workQueue.offer(command))
如果当前线程池处于RUNNING状态,则将任务放入缓存队列,如果当前线程池不处于RUNNING状态或者任务放入缓存队列失败,则执行:
addIfUnderMaximumPoolSize(command)
如果执行addIfUnderMaximumPoolSize方法失败,则执行reject()方法进行任务拒绝处理。
if(runState==RUNNING&&workQueue.offer(command))
这句的执行,如果说当前线程池处于RUNNING状态且将任务放入缓存队列成功,则继续进行判断:
if(runState!=RUNNING||poolSize==0)
这句判断是为了防止此任务添加进任务缓存队列的同时其他线程突然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施,如果这样就执行:
ensureQueuedTaskHandled(command)
进行应急处理,从名字可以看出是保证添加到任务队列中的任务得到处理。
- addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:
- private boolean addIfUnderCorePoolSize(Runnable firstTask){
- Thread t=null;
- final ReentrantLock mainLock=this.mainLock;
- mainLock.lock();
- try{
- if(poolSize<corePoolSize&&runState==RUNNING){
- t=addThread(firstTask);//创建线程去执行firstTask任务
- }finally{
- mainLock.unLock();
- }
- }
- if(t==null){
- return false;
- }
- t.start();
- return true;
- }
它的意图就是当低于核心池大小时执行的方法。具体实现:
首先获取到锁,因为这地方涉及到线程池状态的变化,先通过if语句判断当前线程池中的线程数目是否小于核心池的大小,前面execute()判断的时候没有加锁,因此可能在execute方法判断的时候poolSize小于corePoolzSize,而判断完之后,在其他线程中又向线程池提交了任务,就可能导致poolSize不小于corePoolSize了,所以在这个地方继续进行判断。然后接着判断线程池的状态是否为RUNNING,原因也很简单,因为有可能在其他线程中调用其他线程中调用了shutDown或者shutDownNow方法。然后就是执行
t=addThread(firstTask);
这个方法也非常关键,传进去的参数为提交的任务,返回值为Thread类型。然后接着在下面判断t是否为空,为空则表明创建线程失败,否则调用t.start();
- private Thread addThread(Runnable firstTask){
- Worker w=new Worker(firstTask);
- Thread t=threadFactory.newThread(w);//创建一个线程,执行任务
- if(t!=null){
- w.thread=t;//将创建的线程引用赋值为w的成员变量
- workers,add(w);
- int nt=++poolSize;//当前线程数加1
- if(nt>largestPoolSize){
- largestPoolSize=nt;
- }
- }
- return t;
- }
在addThread方法中,首先用提交的任务创建了一个Worker对象,然后滴啊用线程工厂threadFactory创建了一个新的线程t,然后将线程t的引用赋值给了Worker对象的成员变量thread,接着 通过workers.add(w)将Worker对象添加到工作集当中。
- Worker类的实现:
- private final class Worker implements Runnable{
- private final ReentrantLock runLock=new ReentrantLock();
- private Runnable firstTask;
- volatile long completedTask;
- Thread thread;
- Worker(Runnable firstTask){
- this.firstTask=firstTask;
- }
- boolean isActive(){
- return runLock.isLocked();
- }
- void interruptIfIdle(){
- final ReentrantLock runLock=this.runLock;
- if(runLock.tryLock()){
- try{
- if(thread!=Thread.currentThread){
- thread.interrupt();
- }finally{
- runLock.unLock();
- }
- }
- }
- }
- void interruptNow(){
- thread.interrupt();
- }
- private void runTask(Runnable task) {
- final ReentrantLock runLock = this.runLock;
- runLock.lock();
- try {
- if (runState < STOP &&
- Thread.interrupted() &&
- runState >= STOP)
- boolean ran = false;
- beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据
- //自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等
- try {
- task.run();
- ran = true;
- afterExecute(task, null);
- ++completedTasks;
- } catch (RuntimeException ex) {
- if (!ran)
- afterExecute(task, ex);
- throw ex;
- }
- } finally {
- runLock.unlock();
- }
- }
- public void run() {
- try {
- Runnable task = firstTask;
- firstTask = null;
- while (task != null || (task = getTask()) != null) {
- runTask(task);
- task = null;
- }
- } finally {
- workerDone(this); //当任务队列中没有任务时,进行清理工作
- }
- }
- }
它实际上实现了Runnable接口,因此上面的Thread t=threadFactory.newThread(w);效果跟下面这句的效果基本一样:Thread t=new Thread(w);
相当于传进去了一个Runnable任务,在线程t中执行这个Runnable。
既然worker实现了Runnable接口,那么自然最核心的方法便是run()方法了:
- public void run(){
- try{
- Runnable task=firstTask;
- firstTask=null;
- while(task!=null ||(task =getTask)!=null){
- runTask(task);
- task=null;
- }
- }finally{
- workerDone(this);
- }
- }
从run方法可以看出,它首先执行的是通过构造器传进来的任务firstTask,在调用runTask执行完firstTask之后,在while循环里面不断通过getTask()去取新的任务来执行,那么去哪里去取呢?自然是从任务缓存队列里面去取,getTask是ThreadPoolExecutor类中的方法,并不是worker类中的方法,下面是getTask方法的实现:
- Runnable getTask(){
- for(;;){
- try{
- int state=runState;
- if(state>SHUTDOWN){
- return null;
- }
- Runnable r;
- if(state==SHUTDOWN){
- r=workQueue.poll();
- }else if(poolSize>corePoolSize||allowCoreThreadTimeOut){
- r=workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS);
- }else{
- r=workQueue.take();
- }
- if(r!=null){
- return r;
- }
- if(workerCanExit()){
- if(runState>=SHUTDOWN){
- interruptIfIdleWorkers();
- }
- return null;
- }
- }catch(InterruptedException){
- }
- }
- }
在getTask中,先判断当前线程池状态,如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null。
如果runState为SHUTDOWN或者RUNNING,则从任务缓存队列中取任务。
如果当前线程池的线程数大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用poll(time,timeUnit)来去任务,这个方法会等待一定的时间,如果取不到任务就返回null。
然后判断取到的任务r是否为null,为null则通过调用workerCanExit()方法来判断当前worker是否可以退出。
- private boolean workerCanExit(){
- final ReentrantLock mainLock=this.mainLock;
- mainLock.lock();
- boolean canExit;
- try{ canExit=runState>=STOP||workQueue.isEmpty()||(allowCoreThreadTimeOut&&poolSize>Math.max(1,corePoolSize));
- }finally{
- mainLock.unLock();
- }
- return canExit;
- }
也就是说如果线程池处于STOP状态,或者任务队列已为空或者允许为核心池吸纳成设置空闲存活时间大于1时,允许worker退出。如果允许worker退出,则调用interruptIdleWorkers()中断处于空闲状态的worker。
- void interruptIdleWorkers(){
- final ReentrantLock mainLock=this.mainLock;
- mainLock.lock();
- try{
- for(Worker w:workers){
- w.interruptIfIdle();//实际上调用的是worker的interruptIfIdle()方法
- }
- }finally{
- mainLock.unLock();
- }
- }
- 从实现上可以看出,它实际上调用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle方法中:
- void interruptIfIdle(){
- final ReentrantLock runLock=this.runLock;
- if(thread!=Thread.currentThread()){
- thread.interrupt();
- }finally{
- runLock.unLock();
- }
- }
这里有一个非常巧妙的方式,假如我们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给空闲线程执行。但是在这里,并没有采用这样的方式,因为这样会要额外地对任务分派线程进行管理,无形地会增加难度和复杂度,这里直接让执行完任务的线程去任务缓存队列里面去任务来执行。
addIfUnderMaximumPoolSize方法的实现,这个方法的实现思想和addUnderCorePoolSize方法的实现思想非常相似,唯一的区别在于addIfUnderMaximumPoolSize方法是在线程池中的线程数达到了核心池大小并且往任务队列中添加任务失败的情况下执行的:
- private boolean addIfUnderMaximumPoolSize(Runnable firstTask){
- Thread t=null;
- final ReentrantLock mainLock=this.mainLock;
- mainLock.lock();
- try{
- if(poolSize<maximumPoolSize&&runState==RUNNING){
- t=addThread(firstTask);
- }
- }finally{
- mainLock.unLock();
- }
- if(t==null){
- return false;
- }
- t.start();
- return true;
- }
总结:
任务提交给线程池之后的处理策略
1.如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行任务
2.如果当前的线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行,若添加失败(一般来说是任务缓存队列已满),则会尝试尝试创建新的线程去执行任务。
3.如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
4.如果线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize,如果允许核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。