前言
Executors
Executors 是一个Java中的工具类。提供工厂方法来创建不同类型的线程池。
常用方法:
1.newSingleThreadExecutor
介绍:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。
此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
优点:单线程的线程池,保证线程的顺序执行
缺点:不适合并发
2.newFixedThreadPool
介绍:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
优点:固定大小线程池,超出的线程会在队列中等待
缺点:不支持自定义拒绝策略,大小固定,难以扩展
3.newCachedThreadPool
介绍:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
优点:很灵活,弹性的线程池线程管理,用多少线程给多大的线程池,不用后及时回收,用则新建
缺点:一旦线程无限增长,会导致内存溢出
4.newScheduledThreadPool
介绍:创建一个定长线程池,支持定时及周期性任务执行
优点:一个固定大小线程池,可以定时或周期性的执行任务
缺点:任务是单线程方式执行,一旦一个任务失败其他任务也受影响
总结
1)以上线程池都不支持自定义拒绝策略。
2)newFixedThreadPool 和 newSingleThreadExecutor:
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。
3)newCachedThreadPool 和 newScheduledThreadPool:
主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。
ThreadPoolExecutor
阿里巴巴的JAVA开发手册推荐用ThreadPoolExecutor创建线程池。集以上优点于一身。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
参数解释:
corePoolSize : 线程池核心池的大小。
maximumPoolSize : 线程池的最大线程数。
keepAliveTime : 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit : keepAliveTime 的时间单位。
workQueue : 用来储存等待执行任务的队列。
threadFactory : 线程工厂。
handler 拒绝策略。
原理:
有请求时,创建线程执行任务,当线程数量等于corePoolSize时,请求加入阻塞队列里,当队列满了时,接着创建线程,线程数等于maximumPoolSize。 当任务处理不过来的时候,线程池开始执行拒绝策略。
阻塞队列:
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue: 一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue: 一个不存储元素的阻塞队列。
LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。
拒绝策略:
ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。 (默认)
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务。(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
Demo
1 package com.xxx; 2 3 import com.google.common.util.concurrent.ThreadFactoryBuilder; 4 5 import java.util.concurrent.*; 6 7 /** 8 * 线程池 9 * @author xhq 10 */ 11 public class ThreadPoolService { 12 13 /** 14 * 自定义线程名称,方便的出错的时候溯源 15 */ 16 private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("test-pool-%d").build(); 17 18 /** 19 * corePoolSize 线程池核心池的大小 20 * maximumPoolSize 线程池中允许的最大线程数量 21 * keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间 22 * unit keepAliveTime 的时间单位 23 * workQueue 用来储存等待执行任务的队列 24 * threadFactory 创建线程的工厂类 25 * handler 拒绝策略类,当线程池数量达到上线并且workQueue队列长度达到上限时就需要对到来的任务做拒绝处理 26 */ 27 private static ExecutorService service = new ThreadPoolExecutor( 28 4, 29 40, 30 0L, 31 TimeUnit.MILLISECONDS, 32 new LinkedBlockingQueue<>(1024), 33 namedThreadFactory, 34 new ThreadPoolExecutor.AbortPolicy() 35 ); 36 37 /** 38 * 获取线程池 39 * @return 线程池 40 */ 41 public static ExecutorService getEs() { 42 return service; 43 } 44 45 /** 46 * 使用线程池创建线程并异步执行任务 47 * @param r 任务 48 */ 49 public static void newTask(Runnable r) { 50 service.execute(r); 51 } 52 }