ThreadPoolExecutor类:
java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,我们先从了解这个类开始,来学习线程池。
在该类中一共提供了四个构造方法:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {} public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {} public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {} public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler){}
其中前三个构造器都是调用第四个构造器进行初始化工作,由于本人的强迫症,源码全部贴出会对不齐,所以只贴出函数没贴出方法体。
参数列表中参数的含义:
corePoolSize:核心池大小。默认情况下,线程池创建后并没有任何线程。任务到来才会创建线程去执行任务。通过prestartAllCoreThreads()或者prestartCoreThread()方法可以在任务未到来之前预创建corePoolSize个或一个线程。线程池中线程数目到达corePoolSize后,会把到达的任务放到缓存队列中。
maximumPoolSize:线程池最大线程数。
keepAliveTime:设定线程没有任务执行时的最多保持多久时间会终止。默认下,该参数只有当线程池中线程数大于corePoolSize时生效,当线程池中线程数大于corePoolSize,当一个线程的空闲时间达到keepAliveTime就会终止。
unit:keepAliveTime的时间单位,在TimeUnit类中有七种静态属性。TimeUnit(DAYS、HOURS、MINUTES、SECONDS/MILLISECONDS、MICROSECONDS、NANOSECONDS)
workQueue:用来存储等待执行的任务的阻塞队列。有三种选择:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue。
threadFactory:线程工厂,主要用以创建线程。
handler:当拒绝处理任务时的策略。详情在后面介绍
我们从ThreadPoolExecutor类一层层往上看,可以如图所示关系:
Executor是顶层接口,它只声明了一个execute(Runnable)方法,用来执行传进去的任务。然后下面几个接口都是声明一些其他的方法。我们主要看在ThreadPoolExecutor类中几个较为重要的方法。
execute():通过该方法向线程池提交一个任务,交由线程池执行。
submit():在ExecutorService中声明,具体实现AbstractExecutorService中,也是用来向线程池提交任务,但与execute()不同的是,它可以返回任务执行的结果。实际上该方法还是调用的execute(),只不过它利用Future来获取任务执行结果。、
线程池的关闭:
shutdown()和shutdownNow()是用来关闭线程池的。
区别在于shutdown()不会立即终止线程池,当缓存队列中任务运行完之后才会终止,而且不会再接收新任务。
shutdownNow()立即终止线程池,并尝试打断正在运行的线程,清空缓存队列 中的任务,返回尚未执行的任务。
线程池的实现原理:
我们从以下几个方面来讲解线程池的具体实现:
1、线程池的状态。2、任务的执行。3、线程池中的线程初始化。4、任务缓存队列及排队策略。5、任务拒绝策略。6、线程池容量的动态调整。
线程池的状态:
线程池共有五种状态。Running、ShutDown、Stop、Tidying、Terminated。关系如图:
Running:线程池能接收新任务,并对已他添加的任务进行处理。线程池一旦被创建就处于该状态,并且池中任务数为0。
ShutDown:线程池不接受新任务,但能处理已添加的任务。
Stop:线程池不接收新任务也不处理已添加的任务。并中断正在处理的任务。
Tidying:当所有任务已终止。线程池任务数量为0时,变为此状态。线程池在该状态下回执行钩子函数terminated()。如果用户想在线程池变为该状态时进行相应处理,可通过重载terminated()函数来实现。
Terminated:线程池彻底终止。Tidying下,线程池执行完钩子函数terminated()后,变为该状态。
只有Runing状态下才会接收新任务、只有Running和Shudown状态才会执行缓存队列中的任务。其他状态下都不会接收新任务,不会执行队列里的任务。
任务的执行:
执行的核心方法是execute()方法,前面已经说过submit的底层也是调用此方法,因此了解execute()方法的实现即可。
我们先了解一些成员变量的意义:
ctl:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 代表了线程池的控制状态。使用AtomicInteger的CAS机制来实现对运行时状态及工作线程计数的并发一致性操作。它主要包装两个概念:
1、workerCount:线程池中当前活动的线程数量,占ctl的低29位。最大为2^29-1
2、runState:线程池运行状态,占ctl的高3位。包含五种状态。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 接收新的任务,并且执行缓存任务队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
// 不再接收新的任务,但是会执行缓存任务队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 不接受新的任务,也不执行缓存队列中的任务,并且中断正在运行的任务
private static final int STOP = 1 << COUNT_BITS;
// 所有的任务已经终止,workCount为0,这个状态为暂时状态,之后将调用terminated() hook method
private static final int TIDYING = 2 << COUNT_BITS;
// terminated()方法调用完成
private static final int TERMINATED = 3 << COUNT_BITS;
其他成员:
// 缓存任务阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 线程池主锁,用于访问worker线程集,还有其他关于线程池信息的记录信息(比如线程池大小,runState)
private final ReentrantLock mainLock = new ReentrantLock();
// 工作线程集合,访问时需获取mainLock
private final HashSet<Worker> workers = new HashSet<Worker>();
// mainLock上的终止条件量,用于支持awaitTermination
private final Condition termination = mainLock.newCondition();
// 记录曾经创建的最大线程数,访问需获取mainLock
private int largestPoolSize;
// 对已经完成任务进行计数,只有在工作线程终止时才会更新,访问需要获取mainLock
private long completedTaskCount;
/**
* 以下所有变量都为volatile类型的,以便能使所有操作都基于最新值
* (因为这些值都可以通过对应的set方法,在运行时动态设置),
* 但是不需要获取锁,因为所有内部一致性不依赖这些参数的同步访问来保证
*/
// 用于创建新线程的线程工厂
private volatile ThreadFactory threadFactory;
// 任务拒绝策略
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
// 设置默认任务拒绝策略
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
// 对于调用线程池的shutdown(),shutdownNow()方法权限认证
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
现在开始看源码:
public oid execute(Runnable command) {
if (command == null) ------(1) throw new NullPointerException(); int c = ctl.get(); ------(2) if (workerCountOf(c) < corePoolSize) { ------(3) if (addWorker(command, true))------(4) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { ------(5) int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)){------(6) reject(command); }else if (workerCountOf(recheck) == 0) ------(7) addWorker(null, false); } else if (!addWorker(command, false)) ------(8) reject(command); }
(1)判断提交任务是否为null,为null抛出空指针异常。
(2)获取当前线程池的ctl值。
(3)如果当前线程数小于核心池大小,任务不会进入队列。会创建新的工作线程直接执行任务。
(4)addWorker操作返回false的话,即添加新的工作线程失败。则获取当前线程池状态。(失败原因:在线程池数量小于核心池大小时,创建新的工作线程失败,是因为线程池状态发生改变,已经是非Running状态或shutdown状态且任务队列为空。)
(5)线程池属于Runing状态 ,说明线程池中线程数已经大于核心池大小。这是将任务放入队列,等待执行。两种情况下执行该步:
1、池中线程数小于核心池大小,并创建新工作线程失败。
2、池中线程数大于等于核心池大小。
(6)再次检查线程池状态。如果状态变了。非Running状态下不接收新任务。需将任务移除,成功从队列中删除任务后,则执行reject方法处理任务。
(7)如果线程池状态未改变,且池中无线程。此时进入addWorker方法有两种情况:
1、线程池处于Running状态,线程池中无线程。因有新任务进入队列所以要创建工作线程。此时新任务已经在队列中,所以第一个参数要执行的任务是null,只是创建一个新工作线程并启动,让它自己去队列中取任务。
2、线程池处于非Running状态,但是任务移除失败。队列中仍旧有任务。但线程池中线程数为0,则创建新工作线程,处理队列任务。
(8)两种情况会执行第8步:
1、非Running状态拒绝新任务并无法成功创建新线程,拒绝任务。
2、Running状态下,线程池中线程数大于核心池大小。任务需要放入队列。如果任务入队失败,说明队列满了。则创建新的线程。创建成功继续执行任务。创建失败则说明池中线程数已超过最大限制,则拒绝任务。
我们看看添加work工作线程的方法:addWorker(Runnable firstTask,boolean core);
private boolean addWorker(Runnable firstTask, boolean core) { retry:
//增加线程数计数,只有增加计数成功,才会增加线程 for (;;) { int c = ctl.get(); int rs = runStateOf(c); //此处判断,如果是Stop、Tidying、Terminated三个状态下都会返回false。这个三个状态下不会接收新任务,也不执行队列任务。中断当前执行任务
//如果是Shutdown状态,firstTask(不接收新任务)不为空。或队列里没有任务。返回false
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //Runing状态。或Shutdown状态且firstTask为null,但队列中有任务执行下面 for (;;) { int wc = workerCountOf(c);
//如果线程数大于最大可创建线程数,返回false
//判断当前是根据核心池大小还是最大线程池大小来创建线程。未到达核心池大小,按核心池大小限制线程池大小。达到后并且队列满了。才会按最大线程大小限制线程池大小
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) //将工作线程数通过CAS操作加1,成功的话跳出循环 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } //创建worker线程对象,并启动 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); //创建新的worker对象 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock();//获取线程池的重入锁 try { int rs = runStateOf(ctl.get()); //Running或Shutdown状态下,没有新任务,只处理队列中剩余任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
//如果线程是活动状态,直接抛出异常。因为线程刚创建,还未执行start()方法,一定不会是活动状态 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size();
//将线程池中创建过的线程最大数量,设置给largestPoolSize,可以通过getLargestPoolSize()方法获取,
//注意这个方法只能在 ThreadPoolExecutor中调用,Executer,ExecuterService,AbstractExecutorService中都是没有这个方法的
if (s > largestPoolSize)
largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); }
//启动新添加的线程,该线程首先执行firstTask,然后不断从队列中取任务执行 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
该方法的执行过程:
1、增加线程时,先判断当前线程池的状态允不允许创建新的线程,如果允许再判断线程池有没有达到 限制,如果条件都满足,才继续执行;
2、先增加线程数计数ctl,增加计数成功后,才会去创建线程;
3、建线程是通过work对象来创建的,创建成功后,将work对象放入到works线程池中(就是一个hashSet);
4、添加完成后,更新largestPoolSize值(线程池中创建过的线程最大数量),最后启动线程,如果参数firstTask不为null,则执行第一个要执行的任务,然后循环去任务队列中取任务来执行;
成功添加worker工作线程的状态有两种:
1、线程池处于Running状态。
2、线程池处于Shutdown状态,且创建线程的时候没有传入新任务。且队列不为空。
线程池中的线程初始化:
创建线程池后,默认池中没有线程,需要提交任务之后才会创建线程。
通过prestartCoreThread()和prestartAllCoreThreads()两个方法可以在线程池创建之后立即创建线程。
public boolean prestartCoreThread() { //初始化一个核心线程 return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); }
public int prestartAllCoreThreads() {//初始化所有核心线程 int n = 0; while (addWorker(null, true)) ++n; return n; }
两个方法传的第一个参数都是null,表示创建一个新工作线程并启动,等待任务队列中有任务时,让它自己去队列中取任务。
任务缓存队列及排队策略:
workQueue,任务缓存队列,用来存放等待执行的任务。其类型为BlockingQueue<Runnable>,通常以下三种类型:
1、ArrayBlockingQueue:基于数组的先进先出队列,创建时必须指定大小。
2、LinkedBlockingQueue:基于链表的先进先出队列,如果创建时未指定大小,默认为Integer.MAX_VALUE。
3、synchronousQueue:不会保存提交的任务,会直接创建一个新线程来执行新来的任务。
任务拒绝策略:
当线程池的任务缓存队列已满并且线程池中的线程数达到maximumPoolSize。如果还有任务到来就会采取任务拒绝策略。一般有以下四种策略:
1、 ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
2、ThreadPoolExecutor.DiscardPolicy:丢弃任务但不抛出异常
3、ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
4、ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
线程池容量的动态调整:
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
1、setCorePoolSize:设置核心池大小
2、setMaximumPoolSize:设置线程池最大能创建的线程数目大小
当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。
线程池使用实例: