老经验,先学习源码中对类的注释,注释懂了,大概怎么工作的的雏形就差不多清晰了
线程池是什么
ThreadPoolExecutor是一个ExecutorService,使用池化的线程来执行任务,通用使用Executors类中的"工厂方法"来进行配置
ps:这个Executors类的定位就像集合框架里的Collections,es各种Builder中的QueryBuilders类,像一个辅助包,有很多"工厂方法"
来输出产品
线程池的定位:解决2个问题
1、在执行大量异步任务时,通过减少每个任务的调用开销(重复创建销毁线程等),提高了性能
2、提供了可以管理并限制资源的方法,这些资源包括线程(限制线程池的大小,队列大小等等),每个线程池都维护了一些基础的统计信息,例如完成的任务数。
可配置的参数和hooks
1、线程池的core size和max size:
线程池可以通过core size和max size动态调节池的大小
当通过 execute(Runnable)
提交新任务时,
如果worker threads < corePoolSize,线程池会新建一个新线程来处理这个新任务,即使这些工作线程处于空闲的状态(也就是没有处理任务)
如果corePoolSize< worker threads < maximumPoolSize,只有当queue满了,才会创建新的线程(否则都会丢入queue排队)
如果设置corePoolSize=maximumPoolSize,相当于创建了一个固定大小的线程池,等价于这个:newFixedThreadPool(int nThreads)
如果设置maximumPoolSize=Integer.MAX_VALUE
,那么这个线程池可以同时处理任意多个任务
通常情况下,核心数和最大线程数在初始化的时候就设置好。当然,也可以通过setCorePoolSize
和setMaximumPoolSize
方法动态更改
2、按需构造线程池
默认情况下,当新任务到达时,工作线程才会被创建并启动。
但是也可以通过prestartCoreThread
或者prestartAllCoreThreads
方法预先让core size个线程提前创建
当创建的线程池带有一个非空的任务队列时,则需要预启动若干个线程(prestart threads)
3 、创建新线程
新线程由ThreadFactory
创建,如果未指定ThreadFactory,线程池将使用Executors.defaultThreadFactory作为默认的线程工厂
。
通过默认工厂创建的线程都在同一个threadGroup里,他们拥有同样的优先级(NORM_PRIORITY),且都是非守护线程。
如果提供的是其他的ThreadFactory,则可以修改线程的名字,线程组,优先级,守护状态等
当工厂新建线程失败,池会继续运行,但是可能无法处理任何任务,后面的是权限方面的问题,没看
4 、Keep-alive 时间
如果当前线程池的线程数量 > corePoolSize,对于这些多出来的线程,如果空闲时间超过keep-alive设置的时间,将会被终止
这种方式可以在线程池没有太多任务的时候,降低线程资源的消耗。
keep-alive time可以通过setKeepAliveTime(TimeUnit)
方法动态更改
如果空闲时长设置为Long.Max_VALUE,
那么空闲线程将永远不会被终止。
默认情况下,只有线程数超过core size, 超时策略才会使用到。但allowCoreThreadTimeOut(boolean)
方法也可以让超时策略用在core threads上
5 、入队
任何的阻塞队列可以传递和保存任务,但是具体策略和当前线程池大小有关:
如果线程数 < core,线程池会创建新线程来处理新任务,任务不会入队
如果线程数 >= core,新任务会入队,而不是创建新线程
如果新任务无法入队(例如满了),并且线程数 < max,那么会新建线程处理任务。如果已经=max(那么创建一个线程后肯定会超过max),那么任务会被拒绝
线程池有以下三个入队策略(就是三种BlockingQueue选择)
- 直接传递:
一个很不错的默认选择,同步队列SynchronousQueue.它会在任务入队后,立刻将任务转给线程处理,而不保留任务。
如果没有可用的线程(没法新建更多的线程)来立即处理新任务,那么会入队失败
这个策略可以避免任务被锁住,有点像生产者-消费者模式
- 无界队列:
例如没有预设容量的LinkedBlockingQueue,使用无界队列将会导致当所有的核心线程都在运行任务时,新任务全部要入队阻塞
这样的话,永远只有coreSize个线程执行任务,不会有更多的线程被创建,maxSize这个参数将不会起任何作用
当任务彼此不相互依赖时,这是一个很好的做法,比如一个web服务器,服务处理速度慢于外部请求速度,但是仍然有爆发式的请求不断到来
- 有界队列:
有界队列(例如 ArrayBlockingQueue)可以通过设定maxSize来保护资源,但同时也更难协调和控制。队列的长度和池的大小需要相互协调
长队列和小(线程池)池的组合减少了CPU的使用,OS 资源和上下文切换带来的损耗,但是可能会人为地降低吞吐量。
如果任务经常阻塞(例如I/O密集型任务),系统可以为更多的线程安排时间,可能比你设定的线程数还要多(没有充分利用CPU)
短队列通常需要和大(线程)池搭配使用,它们能充分利用CPU,但是也可能会带来不可预计的调度开销,因而降低吞吐量
6 、拒绝任务
当线程池shutdown之后,或者没有shutdown,但是队列容量有限且线程池饱和运行(达到最大线程且均在运行任务)时,通过execute(Runnable)
方法提交的任务会被拒绝
此时execute
方法会调用RejectedExecutionHandler.rejectedExecution(Runnable,ThreadPoolExecutor)
方法
RejectedExecutionHandler
是一个接口,每个线程池的RejectedExecutionHandler
变量不一样,该接口有四种具体实现:
ThreadPoolExecutor.AbortPolicy
(默认):拒绝新任务,并抛出RejectedExecutionException
异常。
ThreadPoolExecutor.CallerRunsPolicy
: 调用execute方法的线程本身来执行这个任务。这种做法提供了一个简易的反馈控制机制降低新任务的提交频率
ThreadPoolExecutor.DiscardPolicy
:直接丢弃
ThreadPoolExecutor.DiscardOldestPolicy
:线程池正常运行的情况下,dropped掉队头的任务,然后重试 execute,重试也可能会再次失败,然后会再次dropped掉一个处于
head位置的任务
7 、钩子方法
可以理解为callback方法,或spring里的aop的切面函数
ThreadPoolExecutor
类提供 beforeExecute(Thread, Runnable)
和afterExecute(Runnable, Throwable)
} 两个可被覆盖的钩子函数
它们在任务的开始和结束的时候被调用。可被用于配置运行环境,例如更改ThreadLocals
,收集统计信息,或者加日志。
此外,terminated()
方法也可以被覆盖,在线程池完全终止的时候,可以通过这个方法做一些特殊的处理。
如果钩子方法抛出异常,内部的工作线程可能会逐个失败直至线程池终止
8 、队列维护
getQueue()
可以获取队列来监控和调试,但是强烈不建议使用这个方法来干别的事情。
当大量的入队任务被取消时,remove(Runnable)
和 purge
方法可以帮助来回收空间
9 、回收
当一个线程池不再被其他程序引用,并且池中没有线程的时候,会自动shutdown
如果你希望一个不再被引用的线程池可以被自动回收(不是手动使用shutdown方法),那么你必须确保空闲线程会自动停止
可以通过设置合适的keep-alive time,core size设为0,并且调用allowCoreThreadTimeOut()
方法来达到目的
附:线程池的生命周期
线程池的状态贯穿了线程池的整个生命周期,有以下5个生命周期:
RUNNING: 接收新任务,处理队列的任务。SHUTDOWN:不接收新任务,继续处理队列的任务。
STOP:不接收新任务,也不处理队列里的任务,并尝试停止正在运行的任务。
TIDYING:所有的任务都终止了,线程数为0之后,线程池状态会过度到TIDYING,然后执行terminated()钩子方法。
TERMINATED:在terminated()方法执行完之后,线程池状态就会变成TERTMINATED。