在Java中,我们如果想同时做多件事情,则需要将不同事情以任务的形式抽象出来(即实现了Runnable接口的类),将不同的任务交给线程来驱动,以完成同时执行多件事情的效果。创建任务很容易,new一个类就可以了,但是要跑起来还需要线程啊,线程可是稀缺资源啊,怎么获取呢?
前面在Java线程机制一文中我们简单介绍了线程创建的几种方法,但这只是作为学习使用的,在生产环境中一般是不会直接通过新建线程来获取线程资源的。因为Java中的线程是和操作系统底层的线程挂钩的,创建线程是一个很消耗时间和资源的事情,如果频繁创建和销毁线程就可能会导致资源耗尽;而且如果创建了大量线程,也会导致线程之间的频繁切换,这也是很耗时间的操作。因此,JDK中提供了线程池来帮助我们获取和管理线程资源。
有了线程池,我们无需直接创建线程,只需将需要执行的任务交给线程池就好了,线程池会帮我们分配线程来执行任务。
使用线程池,有如下好处:
- 线程池帮我们管理线程,使得我们无需关心这些细节,可以更专注于任务的实现,解耦;
- 线程池通过统一管理创建的线程,实现线程的复用,避免线程的频繁创建和销毁,减少了在创建和销毁线程上所花的时间以及系统资源的开销,资源利用率更高;
- 当需要执行大量的异步任务时,由线程池统一管理和调配线程资源,可以获得更好的性能;
本文我们会从如下几个方面来进行总结:
1. Executor框架
既然线程池这么好,我们就来看看JDK中提供了哪些线程池供我们使用吧。Java中提供线程池工具的是Executor框架,如下是其类图,我们看一下其基本组成:
1.1 Eecutor
处于最顶部的是Executor,这是一个基础接口,只定义了一个唯一方法execute(),用于提交任务:
void execute(Runnable command);
1.2 ExecutorService
ExecutorService则提供了更多功能,包括service的管理功能如shutdown等方法,还包括不同于execute的更全面的提交任务机制,如返回Future的submit方法。因为Runnable是执行工作的独立任务,但是它不返回任何值,如果希望任务在完成时能够返回一个值,那么可以让任务实现Callable接口而不是Runnable接口,并且必须使用ExecutorService.submit()方法提交任务,看一个demo吧:
// 定义一个带返回值的任务,实现Callable接口
class TaskWithResult implements Callable<String>{ private int id; public TaskWithResult(int id){ this.id = id; }
// 这个就是提供返回值的方法,当获取返回值时实际会调用这个方法 public String call(){ return "result of TaskWithResult " + id; } } public class CallableDemo{ public static void main(String[] args){ ExecutorService exec = Executors.newCachedThreadPool(); ArrayList<Futrue<String>> results = new ArrayList<Future<String>>(); for(int i = 0; i<10 ; i++){
// 提交任务之后会返回一个Future,可以通过它的get方法获取任务计算返回的结果 results.add(exec.submit(new TaskWithResult(i))); } for(Future<String> fs : results){ try{ // 调用get()方法时必要的话(计算任务未完成)会阻塞 System.out.println(fs.get()); }catch(InterruptedException e){ System.out.println(e); return; }catch(ExecutionExecution e){ System.out.println(e); return; }finally{ exec.shutdown(); } } } } /** output: result of TaskWithResult 0 result of TaskWithResult 1 ... result of TaskWithResult 9 */
1.3 线程池实现
JDK提供了几种线程池基础实现,分别是ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool。通过不同的构造参数,我们可以产生多种不同特性的线程池以满足复杂多变的实际应用场景。后面我们会进一步分析其构造函数部分源码,来剖析这个灵活性的源头。
1.4 Executors
借助Executors提供的静态工厂方法,我们可以方便地创建出不同配置的线程池,Executors目前主要提供了如下几种不同的线程池创建方式:
-
newCachedThreadPool(),它是一种用来处理大量短时间工作任务的线程池,它会试图缓存线程并重用,当无缓存线程可用时,就会创建新的工作线程;如果线程闲置的时间超过60秒,则被终止并移出缓存;长时间闲置时,这种线程池,不会消耗什么资源。其内部使用 SynchronousQueue作为工作队列。
-
newFixedThreadPool(int nThreads),重用指定数目(nThreads)的线程,其底层使用的是无界的工作队列,任何时候最多有nThreads个工作线程是活动的。这意味着,如果任务数量超过了活动队列数目,将在工作队列中等待空闲线程出现;如果有工作线程退出,将会有新的工作线程被创建,以补足指定的数目 nThreads。
-
newSingleThreadExecutor(),它的特点在于工作线程数目被限制为1,操作一个无界的工作队列,所以它保证了所有任务的都是被顺序执行,最多会有一个任务处于活动状态,并且不允许使用者改动线程池实例,因此可以避免其改变线程数目。
-
newSingleThreadScheduledExecutor()和newScheduledThreadPool(int corePoolSize),创建的是一个ScheduledExecutorService,可以进行定时或周期性的工作调度,区别在于单一工作线程还是多个工作线程。
-
newWorkStealingPool(int parallelism),这是一个经常被人忽略的线程池,java8才加入这个创建方法,其内部会构建ForkJoin Pool,利用Work-Stealing算法,并行地处理任务,不保证处理顺序。
2. 线程池使用
利用这些工厂方法,常见的线程池创建方式如下:
ExecutorService threadPool1 = Executors.newCachedThreadPool(); ExecutorService threadPool2 = Executors.newFixedThreadPool(10); ExecutorService threadPool3 = Executors.newSingleThreadExecutor(); ExecutorService threadPool4 = Executors.newScheduledThreadPool(10); ExecutorService threadPool5 = Executors.newWorkStealingPool();
在大多数应用场景下,使用Executors提供的静态工厂方法就足够了,但是仍然可能需要直接利用ThreadPoolExecutor等构造函数线程池创建(其实如上5种方式除了newWorkStealingPool之外,其余都是通过ThreadPoolExecutor类的构造函数来实现的),比如:
ExecutorService service = new ThreadPoolExecutor(1,1, 60L,TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
为什么需要这样做呢?因为这样做可以根据我们的实际使用场景灵活调整线程池参数。这需要对线程池构造方式有进一步的了解,需要明白线程池的设计和结构。因为大部分线程池的构造函数都是调用的ThreadPoolExecutor的构造器,所以在本文以及后面的原理分析的文章中我们都是针对ThreadPoolExecutor,JDK为1.8,我们先来看一下ThreadPoolExecutor的构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
当然ThreadPoolExecutor还有很多构造函数,但是底层也都是调用的这个构造函数,只是传的参数是默认参数而已,这里就不一一列出了,占空间。线程池的构造函数有一堆的参数,这个还是有必要看一下的:
-
corePoolSize:核心线程数量,常驻线程数量,包括空闲线程;
-
maximumPoolSize:最大的线程数量,常驻+临时线程数量;
-
workQueue:多余任务等待队列,此队列仅保持由 execute方法提交的 Runnable任务,必须是BlockingQueue;
-
keepAliveTime:非核心线程空闲时间,即当线程数大于核心数时,多余的空闲线程等待新任务的最长时间;
-
unit:keepAliveTime 参数的时间单位;
-
threadFactory:执行程序创建新线程时使用的工厂,这里用到了抽象工厂模式,Executors提供了一个默认的线程工厂实现DefaultThreadFactory;
-
handler:线程池拒绝策略,当任务实在是太多,没有空闲线程,等待队列也满了,如果还有任务怎么办?默认是不处理,抛出异常告诉任务提交者,我这忙不过来了,你提交了也处理不了;
通过配置不同的参数,我们就可以创建出行为特性各异的线程池,而这,就是线程池高度灵活性的基石。
3. 线程池结构及状态
到这里我们知道线程的优点,学习了怎样创建线程池以及通过构造器部分的源码我们知道了线程池灵活性的根源,是时候再进一步了。我们可以把线程池理解成为一个容器,帮我们创建线程,接受我们提交给它的任务,并帮我们执行任务。那我们就有必要详细来看一下线程池内部是如何保存我们的任务以及线程,并通过什么方式来表征线程池自身的状态的。
我们进入源码,首先映入眼帘的便是如下这一堆代码:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // 工作线程的理论上限,大约5亿多个线程 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; //11100000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; //0 private static final int STOP = 1 << COUNT_BITS; //00100000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; //01000000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; //01100000000000000000000000000000 // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl,即线程池的控制状态,这是一个原子类,在这个整型数中封装了两层意思(限于表达能力,只能这样表达):
- workerCount,即有效线程数量(也可以说是worker的数量);
- runState,你线程池的运行状态;
我们来看一下Doug Lea大神是如何在一个整型变量中表达两层含义的呢?
3.1 线程数量
我们知道Java中的int型整数是32位的,在线程池中利用整型的高3位来表征线程池的运行状态,用剩下的低29位来表达有效线程数量,2的29次方是什么数量级,大概5亿吧,在目前以及未来很长一段时间,单机上是很难达到这个级别的线程数量的(即便未来存在问题,也可以通过Long类型来解决),所以线程数量问题就满足了,多出来的高三位就可以用来表达线程池运行状态了。
3.2 线程池状态
对照代码来看,上面COUNT_BITS实际为29,CAPACITY表示最大有效线程数量,大概是2的29次方。线程的状态和其对应的位的值如下:
- RUNNING:高三位为111,运行状态,可以接受任务执行队列里的任务;
- SHUTDOWN:高三位为000,指调用了 shutdown() 方法,不再接受新任务了,但是队列里的任务得执行完毕;
- STOP:高三位为001,指调用了 shutdownNow() 方法,不再接受新任务,同时抛弃阻塞队列里的所有任务并中断所有正在执行任务;
- TIDYING:高三位为010,所有任务都执行完毕,在调用 shutdown()/shutdownNow() 中都会尝试更新为这个状态;
- TERMINATED:高三位为011,终止状态,当执行 terminated() 后会更新为这个状态;
这些状态之间是会互相转变的,它们之间的转换时机如下:
- RUNNING -> SHUTDOWN,调用线程池的shutdown()方法;
- (RUNNING or SHUTDOWN) -> STOP,调用线程池的shutdownNow()方法时;
- SHUTDOWN -> TIDYING,当任务队列和线程池(保存线程的一个hashSet)都为空时;
- STOP -> TIDYING,当任务队列为空时;
- TIDYING -> TERMINATED,调用线程池的terminated()方法并执行完毕之后;
说了这么多,还是上张图吧:
3.3 为什么这么设计
但是看上面那堆代码,因为一个整型变量表示两种含义,每次要使用的时候都要通过一些位运算来将需要的信息提取出来,为什么不直接用两个变量来表示?难道是节约空间?嗯,起先我也是这样认为的,后来才发现是自己too young了。。。一个整型总共才占用4个字节,两个才多了4个字节,为了这4个字节需要这么大费周章吗!后来才知道这是因为在多线程环境下,运行状态和有效线程数量往往需要保证统一,不能出现一个改而另一个没有改动的情况,如果将他们放在同一个AtmocInteger中,利用AtomicInteger的原子操作,就可以保证这两个值始终是统一的,嗯,对Doug大神并发的理解真是出神入化。后面我们在源码分析中可以有更直观的体会。
3.4 线程池核心数据结构
我们接着看源码,主要有两个地方需要注意:
// 保存任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 保存工作线程的set,即真正的池 private final HashSet<Worker> workers = new HashSet<Worker>();
对于这里,比较简单:
- 工作队列负责存储用户提交的任务,容量可以指定,必须为BlockingQueue
- 这个works才是真正的“线程池”,用来保存工作线程的集合,原来所谓的线程池中的线程都是保存在一个HashSet中。线程池的工作线程被抽象为静态内部类Worker,是基于AQS实现,后面会详细分析其原理。
4. 总结
1. 使用线程池有很多好处:
-
降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
-
提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行;
-
提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控;
- 解耦,用户不用关心线程的创建,只需提交任务即可;
2. JDK中Executor框架提供如ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool等线程池的基本实现,可以通过Executors提供的静态工厂方法创建多种线程池,也可使用ThreadPoolExecutor提供的构造函数定制化符合业务需求的线程池;
3. 线程通过一个整型变量ctl表示存活线程数量和线程池运行状态;
4. 用户提交的任务是保存在一个阻塞队列中,线程池创建的工作线程是保存在一个HashSet中;
在本文中我们从线程池优点开始,再到了解整个Executor框架,通过一些简单demo了解了线程池的基本使用,再结合源码初步分析了线程池的内部数据结构以及状态表征,关于线程池进一步的运行原理,有兴趣的同学可以关注后面的文章。总结不易,觉得有帮助就点个赞吧^_^