作为一名Java开发工程师,想必性能问题是不可避免的。通常,在遇到性能瓶颈时第一时间肯定会想到利用缓存来解决问题,然而缓存虽好用,但也并非万能,某些场景依然无法覆盖。比如:需要实时、多次调用第三方API时,该场景缓存则无法适用。
然多线程并发的方式则很好的解决了上述问题。
但若每次都在任务开始时创建新的线程,在任务结束后销毁线程,这样增加了资源的消耗,也降到了响应速度。针对此,线程池通过维护多个线程的方式来降低创建线程的次数,并且对线程的创建、销毁以及数量加以很好的控制,保证对内核的充分利用。
总结起来:
- 降低了资源的消耗:通过池化技术,重复利用已创建好的线程。
- 增加了响应的速度:若有空闲线程时,直接执行任务,无需等待创建新的线程。
- 增加了系统的稳定性和可管理性:对系统而言,过多的线程会导致资源调度失衡,降低了系统的稳定性。线程池进行统一的管理(调优,监控和分配等),减少碎片化信息。
I. 线程池的类关系图
【JDK1.8】
从线程池类图关系可见,Java中线程池核心实现类是ThreadPoolExecutor, ThreadPoolExecutor实现的顶层接口是Executor,从图中可以发现:
- Executor接口只有一个execute()方法,将任务的提交和运行进行解耦。
- ExecutorService接口在Executor的基础上,增加了生命周期的控制(线程池状态转换)和submit()方法生成Future结果。
- AbstractExecutorService是一个抽象类,实现了ExecutorService接口中的submit()方法,并实现了任务的执行流程。
- ThreadPoolExecutor主要实现两个功能:线程池生命周期管理,任务的执行execute()方法。
作为整个架构中最核心的东西-ThreadPoolExecutor,接下来便从源码的角度来分析一二。
II. 生命周期管理
线程池的信息变量ctl是一个原子整型变量,封装了两个概念行字段:
- workerCount: 表示线程池中有效的线程数量
- runState: 表示线程池当前的运行状态
ctl信息
源码分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor
- COUNT_BITS=29(十进制)
- CAPACITY=0000 1111 1111 1111 1111 1111 1111 1111(二进制)
信息获取:
- int c = ctl.get();
- workerCount = workerCountOf(c); //ctl的低28位表示线程数量
- runState = runStateOf(c); //ctl的高四位表示状态
运行状态
源码分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor
从源码中可见,线程池主要有5种状态: RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED,状态之间的关系如下图:
- RUNNING: 可以接受新的任务请求,也可处理阻塞队列中的任务
- SHUTDOWN: 不接受新的任务请求,但会处理阻塞队列中的任务
- STOP: 不接受新的任务请求,阻塞队列也会直接清空,正在处理的任务也会被直接中断
- TIDYING: 所有的任务都已经终止,线程池中不存在有效线程
- TERMINATED: 线程池终止
从上述状态转换图,我们发现状态的切换主要由shutdown(), shutdownNow(), tryTerminate()以及terminated()几个方法实现。
源码分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.shutdown()
源码分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.interruptIdleWorkers(): 终止空闲线程。
从代码中可以发现,执行shutdown()函数会将线程池状态置为SHUTDOWN状态,并且会终止所有的空闲线程。这里通过java.util.concurrent.ThreadPoolExecutor.Worker.tryLock()方法尝试获取当前线程的AQS独占模式锁,如果目标线程处于空闲状态,则可成功获取锁;反之,若线程正在执行task,则获取锁失败,以此来判断线程是否处于空闲状态。具体的流程图如下:
注意: shutdown()函数仅仅是终止了线程池中空闲的线程,正在执行任务的线程依旧可以正常工作,所以处于SHUTDOWN状态下的线程池依旧可以从阻塞队列中获取任务并执行,但不会接受新的task请求(详情见添加worker部分)。
源码分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.shutdownNow()
源码分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.interruptWorkers(): 终止线程池中所有的线程。
源码分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.Worker.interruptIfStarted(): 中断当前的线程。
从代码中可见,执行shutdownNow()函数会将线程池置为STOP状态,终止线程池中所有的线程,由于线程池无法执行task,所以这里会将阻塞队列中所有的task取出并返回。具体的流程图如下:
注意:由于所有的线程都被终止了,所以处于STOP状态下的线程池会中断当前正在执行的任务,无法从阻塞队列中获取任务并执行,也无法接受新任务的请求。
源码分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.tryTerminate()
从代码中可见,tryTerminate()函数仅在线程池处于STOP状态或者SHUTDOWN状态下阻塞队列为空的场景下,才会进一步执行,否则执行结束。需要注意的是,处于SHUTDOWN状态下,虽然阻塞队列为空,但是依然存在线程正在执行任务的可能,所以需要进行二次检查。在将线程池状态置为TIDYING状态前,如果线程池信息发生任何变化,都需要重新检查,避免一些突发异常的发生。具体流程图如下:
注意:线程池处于TIDYING状态下会直接执行terminated()方法,默认该方法为空(当然用户可进行自定义重写),之后将线程池状态置为TERMINATED,理论上处于该状态下的线程池已经结束,所以唤醒所有等待线程池结束的线程,执行其任务。
III. 运行机制
从图中可见整个运行机制主要分为以下几个模块:
- 任务请求
- 任务分配
- 添加worker
- 运行worker
- 任务拒绝
接下来,我们看看对应的源码是如何处理的。
任务请求
ThreadPoolExecutor实现了任务请求和执行的解耦,用户无需关心是如何执行的任务,只需要提供一个可执行的任务,然后调用execute()方法即可。但在实际编码中,我们常常需要获取任务执行的结果,因此,ExecutorService接口在Executor的基础上增加submit()方法,将任务封装成RunnableFuture对象,将执行结果保存为Future对象。综上所言,任务请求有两种方法,分别如下:
- void execute(Runnable command):不需要获取任务结果。
- <T> Future<T> submit(Callable<T> task):需要获取任务结果。
execute(Runnable command)见任务分配模块。
submit(Callalbe<T> task)见获取任务结果模块。
任务分配
源码分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.execute(Runnable command)
从代码中可见,执行execute(Runnable command)函数分配任务时主要有以下四个选择:
- 线程池线程数<核心线程数:创建一条新的线程,并在该线程上直接执行任务;
- 线程池线程数>=核心线程数 && 阻塞队列未满: 将任务推入阻塞队列中,并创建一条新的线程(若此时线程池存在有效线程则不创建),该线程获取阻塞队列头部的任务并执行;
- 线程池线程数>=核心线程数 && 阻塞队列已满 && 线程池线程数<最大线程数:创建一条新的线程,并在该线程直接执行任务;
- 线程池线程数>最大线程数 && 阻塞队列已满:任务拒绝。
详细流程图如下:
注意:线程池线程数>=核心线程数 && 阻塞队列未满场景下仅当线程池无有效线程时才会创建新的线程,因为当线程池中线程执行完任务后会再次尝试去阻塞队列获取任务并执行,直到阻塞队列为空才会处于空闲状态。所以,多数场景下此时线程池的线程数=核心线程数,无需创建新的线程。当然,也存在临界场景,比如:此时正好所有的线程都恰好执行完任务并被销毁,为了避免运行异常,则需创建一个新的线程,从阻塞队列中获取任务并执行。
添加worker
源码分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.addWorker(Runnable firstTask, boolean core)
从代码可见,addWorker(Runnable firstTask, boolean core)函数拥有两个入参:
- firstTask:新创建线程执行的第一个任务,这里特指新提交的任务
- core:ture表示核心线程数,false表示最大线程数
此外,只有线程池处于(RUNNING状态)或者处于(SHUTDOWN状态+阻塞队列不为空)时,才可以创建worker对象,并且在SHUTDOWN状态下firstTask(新提交的任务)必须为空,才会将新创建的worker对象加入到workers列表中,否则添加worker任务失败,即SHUTDOWN状态下不接受新提交的任务请求。
注意:当使用java.util.concurrent.ThreadPoolExecutor.compareAndIncrementWorkerCount()方法尝试登记线程失败时,需要判断下是否已经影响了线程池的状态,如果有,则重新获取线程池状态进行相关校验,若没有,则重新获取线程池线程数量,并进行线程池线程数量的检测。
运行线程任务通过thread.start()方法,即调用了worker对象的run()方法,所以执行worker线程任务会调用worker.run()方法。
运行worker
源码分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.Worker.run()
源码分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.runWorker(Worker w)
从代码可见,
1)runWorker(Worker w)函数会先执行firstTask(如果不为空),等第一个任务执行完会继续尝试从阻塞队列中获取任务,然后继续执行;直到阻塞队列为空。
2)这里并没有使用前面常见的可重入锁ReentranLock,而是使用了自身继承的AQS(AbstractQueuedSynchronizer)锁。当获取任务成功后,目标线程获取AQS独占模式锁,表示该线程处于忙碌状态,直到任务执行完毕才会释放。如果上个任务执行完毕后,目标线程一直无法获取新的任务(阻塞队列为空),则不会获取AQS独占模式锁,表示目标线程处于空闲状态。
3)任务的最终执行还是调用任务的run()方法,所以请求的任务是可执行命令Runnable task.
具体的流程图如下:
注意:在实际执行任务前,需要获取线程池的状态,确保此时调用shutdownNow()方法可以及时中断任务。此外,前置任务和后置任务默认为空,用户可自定义重写。
任务拒绝
源码分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.reject(Runnable command)
从代码可见,拒绝任务时调用了handler对象的rejectedExecution()方法。
ThreadPoolExecutor实现了四种不同的策略类,分别为:
源码分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy:调用主线程(调用ThreadPoolExecuotr的线程)直接执行任务。
源码分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.AbortPolicy: 直接放弃任务,抛出java.util.concurrent.RejectedExecutionException异常。
源码分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.DiscardPolicy: 直接放弃任务,不做任何处理。
源码分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy: 获取阻塞队列头部的任务,并直接将其抛弃;然后,重新请求任务。
获取任务结果
在实际编码中,我们常常需要获取任务执行的结果,因此,ExecutorService接口在Executor的基础上增加submit()方法,将任务封装成RunnableFuture对象,将执行结果保存为Future对象。
源码分析(JDK1.8)java.util.concurrent.AbstractExecutorService.submit(Callalbe<T> task)
注意:AbstractExecutorrService同样实现submit(Runnable<T> task)方法,这里仅以submit(Callalbe<T> task)方法进行分析。
源码分析(JDK1.8)java.util.concurrent.AbstractExecutorService.newTaskFor(Callable<T> callable): 将任务封装成RunableFuture对象。
从上述运行worker模块可知,线程上执行任务时,会调用任务的run()方法,这里封装成java.util.concurrent.FutureTask对象,在线程执行任务时,会调用FutureTask.run()方法
源码分析(JDK1.8)java.util.concurrent.FutureTask.run()
源码分析(JDK1.8)java.util.concurrent.FutureTask.set():将结果保存到outcome属性中,以便后面获取结果。
源码分析(JDK1.8)java.util.concurrent.FutureTask.get():取出运行结果。
注意:这里调用get()方法会一直等待任务执行结束或抛出异常,然后返回结果。
源码分析(JDK1.8)java.util.concurrent.FutureTask.get(long timeout, TimeUnit unit):取出运行结果,超时抛出java.util.concurrent.TimeoutException异常。
注意:get(long timeout, TimeUnit unit)中unit表示时间单位(例如:SECONDS),timeout表示时间值。调用该函数获取运行结果时,如果等待时间超时,会直接抛出java.util.concurrent.TimeoutException异常。
具体流程图如下:
IV.实际案例
submit(Callable task)
public class MultiThreadPool { private static List<String> hello = Arrays.asList("h", "e", "l", "l", "o"); private static String task(String args) { System.out.println(String.format("submit - thread name: %s, args: %s", Thread.currentThread().getName(), args)); return args; } private static void submitTask() throws InterruptedException, ExecutionException, TimeoutException { List<Future> futures = new ArrayList<>(); final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 6, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy()); try { for (String str : hello) { Thread.sleep(1); Future f = executor.submit(() -> task(str)); futures.add(f); } for (Future f: futures) { String result = (String) f.get(60, TimeUnit.SECONDS); System.out.println(String.format("submit - thread name: %s, result: %s", Thread.currentThread().getName(), result)); } } finally { executor.shutdown(); } } public static void main(String[] args) throws Exception { submitTask(); } }
输出结果:
submit - thread name: pool-1-thread-3, args: l submit - thread name: pool-1-thread-1, args: h submit - thread name: pool-1-thread-4, args: l submit - thread name: pool-1-thread-2, args: e submit - thread name: pool-1-thread-3, args: o submit - thread name: main, result: h submit - thread name: main, result: e submit - thread name: main, result: l submit - thread name: main, result: l submit - thread name: main, result: o
execute(Runnable task)
public class MultiThreadPool { private static List<String> hello = Arrays.asList("h", "e", "l", "l", "o"); private static class Task implements Runnable { private String arg; Task(String arg) { this.arg = arg; } public void run() { System.out.println(String.format("execute - thread name: %s, args: %s", Thread.currentThread().getName(), arg)); } } private static void executeTask() throws InterruptedException {final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 6, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy()); try { for (String str : hello) { Thread.sleep(1); executor.execute(new Task(str)); } } finally { executor.shutdown(); } } public static void main(String[] args) throws Exception { executeTask(); } }
输出结果:
execute - thread name: pool-1-thread-3, args: l
execute - thread name: pool-1-thread-1, args: h
execute - thread name: pool-1-thread-4, args: l
execute - thread name: pool-1-thread-2, args: e
execute - thread name: pool-1-thread-3, args: o