Executors是一个线程池的工厂类,提供各种有用的线程池的创建,使用得当,将会使我们并发编程变得简单!今天就来聊聊这个工厂类的艺术吧!
Executors只是Executor框架的主要成员组件之一,为java的异步任务调度执行提供了重要的入口!
在说Executors之前,还需要说一下另一个Executor框架的重要成员,ThreadPoolExecutor。
ThreadPoolExecutor 实现了ExecutorService接口,提供了线程池的调度功能!成为纯种池技术的基石!
ThreadPoolExecutor 的主要构造函数:
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
咱们此处不是要去分析其源码,只是想看一下怎样使用他!
从构造方法可以看出 ThreadPoolExecutor 的主要参数 7 个,在其注释上也有说明功能,咱们翻译下每个参数的功能:
corePoolSize: 线程池核心线程数(平时保留的线程数),使用时机: 在初始时刻,每次请求进来都会创建一个线程直到达到该size maximumPoolSize: 线程池最大线程数,使用时机: 当workQueue都放不下时,启动新线程,直到最大线程数,此时到达线程池的极限 keepAliveTime/unit: 超出corePoolSize数量的线程的保留时间,unit为时间单位 workQueue: 阻塞队列,当核心线程数达到或者超出后,会先尝试将任务放入该队列由各线程自行消费; ArrayBlockingQueue: 构造函数一定要传大小 LinkedBlockingQueue: 构造函数不传大小会默认为65536(Integer.MAX_VALUE ),当大量请求任务时,容易造成 内存耗尽。 SynchronousQueue: 同步队列,一个没有存储空间的阻塞队列 ,将任务同步交付给工作线程。 PriorityBlockingQueue: 优先队列 threadFactory:线程工厂,用于线程需要创建时,调用其newThread()生产新线程使用 handler: 饱和策略,当队列已放不下任务,且创建的线程已达到 maximum 后,则不能再处理任务,直接将任务交给饱和策略 AbortPolicy: 直接抛弃(默认) CallerRunsPolicy: 用调用者的线程执行任务 DiscardOldestPolicy: 抛弃队列中最久的任务 DiscardPolicy: 抛弃当前任务
所以,虽然参数有点多,但是其实仔细阅读以上的注释,基本就能很好地使用线程池了,不过咱们还可以通过一个流程图来说明这一切,对于喜欢看图说话的同学来说是件好事!
好了,ThreadPoolExecutor 介绍完后,什么感觉呢? 说是简单,其实还是挺麻烦的,毕竟你还要去了解各种队列的公优缺点,去了解线程池范围怎样设置合理。所以,是时候让 Executors 工厂出场了!
作为一个工厂类,Executors 简化了各种参数,只忘文生义即可明白其意思!Executors 主要提供4种类型的线程池!
1. FixedThreadPool 创建一个指定数量的线程池
可控制线程最大并发数,超出的线程会在队列中等待。
其实现方式如下:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
使用固定线程数的worker和无界队列保存多余的task,所以创建的线程数不会超过corePoolSize,所以maxPoolSize是一个无效参数,所以keepAliveTime是一个无效参数,所以不会调用RejectedExecutionHandler.rejectedExecution()方法,即无拒绝策略可用;从而在外部表现来看,就是固定线程,在执行无限的队列!
2. SingleThreadExecutor 创建一个单线程化的线程池
它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
其实现方式如下:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
与 FixedThreadPool 原理类似,不过这里只会使用一个线程在运行。使用一个线程可以保证取任务时的顺序一致性,从而表现出先到先得的效果!在要求有执行顺序要求的场景,刚好派上用场!
3. CachedThreadPool, 创建一个可缓存的无限线程池
如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
其实现方式如下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
使用SyncronousQueue作为线程池的工作队列,这是特殊的队列,它是个没容量的阻塞队列(至于为什么要用这个特性,据说是性能比 LinkedBlockingQueue 更高,详解他),每个插入操作必须有一个线程对应的移除操作,反之一样;当提交一个cached任务时,执行SyncronousQueue.offer(x),如果有等待的空闲线程,则匹配成功,exec立即返回;如果没有匹配,则会创建一个新线程执行任务;任务执行完后使用poll()等待keep即60秒,一直尝试从队列中获取任务,超时后自动释放线程;
使用这种线程池把握好节奏,因为看起来线程池动不动就会创建系列线程池,而且动不动就会释放线程,完全是不可控的,不可监控的。
4. ScheduledThreadPool 创建一个定长线程池
支持定时及周期性任务执行。
其实现如下:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
看起来,这个在参数方面没有太多的解说,只是依赖于 ScheduledThreadPoolExecutor 的实现了!所以,我们来看一下 ScheduledThreadPoolExecutor是怎么实现的?
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor {} public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
ok, 如上两个,就够了!ScheduledThreadPoolExecutor 也是继承了 ThreadPoolExecutor, 其构造方法也是依赖于父类,只是将 队列实现改为 DelayedWorkQueue,从而实现延时运行或者定期执行的效果!
而相比于 Timer 来说,ScheduledThreadPool会有更好的执行效果,因为Timer只会有一个线程来执行任务,会有局限性。而且 ScheduledThreadPool 提供其他很多的可操作方法!
Executors 的出现,使线程池的使用难度大大降低,使开发更方便,在合适的场合使用相应的工厂方法,定能让开发事半功倍!