面向对象编程中,对象创建和销毁是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是对一些很耗资源的对象创建和销毁。如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些”池化资源”技术产生的原因。比如大家所熟悉的数据库连接池就是遵循这一思想而产生的,下面将介绍的线程池技术同样符合这一思想。
多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。但如果对多线程应用不当,会增加对单个任务的处理时间。可以举一个简单的例子:
假设一台服务器完成一项任务的时间为T
T1 创建线程的时间
T2 在线程中执行任务的时间,包括线程间同步所需时间
T3 线程销毁的时间
显然T = T1+T2+T3。注意这是一个极度简化的假设。
可以看出T1,T3是多线程本身附加的开销,用户希望减少T1,T3所用的时间,从而减少T的时间。但一些线程的使用者并没有注意到这一点,所以在应用程序中频繁的创建或销毁线程,这导致T1和T3在T中占有非常大的比例。
线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1、T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1、T3的开销了,线程池不仅调整T1、T3产生的时间,而且它还显著减少了创建线程的数目。在看一个例子:
假设一台服务器每天大约要处理100000个请求,并且每个请求需要一个单独的线程完成,这是一个很常用的场景。在线程池中,线程数量一般是固定的,所以产生线程总数不会超过线程池中线程的数目或者上限,而如果服务器不利用线程池来处理这些请求则线程总数为100000。一般线程池尺寸是远小于100000。所以利用线程池的服务器程序不会为了创建100000而在处理请求时浪费时间,从而提高效率。
线程池是一种多线程处理方法,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程,每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程处于空闲状态,则线程池将会调度一个任务给它,如果所有线程都始终保持繁忙,但将任务放入到一个队列中,则线程池将在一段时间后创建另一个辅助线程,但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。
线程池主要有如下几个应用范围:
-
需要大量的线程来完成任务,且完成任务的时间比较短,如WEB服务器完成网页请求这样的任务。因为单个任务小,而任务数量巨大,比如一个热门网站的点击次数。 但对于长时间的任务,比如一个ftp连接请求,线程池的优点就不明显了。因为ftp会话时间相对于线程的创建时间长多了。
-
对性能要求苛刻的应用,比如要求服务器迅速相应客户请求。
-
接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限。
下面将讨论线程池的简单实现,以说明线程技术优点及应用领域。
线程池的简单实现
一般一个简单线程池至少包含下列组成部分。
- 线程池管理器(ThreadPoolManager):用于创建并管理线程池。
- 工作线程(WorkThread): 线程池中线程。
- 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
- 任务队列:用于存放没有处理的任务。提供一种缓冲机制。
线程池管理器至少有下列功能:创建线程池、销毁线程池、添加新任务
创建线程池的部分代码如下:
public class ThreadPoolManager {
private int threadCount; //启动的线程数
private WorkThread[] handlers; //线程数组
private ArrayList<Runnable> taskVector = new ArrayList<Runnable>(); //任务队列
ThreadPoolManager(int threadCount) {
this.threadCount = threadCount;
for (int i = 0; i < threadCount; i++) {
handlers[i] = new WorkThread();
handlers[i].start();
}
}
void shutdown() {
synchronized (taskVector) {
while (!taskVector.isEmpty())
taskVector.remove(0); //清空任务队列
}
for (int i = 0; i < threadCount; i++) {
handlers[i] = new WorkThread();
handlers[i].interrupt(); //结束线程
}
}
void execute(Runnable newTask) { //增加新任务
synchronized (taskVector) {
taskVector.add(newTask);
taskVector.notifyAll();
}
}
private class WorkThread extends Thread {
public void run() {
Runnable task = null;
for (;;) {
synchronized (taskVector) {//获取一个新任务
if (taskVector.isEmpty())
try {
taskVector.wait();
task = taskVector.remove(0);
} catch (InterruptedException e) {
break;
}
}
task.run();
}
}
}
}
ThreadPoolManager构造函数允许用户设置启动的线程数量,并且需要的创建线程。Shutdown函数主要关闭打开的线程和清空还没有执行的任务,execute函数将任务加入到工作队列,并且唤醒等待的线程。WorkThread就是实际的工作线程,工作线程是一个可以循环执行任务的线程,在没有任务时将等待,当有任务时,会被唤醒。任务接口是为所有任务提供统一的接口,以便工作线程处理,在这里我们采用java定义的Runnable接口,用户可以实现这个接口来完成想要的事务。实现一个线程池需要了解线程的同步机制,这部分将在后面介绍。
Java自带线程池
自从Java1.5之后,Java 提供了自己的线程池ThreadPoolExecutor和ScheduledThreadPoolExecutor,我们先看类之间的结构图。
关于线程池的主要类有如下几部分:
接口:Executor、ExecutorService、ScheduledExecutorService
类:Executors、AbstractExecutorService、ThreadPoolExecutor、ScheduledThreadPoolExecutor。
ThreadPoolExecutor
首先看看ThreadPoolExecutor的构造函数,ThreadPoolExecutor提供了几个构造函数,我们先来参数最全构造函数的含义。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:线程池维护线程的最少数量
- maximumPoolSize:线程池维护线程的最大数量
- keepAliveTime:线程池维护线程所允许的空闲时间 ,所以如果任务很多,并且每个任务执行的时间比较短,可以适当调大这个参数来提高线程的利用率。
- unit: keepAliveTime 参数的单位,可选的单位:天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS)和纳秒(NANOSECONDS)
- workQueue:任务队列,用来存放我们所定义的任务处理线程,BlockingQueue是一种带锁的阻塞队列,我们将在后面专门讲解这种数据结构,BlockingQueue有四种选择:
(1)ArrayBlockingQueue,是一种基于数组的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行操作;
(2)LinkedBlockingQueue,是一个基于链表的阻塞队列,此队列也按FIFO (先进先出)对元素进行操作,吞吐量通常要高于ArrayBlockingQueue, Executors.newFixedThreadPool()使用了这种队列;
(3)SynchronousQueue;是一种不存储元素的阻塞队列,每个插入操作必须等另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,Executors.newCachedThreadPool使用了这个队列;
(4)PriorityBlockingQueue,是一种具有优先权的阻塞队列,优先级大的任务可以先执行,用户由此可以控制任务的执行顺序。这四种阻塞队列都有自己的使用场景,用户可以根据需要自己决定使用。 - threadFactory:创建新线程时使用的工厂,threadFactory有两种选择:
(1)DefaultThreadFactory,将创建一个同线程组且默认优先级的线程;
(2)PrivilegedThreadFactory,使用访问权限创建一个权限控制的线程。ThreadPoolExecutor默认采用DefaultThreadFactory。 - handler 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理策略,handler有四个选择:
(1)ThreadPoolExecutor.AbortPolicy(),将抛出RejectedExecutionException异常;
(2)ThreadPoolExecutor.CallerRunsPolicy(),将重试添加当前的任务,重复调用execute()方法;
(3)ThreadPoolExecutor.DiscardOldestPolicy(),将抛弃旧任务;
(4)ThreadPoolExecutor.DiscardPolicy,将直接抛弃任务。ThreadPoolExecutor默认采用AbortPolicy。
一个任务通过execute(Runnable)方法被添加到线程池,任务必须是一个 Runnable类型的对象,任务的执行方法就是调用Runnable类型对象的run()方法。当一个任务通过execute(Runnable)方法欲添加到线程池时,会做一下几步:
- 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
- 如果此时线程池中的数量大于等于corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
- 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理添加的任务。
- 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
- 当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
读者可以参考下面的源代码,分析execute函数执行的流程:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // 执行handler策略
}
}
当数量少于corePoolSize时的主要流程:
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask); //创建新线程
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
当数量大于corePoolSize,小于maximumPoolSize,且阻塞队列不能存储任务时,执行的主要流程:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
如果想在多线程环境中定期执行去执行任务,或者做一些其他事情,用户可以通过Timer来实现,但是Timer有几种缺陷:
- Timer是基于绝对时间的,容易受系统时钟的影响;
- Timer只新建了一个线程来执行所有的TimeTask,所有TimeTask可能会相关影响;
- Timer不会捕获TimerTask的异常,只是简单地停止。
这样势必会影响其他TimeTask的执行。JDK提供了一种定时功能的线程池:ScheduledThreadPoolExecutor,它继承了ThreadPoolExecutor,并且实现了ScheduledExecutorService接口,此接口有如下几个方法:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
创建并执行在给定延迟后启用的一次性操作:
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
创建并执行在给定延迟后启用的一次性操作:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是经过period 后开始执行,即在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。如果任务的任何一个执行遇到异常,则后续执行都会被取消。否则,只能通过执行程序的取消或终止方法来终止该任务。如果此任务的任何一个执行要花费比其周期更长的时间,则将推迟后续执行,但不会同时执行。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
创建并执行一个在给定初始延迟后首次启用的定期操作,随后在每一次执行终止和下一次执行开始之间都存在给定的延迟。如果任务的任一执行遇到异常,就会取消后续执行。否则,只能通过执行程序的取消或终止方法来终止该任务。
ScheduledThreadPoolExecutor也提供了几个构造函数,下面列出的是其中最简单的一个,只有corePoolSize一个参数。ScheduledThreadPoolExecutor的构造函数仅做的一件事就是调用ThreadPoolExecutor的构造函数,它使用一种带有延时标记的等待队列DelayedWorkQueue。DelayedWorkQueue内部使用concurrent包里的DelayQueue,DelayQueue是一个无界的BlockingQueue,用于放置延时Delayed接口的对象,对象只能在其到期时才能从队列中取走,我们将在专门讲解这种数据结构。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
要配置一个线程池相对比较复杂,需要了解相关的参数,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池:
public static ExecutorService newSingleThreadExecutor()
创建仅有一个线程工作的线程池,相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么将创建有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
public static ExecutorService newCachedThreadPool()
创建一个缓存线程池,如果线程池的大小超过了任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又动态添加新线程来处理任务。此线程池没有对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)所能够创建的最大线程大小。
public static ExecutorService newFixedThreadPool(int nThreads)
创建指定大小的线程池。每次提交一个任务就创建一个线程,直到线程数量达到线程池的最大值。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
类似于newCachedThreadPool,创建一个缓存线程池,此线程池还支持定时以及周期性执行任务。
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
类似于newSingleThreadExecutor,创建一个单线程的线程池,此线程池还支持定时以及周期性执行任务。
下面用两个例子介绍线程池的使用方法,第一个例子会创建一个固定大小的线程池,第二个例子会创建基于时间线程池。
第一个例子
ExecutorService pool = Executors.newFixedThreadPool(2);
//创建四个任务
Thread t1 = new Task1();
Thread t2 = new Task2();
Thread t3 = new Task3();
Thread t4 = new Task4();
//放入线程池
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.shutdown(); //关闭线程池
第二个例子:
ExecutorService pool = Executors.newScheduledThreadPool(4);
Thread t = new Task();
pool.scheduleAtFixedRate(t,1, 5, TimeUnit.SECONDS);
总结:
- FixedThreadPool是一个典型且优秀的线程池,它具有线程池的高效率和节省创建线程时所耗的开销的优点。但是在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。
- CachedThreadPool的特点就是在线程池空闲时,即线程池中没有可运行任务时,它会释放工作线程,从而释放工作线程所占用的资源。但是当出现新任务时,又要创建新的工作线程,这会带来一定的系统开销。并且在使用CachedThreadPool时,一定要注意控制任务的数量,否则大量线程同时运行,可能会造成系统瘫痪。