目录
在Java面试或者是实际工作中经常会遇到我们应该选择多少个线程的问题。本文尝试分析下 在单机多核上运行多少个线程可以达到最大的运行效率。以及为什么不推荐使用Executors创建自带的线程池。
基础知识
线程池核心参数
- corePoolSize 核心线程数
- maximumPoolSize 最大线程数
- keepAliveTime 非核心线程存活时间
- TimeUnit 时间类型
- workQueue 缓冲队列
- threadFactory 线程工厂
- handler 拒绝策略
拒绝策略
- AbortPolicy : 直接抛出异常,阻止系统正常运行。
- CallerRunsPolicy : 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的 任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
- DiscardOldestPolicy : 丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再 次提交当前任务。
- DiscardPolicy : 该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢 失,这是最好的一种方案。
以上内置拒绝策略均实现了 RejectedExecutionHandler 接口,若以上策略仍无法满足实际
需要,完全可以自己扩展 RejectedExecutionHandler 接口。
阻塞队列
- ArrayBlockingQueue :由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :由链表结构组成的有界阻塞队列。
- SynchronousQueue:不存储元素的阻塞队列。
线程池原理
- 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。无论队列里里是否有任务,都不会马上执行。
- 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
- 如果正在运行的线程数量小于 corePoolSize,创建新线程执行这个任务。
- 如果正在运行的线程数量大于或等于 corePoolSize,任务放入缓冲队列。
- 如果队列满了,且运行的线程数小于 maximumPoolSize,创建非核心线程立刻运行这个任务。
- 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,根据拒绝策略处理。
- 当一个线程完成任务时,从队列中取下一个任务来执行。
- 当一个线程没收到新的任务,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运
行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它
最终会收缩到 corePoolSize 的大小。
线程池生命周期
- RUNNING: 能接收新提交的任务,并且可以处理阻塞队列中的任务
- SHUTDOWN: 关闭状态,拒绝接收新的任务提交,会继续处理阻塞队列中保存的任务。
- STOP: 不接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。
- TIDYING:所有任务已经终止,有效线程数为0
- TERMINATED:在terminated()方法执行完后进入该状态 | ==> TERMINATED --> 结束
线程池的结构
- 线程池管理器(ThreadPoolManager):用于创建并管理线程池
- 工作线程(WorkThread): 线程池中线程
- 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
- 任务队列:用于存放没有处理的任务。提供一种缓冲机制。
四种线程池
- CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, //核心线程数
Integer.MAX_VALUE,//最大线程数
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()
//不储存元素的阻塞队列
);
}
- SingleThreadPool
- 可以理解为nThreads=1时的FixedThreadPool
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1,//核心线程数
1,//最大线程数
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
//链表实现的有界队列 最大为int.MAX_VALUE
}
- FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads,
nThreads,//最大线程数和核心线程数相等
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
//链表实现的有界队列 最大为int.MAX_VALUE
}
- ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
submit()和execute()的区别
- 类不同
- submit 属于 ExecutorService
- execute 属于 Executor
- 返回值不同
- execute 无返回值
- submit 可获取返回值future 基于Callable接口获取回调。
线程数如何计算
前提:
- 当前机器上没有其他消耗资源进程,排除干扰项。
- 对于计算密集型任务,线程上下文切换是影响的关键因素。
- 对于IO密集型任务,阻塞时间是影响的关键因素。
- 参考资料为《Java虚拟机并发编程》& 《Java并发编程实战》
计算密集型
-
对于计算密集型任务,我们应将程序线程数限制为 与处理器核心数相同。
- 避免线程的上下文切换。
- 说法来源《Java并发编程实战》第二章
-
对于计算密集型任务,我们应将程序线程数设置为 处理器核心数+1。
- 确保某一个线程暂停时,cup核心不会处于空闲。
- 会导致一次上下文切换。
- 来源于网络热门说法 暂未找到在《Java并发编程实战》书中出处。
IO密集型
方法1:线程数 = Ncpu x Ucpu x (1 + W/C)
- Ncpu : CPU的数量
- Ucpu : 目标CPU的使用率 介于0-1之间
- W/C : 等待时间和计算时间的比值
- 来源《Java并发编程实战》8.2 节 170 页 如下图
方法2:线程数 = Ncpu /(1 - 阻塞系数)
- 阻塞系数为0时候正好是计算密集型 线程数等于cpu核心数即可。
- 来源《Java虚拟机并发编程》第二章 12、27页 如下图
假设方法1&2的目的都是达到cpu的100%利用率可计算得出方法2的阻塞系数。
实际情况分析
- 在实际任务当中,很难对单个线程池的有效数据作出直接的计算。
- 经常会涉及多个实例在在同一台物理机的部署。甚至是多个线程池相互影响。
- 任务当中也有计算密集和IO密集任务的相互影响。
- 拥抱k8s 应用容器化后,实例落在的节点不同,可使用cpu核心数不一定相等。
- 更多的时候是开发人员,基于对业务的估算,以及个人的经验所作出的配置,差异性较大。
为什么不推荐使用Executors的方式创建线程池
- CachedThreadPool:使用的无法加入任务的阻塞缓冲队列,核心线程为0,最大线程数位int.MAX_VALUE 所有任务加入时候都会启动新的线程执行任务,非常容易造成OOM。
- SingleThreadPool & FixedThreadPool:使用LinkedBlockingQueue作为阻塞队列,最大长度为 int.MAX_VALUE。超过核心线程数后的任务都会加入阻塞队列,非常容易带来任务挤压。
所以不建议使用默认的方式创建线程池。
如何动态修改线程池大小
- 感知线程池状态,以及物理机状态。来决定是否需要调整线程池
- 线程池状态:activeCount/maximumPoolSize的比值来定义线程池负载。线程池提供了get方法可以动态的获取相关值。或者根据拒绝策略判断线程池是否处于满负荷运行。
- 物理机状态:cpu负载/内存占用率/磁盘IO 等
- 调整线程池参数
- setCorePoolSize:修改核心线程数
- setMaximumPoolSize: 修改最大线程数
- setRejectedExecutionHandler: 修改拒绝策略。
以setCorePoolSize为方法例,在运行期线程池使用方调用此方法设置corePoolSize之后,线程池会直接覆盖原来的corePoolSize值,并且基于当前值和原始值的比较结果采取不同的处理策略。对于当前值小于当前工作线程数的情况,说明有多余的worker线程,此时会向当前闲置worker线程发起中断请求以实现回收,多余的worker在下次处于闲置的时候也会被回收;对于当前值大于原始值且当前队列中有待执行任务,则线程池会创建新的worker线程来执行队列任务。