一、线程池定义和使用
jdk 1.5 之后就引入了线程池。
1.1 定义
从上面的空间切换看得出来,线程是稀缺资源,它的创建与销毁是一个相对偏重且耗资源的操作,而Java线程依赖于内核线程,创建线程需要进行操作系统状态切换。为避免资源过度消耗需要设法重用线程执行多个任务。线程池就是一个线程缓存,负责对线程进行统一分配、调优与监控。(数据库连接池也是一样的道理)
什么时候使用线程池?
单个任务处理时间比较短;需要处理的任务数量很大。
线程池优势?
- 重用存在的线程,减少线程创建、消亡的开销,提高性能、提高响应速度。
- 当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性,可统一分配,调优和监控。
1.2 线程池在 jdk 已有的实现
- 在 juc 包下,有一个接口:Executor :
- Executor 又有两个子接口:ExecutorService 和 ScheduledExecutorService,常用的接口是 ExecutorService。
- 同时常用的线程池的工具类叫 Executors。
例如:
ExecutorService service = Executors.newCachedThreadPool();
Executor 框架虽然提供了如 newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()、newScheduledThreadPool() 等创建线程池的方法,但都有其局限性,不够灵活。
上面的几种方式点进去会发现,都是用 ThreadPoolExecutor 进行创建的:
- newSingleThreadExecutor 字面意思 简单线程执行器。
- newFixedThreadPool 字面意思固定的线程池,传参就是线程固定数目,适用于执行长期任务的场景。
- newCachedThreadPool 字面意思 缓存线程池,核心线程0,最大线程非常大,动态创建的特点。
- newScheduledThreadPool 字面意思 时间安排线程池,指定核心线程数。
- newSingleThreadScheduledExecutor 字面意思 单线程安排执行器,也就是基于只有一个核心线程的执行器之外,又可以扩展。其中又用 DelegatedExecutorService 委托执行器服务进行了包装。
可以看到,上面直接用 Executors 工具类默认的一些实现 new 出来的线程池都是用的 ThreadPoolExecutor 线程执行器这个类进行构造的,不过参数不同,导致了效果的侧重点不同。
因此,自己创建线程池推荐的方法就是,直接使用 ThreadPoolExecutor 进行个性化的创建:
构造方法种的参数有 7 个:
- corePoolSize:线程池维护线程的最少数量 (core : 核心)
- maximumPoolSize:线程池维护线程的最大数量,显然必须>=1
- keepAliveTime:线程池维护的多余的线程所允许的空闲时间,最长可以空闲多久,时间到了,如果超过 corePoolSize 的线程一直空闲,他们就会被销毁。
- unit:线程池维护线程所允许的空闲时间的单位
- workQueue:线程池所使用的缓冲队列,已经提交但是没有执行的任务会放进这里
- threadFactory:生成线程池种工作线程的线程工厂,一般使用默认
- handler:线程池对拒绝任务的处理策略,当队列满且工作线程已经达到maximumPoolSize。
阿里的 java 开发手册,强制要求,通过 ThreadPoolExecutor 来自定义,不能使用内置的,避免资源耗尽。这个很好理解,1 的类型就只有一个核心线程和最大现场,2 没有扩展性,3、4、5的最大线程数太大,内存会爆炸。
1.3 线程池使用方法
这里我们用固定线程池来测试,传入核心线程数为 5,最大数量自然就也是 5,
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
try {
//模拟10个顾客办理业务
for (int i = 0; i < 10; i++){
//execute 执行方法,传入参数为实现了 Runnable 接口的类
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"号线程办理业务");
});
}
} catch (Exception e){
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
其中,execute 方法就是将任务提交的方法,我们用 lambda 表达式给 execute 方法传入了参数,实际上相当于一个完整的实现了 Runnable 接口的类。
执行结果:
可以看到,我们循环了 10 次,执行任务,但是线程只用到了 1-5 ,其中有多次复用。
再比如,我们按照各种类型的线程池,自己定义一个线程池,核心线程数 2, 最大线程数 5,阻塞队列长度为 3:
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
try {
//模拟10个顾客办理业务
for (int i = 0; i < 10; i++){
//execute 执行方法,传入参数为实现了 Runnable 接口的类
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"号线程办理业务");
});
}
} catch (Exception e){
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
同样 10 个线程,执行起来:
可以看到,执行了 8 个任务后,就抛出了异常,说明执行了拒绝策略。
上面两个示例,我们的任务本身都是没有返回值的,如果创建的任务本身需要有返回值就需要实现 Callable 接口,然后搭配FutureTask 来传入任务,那么线程池就应该调用 submit 方法而不是 execute。
二、线程池底层原理
2.1 线程池执行逻辑
处理的流程核心就 execute() 方法,他接收一个实现了 Runnable 接口的任务,决定对这个任务的处理策略。
下图是一个比较形象的策略流程:
可能的情况有四种,也就是图中的1234:
- 如果线程池中的线程数量少于corePoolSize,就创建新的核心线程来执行新添加的任务
- 如果线程池中的线程数量大于等于corePoolSize,但队列workQueue未满,则将新添加的任务放到队列workQueue中
- 如果线程池中的线程数量大于等于corePoolSize,且队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的非核心线程来处理被添加的任务
- 如果线程池中的线程数量等于了maximumPoolSize,就用RejectedExecutionHandler来执行拒绝策略。会抛出异常,一般的拒绝策略是RejectedExecutionException
注意,执行的顺序,在 java 里有一个不合理的地方:
在池里安排任务的时候,我们的核心线程,队列,非核心线程里面排的任务顺序应该是 1 2 3;
但是真正实现上,如果三个都满了,开始执行的时候,依次执行的顺序却是 核心线程,非核心线程,队列。也就是执行顺序会变成 1 3 2
2.2 拒绝策略
有些时候,我们并不希望拒绝策略是直接抛出异常,那么 jdk 里面提供的默认拒绝策略有 4 种,他们体现在代码中就是 ThreadPoolExecutor 的四个静态内部类:
2.2.1 CallerRunsPolicy:调用者运行策略。
这种策略不会抛弃任务,也不抛出异常,而是将某些任务回退给调用者,从而降低新任务的流量。
实现非常简单,那就是如果说 e 这个线程池已经 shutdown 了,那么就什么也不干,也就是这个任务直接丢了;否则,r.run() ,相当于调用这个方法的线程里直接执行了这个 Runnable 任务。
此时我们可以把 1.3 里的代码修改一下,只修改策略为 CallerRunsPolicy:
可以看到,有些任务会在 main 线程里处理。
2.2.2 AbortPolicy:终止策略。
抛异常。前面已经试过了,这个是默认的拒绝策略。
2.2.3 DiscardPolicy:丢弃任务。
可以看到,源码里就是是什么也不做。如果场景中允许任务丢失,这个是最好的策略。
2.2.4 DiscardOldestPolicy:抛弃队列中等待最久的任务。
抛弃队列中等待最久的任务,然后把当前的任务加入队列中,尝试再次提交当前任务。
源码里也就是利用队列操作,进行一次出队操作,然后重新调用 execute 方法。
2.3 线程池的五种状态
和一个正常的线程的生命周期区别开,这个是线程池里线程的状态。
- Running,能接受新任务以及处理已添加的任务;
- Shutdown,不接受新任务,可以处理已经添加的任务,也就是不能再调用execute或者submit了;
- Stop,不接受新任务,不处理已经添加的任务,并且中断正在处理的任务;
- Tidying,所有的任务已经终止,CTL记录的任务数量为0,CTL负责记录线程池的运行状态与活动线程数量;
- Terminated,线程池彻底终止,则线程池转变为terminated的状态。
如图所示,从running状态转换为 shutdown,调用 shutdown()方法;如果调用shutdownNow()方法,就直接会变成stop。
terminated()是钩子函数,默认是什么也不做的,我们可以重写,然后决定结束之前要做一些别的处理逻辑。这个钩子函数,就是模板模式的方法。
三、阻塞队列
线程池里的 BlockingQueue,阻塞队列,事实上在消费者生产者问题里的管程法实现,我们的策略也是类似阻塞队列的,用它来做一个缓存池的作用。
阻塞队列:任意时刻,不管并发有多高,永远保证只有一个线程能够进行队列的入队或出队操作。也就意味着他是能够保证线程安全的。
另外,阻塞队列分为有界和无界队列,理论上来说一个是队列的size有固定,另一个是无界的。对于有界队列来说,如果队列存满,只能出队了,入队操作就只能阻塞。
在 juc 包里,阻塞队列的实现有很多:
- ArrayBlockingQueue:有界阻塞队列;
- LinkedBlockingQueue:链表结构(大小默认值为Integer.MAX_VALUE)的阻塞队列;
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列;
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列;
- SynchronousQueue:不存储元素的阻塞队列,相当于只有一个元素;
- LinkedTransferQueue:链表组成的无界阻塞队列;
- LinkedBlockingDeque:链表组成的双向阻塞队列。
对于 BlockingQueue 来说,核心操作主要有几类:插入、删除、查找。
其中的四种异常策略:
- 抛异常:如果阻塞队列满,再往队列里 add 插入元素会抛 IllegalStateException:Queue full,如果阻塞队列空,再 remove 就会抛 NoSuchElementException。
- 特殊值:offer 方法:成功 true,失败 false,poll 方法,成功就返回元素,没有就返回 null。
- 阻塞:阻塞队列满的时候,生产者线程继续 put 元素,队列就会阻塞直到可以 put 数据或者响应中断然后退出,阻塞队列空的时候,消费者线程继续 take 元素,队列就会一直阻塞直到有元素可以 take。
- 超时退出:阻塞队列满的时候,会阻塞生产者线程且超时退出,空的时候会阻塞消费者线程且超时退出。
那么使用的时候,增删的方法按对应的同一组使用比较合理。(其实这个策略的设计对应的在单线程集合里也有,那就是Deque接口的实现类 LinkedList 使用的时候,不同的增删方法策略不同)