前言
线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配、调优和监控,有如下好处
1、降低资源消耗
2、提高响应速度
3、提高线程的可管理性
java1.5中引入的Executor框架把任务的提交和执行进行解耦,只需要定义好任务,然后提交给线程池,而不用关心该任务是如何执行、被哪个线程执行,以及什么时候执行。
一、ThreadPoolExecutor
ThreadPoolExecutor是线程池的工厂类,通过它可以快速初始化一个符合业务需求的线程池。如Executors.newFixedThreadPool方法可以生成一个拥有固定线程池的线程池。
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue) { 6 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 7 Executors.defaultThreadFactory(), defaultHandler); 8 }
本质是通过不同的参数初始化一个ThreadExecutor对象,具体参数描述如下:
corePoolSise:
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
maximumPoolSize:线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;
keepAliveTime:线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用;
unit:keepAliveTime的单位;
workQueue:
threadFactory:创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有优先级的无界阻塞队列;
1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有优先级的无界阻塞队列;
threadFactory:创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。
handler:
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
Executors:
Exectors工厂类提供了线程池的初始化接口,主要有如下几种:
newFixedThreadPool
初始化一个指定线程数的线程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作为阻塞队列,不过当线程池没有可执行任务时,也不会释放线程。
newCachedThreadPool:
1、初始化一个可以缓存线程的线程池,默认缓存60s,线程池的线程数可达到Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列;
2、和newFixedThreadPool创建的线程池不同,newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销;
在使用该线程池时,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题。2、和newFixedThreadPool创建的线程池不同,newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销;
newSingleThreadExecutor:
初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行,内部使用LinkedBlockingQueue作为阻塞队列。
newScheduledThreadPool:
初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据。
二、实现原理
除了newScheduledThreadPool的内部实现特殊一点之外,其他几个线程池都是基于ThreadPoolExecutor类实现的。
1、线程池内部状态
1 #########ThreadPoolExecutor############ 2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 3 private static final int COUNT_BITS = Integer.SIZE - 3; 4 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 5 6 // runState is stored in the high-order bits 7 private static final int RUNNING = -1 << COUNT_BITS; 8 private static final int SHUTDOWN = 0 << COUNT_BITS; 9 private static final int STOP = 1 << COUNT_BITS; 10 private static final int TIDYING = 2 << COUNT_BITS; 11 private static final int TERMINATED = 3 << COUNT_BITS; 12 13 // Packing and unpacking ctl 14 private static int runStateOf(int c) { return c & ~CAPACITY; } 15 private static int workerCountOf(int c) { return c & CAPACITY; } 16 private static int ctlOf(int rs, int wc) { return rs | wc; }
其中AtomicInteger变量ctl的功能非常强大:利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:
1、RUNNING:
2、SHUTDOWN:
3、STOP :
4、TIDYING :
5、TERMINATED:
1、RUNNING:
-1 << COUNT_BITS
,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;2、SHUTDOWN:
0 << COUNT_BITS
,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;3、STOP :
1 << COUNT_BITS
,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;4、TIDYING :
2 << COUNT_BITS
,即高3位为010;5、TERMINATED:
3 << COUNT_BITS
,即高3位为011;任务提交
线程池框架提供了两种方式提交任务,根据不同的业务需求选择不同的方式。
1、Executor.execute()
通过Executor.execute()方法提交的任务,必须实现Runnable接口,该方法提交的任务不能获取返回值,因此无法判断任务是否执行成功
1 /** 2 * Executes the given command at some time in the future. The command 3 * may execute in a new thread, in a pooled thread, or in the calling 4 * thread, at the discretion of the {@code Executor} implementation. 5 * 6 * @param command the runnable task 7 * @throws RejectedExecutionException if this task cannot be 8 * accepted for execution 9 * @throws NullPointerException if command is null 10 */ 11 void execute(Runnable command);
通过ExecutorService.submit()方法提交的任务,可以获取任务执行完的返回值。
1 /** 2 * Submits a value-returning task for execution and returns a 3 * Future representing the pending results of the task. The 4 * Future's {@code get} method will return the task's result upon 5 * successful completion. 6 * 7 * <p> 8 * If you would like to immediately block waiting 9 * for a task, you can use constructions of the form 10 * {@code result = exec.submit(aCallable).get();} 11 * 12 * <p>Note: The {@link Executors} class includes a set of methods 13 * that can convert some other common closure-like objects, 14 * for example, {@link java.security.PrivilegedAction} to 15 * {@link Callable} form so they can be submitted. 16 * 17 * @param task the task to submit 18 * @param <T> the type of the task's result 19 * @return a Future representing pending completion of the task 20 * @throws RejectedExecutionException if the task cannot be 21 * scheduled for execution 22 * @throws NullPointerException if the task is null 23 */ 24 <T> Future<T> submit(Callable<T> task);
任务实现
当向线程池中提交一个任务,线程池会如何处理该任务
execute实现
具体的执行流程为:
1、workerCountOf方法根据ctl的低29位,得到线程池的当前线程数,如果线程数小于corePoolSize,则执行addWorker方法创建新的线程执行任务;否则执行步骤2()
2、如果线程池处于RUNNING状态,且把提交的任务成功放入阻塞队列中,则执行步骤(3),否则执行步骤(4)
3、再次检查线程池的状态,如果线程池没有RUNNING,且成功从阻塞队列中删除任务,则执行reject方法处理任务
4、执行addWorker方法创建新的线程执行任务,如果addWorker执行失败,则执行reject方法处理任务
addWorker实现
从方法execute的实现可以看出,addWorker主要负责创建新的线程并执行任务,