线程池的分类
首先我们需要了解,使用线程池的目的。如果我们有大量的较短的异步任务需要执行,那么我们需要频繁的去创建线程并关闭线程。那么这样做的代价是十分巨大的,因此,我们就采用了一种线程池的做法,实际上,我们常用了池类方式还有数据库连接池,这种一般是将一些比较珍贵的资源放在池中,然后,每次使用完毕,再将其放回池中,不释放。节约了新建的成本。
下图是线程池的简单类图
一般我们通过Executors
这个工厂类来创建线程池,那么,我们来看一下,几种线程池的真面目吧!
对于线程池中使用的任务队列的数据结构,之后会单独开博客分析,这里先有一个简单的认识就好
- newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
固定大小的线程池,线程池的基本大小(corePoolSize)为nThreads
,最大线程数(maximumPoolSize)也为nThreads
,采用了LinkedBlockingQueue
队列来存放任务。
LinkedBlockingQueue是基于链表结构的阻塞队列,FIFO,队列属于无界队列。
当线程池中的线程数量小于最大线程数量时,会直接新建线程,执行任务。当线程数量已经达到最大线程数量,并且,没有空闲线程,任务会进入队列中排队,当有空闲线程则会执行。
keepAliveTime线程活动时间,这个值表示,当线程池中的线程大于corePoolSize时,超出corePoolSize的空闲线程最大存活时间,当时间超过keepAliveTime,线程将会结束,当这个值为0表示,线程不会被回收。
- newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
无界的线程池,并且线程空闲达到一定时间,会被回收,这里设定的时间是60s。
采用的队列是SynchronousQueue
,不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。
因此,对于这种线程池,当有任务时,如果没有空闲线程,会直接增加线程,执行任务,由于我们将最大线程数设定为Integer.MAX_VALUE
,所以,线程池中的线程理论上没有上线,但是机器的性能是有限的,所以如果任务非常多,可能会发生资源耗尽的情况。
- newSingleThreadPool
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
单线程的线程池,一般情况下,当我们有很多任务需要执行,但是他们并不需要同时执行,或者是有依赖关系的时候,例如B任务必须在A任务之后执行时,我们可以使用newSingleThreadPool
- newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
这个线程池一般用来执行定时任务,一般用下面的方法来设置定时任务执行的频率。
pool.scheduleWithFixedDelay(task, 2, 3, TimeUnit.SECONDS);
上面的方法表示,任务task在提交后2秒开始执行,执行完毕后每3秒执行一次。
- newWorkStealingPool
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
这个线程池是在Java8中新增的线程池,可以看出,这个线程池实际上是ForkJoinPool
,这里就是采用了fork join算法。当一个任务可以分解为多个小任务的时候,我们可以使用这种方式,充分利用CPU的性能。
但是,我们需要注意的是,如果任务十分简单,那么这种拆分方式不仅不会提高效率,有时因为线程切换结果合并等问题可能还会更慢。
分析线程池的submit
方法
MyCallable callable = new MyCallable();
ExecutorService executorService = Executors.newFixedThreadPool(5);
Future<String> future executorService.submit(callable);
String s = future.get();
System.out.println(s);
executorService.shutdown();
这里我们分析的实际上就是上面这段代码中的submit
方法。这里我们注意ExecutorService
是AbstractExecutorService
的子类,ExecutorService
中没有重写方法submit
,那么我们调用的实际是父类的方法。
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
这里我们看到了在上一篇深入并发之(三) FutureTask与Future之写线程池的时候到底需不需要自己包装FutureTask - 菱灵心 - 博客园中介绍过的方法newTaskFor
,作用是将Callable
包装为FutureTask
,这里不再介绍。
我们可以看到这个方法是直接返回的,这里也对应了第一部分提到的,这种future
设计方法的核心,就是无需等待异步任务执行,我们可以让主线程先去做其他任务,稍后我们可以从返回值中获取我们想要的结果。
大致讲解线程池中一些基本内容
线程数与运行状态的保存
需要注意,这一部分的主要内容实际是为了方便下面来查看源码。
首先是ThreadPoolExecutor
类。
对于线程池,我们肯定需要知道线程池中现在有多少线程,同时我们也需要知道线程池的状态。那么,在ThreadPoolExecutor
中是如何保存这个数据的呢。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
线程池中采取了原子的Integer,并且将32位的整数分为两部分,分别保存runstate
与workercount
,这种保存数据的方式不是我们第一次见到了,在AQS中的共享锁中也是类似的方式,源码中经常使用的一种写法。workercount
被保存在高3位中,余下的低位用来保存runstate
。
运行状态的变化
这里我们通过一张图来了解线程运行状态的改变过程。
图中上面是线程池的状态,下面是代表状态的常量值。当线程池刚刚创建的时候状态是RUNNING。
分析execute
方法
下面进入正题,执行方法。
首先描述一下方法,这个方法主要分为3步:
- 如果线程数量少于
corePoolSize
,那么我们直接创建一个新的线程来执行任务,通过调用方法addWorker
来检查runstate
与workercount
,如果无法新增线程,那么这个方法将会返回false。 - 如果任务成功进入队列,我们依然需要double-check是否需要增加一个线程(因为可能有一些线程在上次检查之后死亡),或者线程池的状态已经改变。所以通过二次检查让我们来确定是否需要回滚队列或者增加线程。
- 最后如果插入队列失败,我们再次尝试新增线程执行任务,如果无法新增那么应该是在线程池关闭或者饱和了,执行拒绝策略
给出流程图
//这段代码需要着重注意几处调用addWorker的区别,参数的不同,后面分析完addWorker再回头看才会真正发现调用的目的
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
对应上面的解释与流程图,这段代码的基本内容十分容易理解。
分析addWorker
在分析addWorker
方法之前,我们先来看ThreadPoolExecutor
类的内部类Worker
。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
当我们执行的时候实际会把Runnable再包装为Worker,通过firstTask。
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
同时,还实现了一个独占锁,这部分代码就是应用了之前讲到的利用AQS实现独占锁的方法。如果不了解可以参照博主的另外两篇文章。
深入并发之(一) 从来ReentrantLock看AbstractQueuedSynchronizer源码 - 菱灵心 - 博客园
这里不详细讲解,只给出代码。
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
下面,我们来看一下其中的重点方法addWorker
。
这段代码主要可以分为两部分,第一部分是检查线程池的状态,如果不满足条件会直接返回false,然后进入死循环等待成功增加线程,如果增加线程成功,那么就可以进入第二部分,真正新增线程,执行任务。具体的逻辑可以参考代码注释。这里给出一个简要的流程图。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//线程池状态
int rs = runStateOf(c);
/**
* 大于等于SHUTDOWN表示线程池已经不应该接受新的任务了,自然应该返回false。
* 但是这里有一个前提就是需要清空已经进入队列的任务。
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//线程数
int wc = workerCountOf(c);
/**
* 由于位数限制,线程池有一个CAPACITY,所以超出的不能创建线程了。
* 同时,还有核心线程和最大线程数,一般来说线程池中可以创建的线程数在不会超过最大线程数,
* 这里通过那个控制主要取决于传入的参数,同时也受创建线程池时设置的最大线程数的控制。
* 可以参考上面的线程池分类的部分查看几种线程池的最大线程数。
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
/**
* 上面的关卡都过去了,终于可以尝试增加线程数了,这里实际是CAS操作,
* 如果不了解,可以到网上搜索,或者参考博主AQS那篇
* 创建成功会直接跳出死循环,进入第二部分,执行任务
*/
if (compareAndIncrementWorkerCount(c))
break retry;
/**
* 重新查看线程池的状态,如果状态改变,那么就需要重新进行上面的状态部分的判断,
* 需要跳出这个等待增加线程数的死循环
*/
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
//满足条件就增加成功
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//增加成功就可以执行了,实际调用了Worker的run方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//如果没有执行,那么有些内容需要回滚
addWorkerFailed(w);
}
return workerStarted;
}
到这里为止,我们对线程池的基本工作原理有了认识,已经深入分析了源码。但是这里我们还需要提出一个问题,我们将任务加入到队列中后,到底线程池是在什么时候由那个方法将其从队列中取出,进行执行的呢?其实这里流程图中已经提到了,当我们将任务加入队列后,会调用addWorker
方法,但是并没有传入任务,这里实际上会到队列中取任务,那么取任务的代码在哪里呢?实际上是在Worker
类中的run
方法,Worker
类是一个Runnable接口的实现类,在addWorker
方法中调用的t.start();
实际调用了Worker的run方法,下一篇博客中,我们将会介绍这个方法中实现的功能。