1. 结构概述
1.1 整体结构
1.2 分层描述
1.2.1 Executor(顶级接口)
定义执行任务的方法;
执行已提交的 Runnable
任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程
1.2.2 ExecutorService(子接口)
定义停止任务,提交任务的方法;
提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future
的方法。
1.2.3 AbstractExecutorService(实现接口的抽象类)
提供 ExecutorService
执行方法的默认实现。
1.2.4 ThreadPoolExecutor(线程池具体的实现类)
定义创建线程池的具体方法及其他细节。
一个 ExecutorService
,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors
工厂方法配置。
线程池可以解决两个不同问题:
由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能;
并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。
1.2.5 Executors(使用线程池的工具类)
定义了线程池的一些实用方法和工厂方法;
2. 基础理论(生产者+消费者)
2.1 构造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); // 核心线程数。 // 默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务, // 当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中; this.corePoolSize = corePoolSize; // 最大线程数,它表示在线程池中最多能创建多少个线程; this.maximumPoolSize = maximumPoolSize; // 一个阻塞队列,用来存储等待执行的任务。 this.workQueue = workQueue; // 表示线程空闲时最多存活多长时间。 // allowCoreThreadTimeOut为false时,keepAliveTime参数对核心线程不起作用 // allowCoreThreadTimeOut为true时,keepAliveTime参数对所有线程(包括核心线程)都起作用 this.keepAliveTime = unit.toNanos(keepAliveTime); // 线程工厂,主要用来创建线程; this.threadFactory = threadFactory; // 表示当拒绝处理任务时的策略 this.handler = handler; }
2.2 基本属性
private final BlockingQueue<Runnable> workQueue; // 任务缓存队列,用来存放等待执行的任务 private final ReentrantLock mainLock = new ReentrantLock(); // 线程池的主要状态锁,对线程池状态(比如线程池大小runState等)的改变都要使用这个锁 private final HashSet<Worker> workers = new HashSet<Worker>(); // 用来存放工作集 private volatile long keepAliveTime; // 线程存活时间 private volatile boolean allowCoreThreadTimeOut; // 是否允许为核心线程设置存活时间 private volatile int corePoolSize; // 核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列) private volatile int maximumPoolSize; // 线程池最大能容忍的线程数 private volatile int poolSize; // 线程池中当前的线程数 private volatile RejectedExecutionHandler handler; // 任务拒绝策略 private volatile ThreadFactory threadFactory; // 线程工厂,用来创建线程 private int largestPoolSize; // 用来记录线程池中曾经出现过的最大线程数 private long completedTaskCount; // 用来记录已经执行完毕的任务个数
2.3 线程池状态
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3;
runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;
下面的几个static final变量表示runState可能的几个取值。
当创建线程池后,初始时,线程池处于RUNNING状态;
如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
2.4 线程池初始化
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:
2.5 任务缓存队列及排队策略
在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。
workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:
- ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
- LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
- synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
2.6 任务拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
- ThreadPoolExecutor.AbortPolicy :丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy :也是丢弃任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy :丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
- ThreadPoolExecutor.CallerRunsPolicy :由调用线程处理该任务
2.7 线程池的关闭
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
- shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
- shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务