网上的资料大同小异,笔者总结了几篇文章做了个总结,欢迎读者指正;写文章的目的也在于提升自己。
1.1 线程池 Thread Pool
1.1.2 概念:线程池是一种多线程处理形式,处理任务过程中会将任务添加到队列,然后在创建线程后自动启动这些任务。线程池的线程都是后台的线程。每个线程都使用了默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(比如说:正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持忙碌。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作。则线程池将在一段时间后创建另一个辅助线程但线程的树木永远不会超过最大值。超过最大值的线程可以排队,但它们要等到其他线程完成后才启动。在传统的开发中线程过多会带来调度开销,进而影响缓存局部性和系统整体性能。而线程池中维护着固定的多个线程,等待监督管理者分配可并发执行的任务,避免了处理短时间任务时创建和销毁线程的的代价。线程池不仅能够保证CPU内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。例如,线程数一般取CPU数量+2即可,线程过多会导致额外的的线程切换开销。
1.1.3 技术背景:在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其他更多资源。在Java中更是如此,JVM(虚拟机)试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。所以提高服务程序效率的一个手段就是尽可能的减少创建对象和销毁对象的次数,特别是一些很耗资源的对象创建和销毁,如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些“池化资源”技术产生的原因。比如大家所熟悉的数据库连接池就是遵循这一池化资源思想产生的。
1.1.4 应用范围:
适用:
1、需要大量的线程来完成任务,且完成任务的时间比较短。比如Web服务器完成网页请求这样的任务,使用线程池技术是非常合适的,因为单个任务小,且任务数量巨大,可以想象一个热门网站的爆款商品点击次数。
2、对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
3、突发性的大量请求,但是不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存达到极限,并出现“OutOfMemory”的错误。
不适用:
1、对于长时间的任务,比如Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了!
1.2 线程池的模式
线程池的模式一般分为两种:HS/HA半同步/半异步模式、L/F领导者与跟随者模式。
一、半同步/半异步模式:又称为生产者消费者模式,是比较常见的实现方式,比较简单。分为同步层、队列层、异步层三层。同步层的主线程处理工作任务并存入工作队列,工作线程从工作队列取出任务进行处理,如果工作队列为空,则取不到任务的工作线程(Work Thread)进入挂起状态。由于线程间有数据通信,因此不适用于大数据量交换的场合。
二、领导者/跟随者模式:在线程池中可以处在3种状态之一:领导者(leader)、追随者(follower)、工作者(processor)。任何时刻线程池只有一个领导者进程。事件到达时,领导者线程负责消息分离,并从处于追随者线程中选出一个来当继任领导者,然后将自身设置为工作者状态去处置该事件。处理完毕后工作者线程将自身的状态设置为追随者。这一模式实现复杂,但避免了线程间交换任务数据,提高了CPU cache相似性。在ACE(Adaptive Communication Environment)中,提供了领导者跟随者模式实现。
1.2.1 线程的伸缩性对系统性能的影响
一、创建太多线程,将会浪费一定的资源,有些线程未被充分使用。
二、销毁太多线程,将导致之后浪费时间再次创建它们。
三、创建线程太慢,将会导致长时间的等待,性能变差。
四、销毁线程太慢,导致其他线程资源饥饿。
1.2.2 线程池优化
一、线程池管理器(ThreadPoolManager):用于管理和创建线程池
二、工作线程(WorkThread):线程池中的线程,就是上面所说完成工作的线程。
三、任务接口(TaskInterface):每个任务必须实现的接口,以供工作线程调度任务的执行。
四、任务队列(TaskQueue):用于存放没有处理的任务。提供一种缓存机制
2.2 Java中的线程池
2.2.1 线程池的路径
Java的线程池在java.util.concurrent类下,具体类名为ThreadPoolExecutor,也是Java线程池中最核心的一个类。
2.2.2 线程池源码分析
public class ThreadPoolExecutor extends AbstractExecutorService{ .... public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } }
从上面的代码可以看得出来,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上观察每个构造器的源码具体实现,其实前面三个构造器都是为了调用第四个构造器进行的初始化工作。
其中ThreadPoolExecutor的参数:
1、corePoolSize:核心池的大小,默认情况下,线程池中没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从方法名上既可以看出来,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者1个线程。当线程池中的线程数达到了最大的核心线程池大小后,也就是说所有的核心线程都在工作中!那么就会把到达的任务放到缓存队列当中;
2、maximumPoolSize:线程池最大线程数,这个参数很重要,即代表在线程池中最大能创建多少个线程;
3、keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程池数大于corePoolSize时,keepAliveTime才会起作用。
4、workQueue(任务队列):用于传输和保存等待执行任务的阻塞队列。
5、threadFactory(线程工厂):用于创建新线程。threadFactory创建的线程也是采用new Thread()方式。threadFactory创建的名都具有统一的风格:pool-m-thread-n(m为线程池的编号,n为线程池内的线程编号)。
6、handler(线程饱和策略):当线程池和队列都满了,再加入线程会执行此策略
2.3 如何配置线程池
可以根据任务的类型具体划分为三类:
一、CPU密集型任务
尽量使用较小的线程池,一般为CPU的核数+1或+2。因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,会造成CPU过度切换。
二、I/O密集型任务
可以使用稍大的线程池,一般为2*CPU核数。I/O密集型任务CPU使用率并不高,因此可以让CPU在等待I/O的时候让其他线程去处理别的任务,充分利用CPU时间。
三、混合型任务
可以将任务划分为I/O密集型和CPU密集型任务,然后分别用不同的线程池去处理。只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。
因为如果划分之后两个任务执行时间有数据级的差距,那么拆分没有意义。因为先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行的任务,而且还有加上任务拆分与合并的开销,得不偿失。
2.4 Java中提供的线程池
Executors类提供了4中不同的线程池
1、newCachedThreadPool:用来创建一个可以无限扩大的线程池,适用于负载较轻的场景,执行短期异步任务。(可以使得任务快速得到执行,因为任务时间执行短,可以很快结束,也不会造成cpu过度切换),非核心线程的数量为Integer.max_value,就是无限大,没有需要时回收线程。
2、newFixedThreadPool:创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于负载较重的场景,对当前线程数量进行限制。(保证线程数可控,不会造成线程过多,导致系统负载更为严重)
3、newSingleThreadExecutor:创建一个单线程的线程池,适用于需要保证顺序执行各个任务。
4、newScheduledThreadPool:按照某种特定的计划执行线程中的任务,有核心线程,但也有非核心线程,非核心线程的大小也为无限大,适用于执行延时或者周期性任务。
2.4.2 线程池中的方法
execute()和submit()方法
1、execute():执行一个任务,没有返回值。
2、submit():提交一个线程任务,有返回值。
submit(Callable<T> task)能获取到它的返回值,通过future.get()获取(阻塞直到任务执行完)。一般使用FutureTask+Callable配合使用(IntentService中有体现)
submit(Runnable task,T result)能通过传入的载体result间接获得线程的返回值。
submit(Runnable task)则没有返回值的,就算获取它的返回值也null。
Future.get方法会使得取结果的线程进入阻塞状态,直到线程执行完成之后,唤醒取结果的线程,然后返回结果。
2.5 线程池的主要处理流程
有图我们可以看出,任务进来时,首先执行判断,判断核心线程是否处于空闲状态,如果不是,核心线程就先就执行任务,如果核心线程已满,则判断任务队列是否有地方存放该任务,若果有,就将任务保存在任务队列中,等待执行,如果满了,在判断最大可容纳的线程数,如果没有超出这个数量,就开创非核心线程执行任务,如果超出了,就调用handler实现拒绝策略。
2.6 handler的拒绝策略有四种:
第一种AbortPolicy:不执行新任务,直接抛出异常,提示线程池已满
第二种DisCardPolicy:不执行新任务,也不抛出异常
第三种DisCardOldSetPolicy:将消息队列中的第一个任务替换为当前新进来的任务执行
第四种CallerRunsPolicy:直接调用execute来执行当前任务
欢迎Java的朋友关注博主的微信公众号