本次内容主要介绍线程池的优点、线程池各个参数的解析、线程池的工作原理、线程池的扩展、线程池的合理配置以及JDK预定义5种线程池等内容。
1、为什么要用线程池?
Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。
(1)降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
(2)提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。用一张图说明:
(3)提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。用一张图说明:
2、ThreadPoolExecutor 的类关系
Executor是一个接口,只有一个execute(Runnable command)方法。它是Executor框架的基础,它将任务的提交与任务的执行分离开来。
ExecutorService接口继承了Executor,在其上做了一些shutdown()、submit()的扩展,可以说是真正的线程池接口。
AbstractExecutorService抽象类实现了ExecutorService接口中的大部分方法。
ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
ScheduledExecutorService接口继承了ExecutorService接口,提供了带"周期执行"功能ExecutorService。
ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
用一张图来说明这几个接口和类之间的关系:
3、创建线程池各个参数含义
Java中创建线程池有2种方式,使用ThreadPoolExecutor类或者使用Executors类,其实这两种方式在本质上是一样的,都是通过ThreadPoolExecutor类的构造函数创建的,我们来分析一个参数最多的构造函数。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
(1)corePoolSize:线程池中的核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize。如果当前线程数为corePoolSize,继续提交的任务被保存到队列中,等待被执行。如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
(2)maximumPoolSize:线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize。
(3)keepAliveTime:线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于corePoolSize时才有用。
(4)TimeUnit:keepAliveTime的时间单位。
(5)workQueue:用于保存等待执行任务的阻塞队列,当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行等待。一般来说,我们应该尽量使用有界队列,因为使用无界队列作为工作队列会对线程池带来如下影响。
a)当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize。
b)由于a,使用无界队列时maximumPoolSize将是一个无效参数。
c)由于a和b,使用无界队列时keepAliveTime将是一个无效参数。
d)更重要的,使用无界队列可能会耗尽系统资源,有界队列则有助于防止资源耗尽,同时即使使用有界队列,也要尽量控制队列的大小在一个合适的范围。
所以我们一般会使用:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue。
(6)threadFactory:创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,还可以将所有的线程设置为守护线程。Executors静态工厂里默认的ThreadFactory,线程的命名规则是“pool-数字-thread-数字”。
(7)handler:线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
a)AbortPolicy:直接抛出异常,默认策略。
b)CallerRunsPolicy:用调用者所在的线程来执行任务。
c)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务。
d)DiscardPolicy:直接丢弃任务。
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。根据以上参数分析我们可以得出一个使用线程池的技巧:如果确定并发量有限,并且每个线程占用的内存大小有限,使用Executors建立线程池;如果并发量没有办法控制,并且每个线程占用的内存大小无法确定较小,那么需要使用ThreadPoolExecutor的方式来创建线程。
4、线程池的工作机制
线程池的工作机制,可以分成4个阶段来说:
1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务。
2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3)如果无法将任务加入BlockingQueue(有界队列已满),则创建新的线程来处理任务。
4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
用一张图进行说明:
5、扩展线程池
如果我们想扩展线程池的功能怎么办呢?比如在任务执行的前后做一点我们自己的业务工作?实际上,JDK 的线程池已经为我们预留的接口,在线程池核心方法中,有3 个方法是空的,就是预留给我们扩展用的。其中2个方法是beforeExecute(Thread t, Runnable r)和afterExecute(Runnable r, Throwable t),分别在任务执行的前后调用;还有一个是void terminated()方法,在shutdown()方法里面会调用terminated()方法。在创建线程池的时候可以重写这3个方法,示例代码如下:
import java.util.concurrent.*; public class ThreadPoolExtend { static class Worker implements Runnable { private String taskName; public Worker(String taskName) { this.taskName = taskName; } public String getName() { return taskName; } @Override public void run() { System.out.println(taskName + "执行任务"); } } public static void main(String[] args) { ExecutorService threadPool = new ThreadPoolExecutor(1, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardOldestPolicy()) { @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println(((Worker) r).getName() + "执行前..."); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println(((Worker) r).getName() + "执行后..."); } @Override protected void terminated() { System.out.println("线程池关闭"); } }; for (int i = 0; i <= 3; i++) { Worker worker = new Worker("worker" + i); threadPool.execute(worker); } threadPool.shutdown(); } }
从程序输出可以看到,通过重写这3个方法顺利的完成了线程池的扩展功能。
6、提交任务
Java线程池中有2种方式提交任务:execute()和submit(),主要区别如下:
(1)execute提交的是Runnable类型的任务,而submit提交的是Callable或者Runnable类型的任务。
(2)execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。submit()方法用于提交需要返回值的任务,线程池会返回一个Future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
(3)execute提交的时候,如果有异常,就会直接抛出异常,而submit在遇到异常的时候,通常不会立马抛出异常,而是会将异常暂时存储起来,等待你调用Future.get()方法的时候,才会抛出异常。
7、关闭线程池
可以通过调用线程池的shutdown()或shutdownNow()方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow()首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown()只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。只要调用了这两个关闭方法中的任意一个,isShutdown()方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed()方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown()方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow()方法。
8、合理地配置线程池
要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析:
(1)任务的性质:CPU密集型任务(大部份时间用来做计算、逻辑判断等操作)、IO密集型任务(大部分的状况是CPU在等I/O 的读/写操作)和混合型任务。
(2)任务的优先级:高、中和低。
(3)任务的执行时间:长、中和短。
(4)任务的依赖性:是否依赖其他系统资源,如数据库连接。
性质不同的任务可以用不同规模的线程池分开处理。先来分析(1)任务的性质:
a)CPU密集型任务应配置尽可能小的线程,如配置服务器CPU个数+1个线程的线程池。服务器CPU个数可以通过Runtime.getRuntime().availableProcessors()方法获得。
b)由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如服务器CPU个数的2倍。对于IO型的任务的最佳线程数,有个公式可以计算:
Nthreads = NCPU * UCPU * (1 + W/C)
其中:
NCPU是处理器的核的数目
UCPU是期望的CPU利用率(该值应该介于0和1之间)
W/C是等待时间与计算时间的比率
等待时间与计算时间我们在Linux下使用相关的vmstat命令或者top命令查看。
c)对于混合型任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。
分析(2)任务的优先级:
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理,它可以让优先级高的任务先执行。
分析(3)任务的执行时间:
执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。
分析(4)任务的依赖性:
依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。
最后,不管从哪个角度来分析,都建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如5000。如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。
9、JDK预定义5种线程池
在JDK1.8中,已经预定义了5种线程池,除了WorkStealingPool是JDK1.8新加入的,其余4个在JDK1.5就有了。5种预定义线程池的使用都非常简单,就不一一用代码演示了,下面分别简单的介绍:
9.1 FixedThreadPool
创建使用固定线程数的FixedThreadPool的API。适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads。当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。FixedThreadPool使用有界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。
9.2 SingleThreadExecutor
创建使用单个线程的SingleThreadExecutor的API,于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。corePoolSize和maximumPoolSize被设置为1,其他参数与FixedThreadPool相同。SingleThreadExecutor使用有界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。
9.3 CachedThreadPool
创建一个会根据需要创建新线程的CachedThreadPool的API。大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE。这里把keepAliveTime设置为60L,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。FixedThreadPool和SingleThreadExecutor使用有界队列LinkedBlockingQueue作为线程池的工作队列。CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。
9.4 WorkStealingPool
JDK1.8加入的,利用所有运行的处理器数目来创建一个工作窃取的线程池,使用forkjoin实现,如果对forkjoin不熟悉,可以去看一下之前的《java线程并发工具类》。看下WorkStealingPool源码:
public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
9.5 ScheduledThreadPoolExecutor
Executors可以创建2种类型的ScheduledThreadPoolExecutor,如下:
(1)ScheduledThreadPoolExecutor,包含若干个线程的ScheduledThreadPoolExecutor,适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景。
(2)SingleThreadScheduledExecutor,只包含一个线程的ScheduledThreadPoolExecutor,适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景。
10、结语
本次就分享这么多内容,希望大家看了有收获。下一篇内容中会介绍Java并发安全相关知识点,阅读过程中如发现描述有误,请指出,谢谢。