- 一、自定义线程池的实现
- 2 ThreadPoolExecutor常用方法(JUC线程池实现)
- 3 设计模式-工作线程
一、自定义线程池的实现
1-1 线程池的组成概述
线程池基本思想:创建一批线程,让这批线程得到频繁的利用。
- 减少线程对于资源的占用
- 减少线程上下文切换的开销。
自定义线程池的组成:
1)线程池(图中左边部分):线程池中有多个线程。
2)阻塞队列:生产者和消费者模式下,平衡任务产生与消费的组件
- 线程池中的线程是任务的消费者。
- 还有另外程序是任务的生产者。
1-2 线程池的基本实现
package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/*定义阻塞队列,利用reentrantlock的条件变量对任务执行线程以及任务产生线程进行阻塞管理*/
@Slf4j(topic = "c.BlockQueue")
class BlockQueue<T>{
public BlockQueue(int capacity) {
this.capacity = capacity;
}
/*队列属性的定义*/
// 01 定义任务队列,Java中LinkList也是双端队列,但ArrayDeque效能会好点
private Deque<T> queue = new ArrayDeque<T>();
// 02 锁机制,线程池执行具体的任务时需要加锁,避免任务的重复执行
private ReentrantLock lock = new ReentrantLock();
// 定义条件变量,生产者/消费者都需要阻塞等待,但二者阻塞条件不同。
// 03 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 04 消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
// 05 容量
private int capacity;
/*队列方法的定义*/
// 阻塞获取
public T take(){
lock.lock();
try{
while(queue.isEmpty()){
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
public T poll(long timeout, TimeUnit unit){
lock.lock();
try{
long nanos = unit.toNanos(timeout);
while(queue.isEmpty()){
try {
if(nanos <= 0) // 所有等待时间已经耗尽,返回null
return null;
// 这里考虑了虚假唤醒的情况,awaitNanos返回时间减去了已经等待时间。
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
public void put(T element){
lock.lock();
try{
while(queue.size() == this.capacity){
try {
log.warn("阻塞队列满了",element);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(element);
log.warn("已经加入{}任务到阻塞队列",element);
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
public int getCapacity(){
lock.lock();
try{
return queue.size();
}finally {
lock.unlock();
}
}
}
@Slf4j(topic = "c.ThreadPool")
class ThreadPool{
// 任务队列
private BlockQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 核心线程数
private int coreSize;
// 获取任务的超时时间
private long timeout;
private TimeUnit timeUnit;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapacity) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockQueue<>(queueCapacity);
}
public void execute(Runnable task){
/*
当前运行线程的数目 < 线程池的大小,则分配线程运行task,否则将task加入阻塞队列
*/
synchronized (workers){
if(workers.size() < coreSize){
Worker worker = new Worker(task);
log.warn("{任务{}获得线程池直接获得线程资源,没有进入过阻塞队列}",task);
workers.add(worker);
worker.start();
}else{
taskQueue.put(task);
}
}
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task){
this.task = task;
}
@Override
public void run(){
// 01 当任务为空时,执行任务
// 02 当任务执行完毕时,再接着从任务队列中获取任务执行。
while ((task !=null) || (task = taskQueue.take()) != null){
try{
task.run();
}catch(Exception e){
e.printStackTrace();
}finally {
task = null;
}
}
synchronized (workers){
workers.remove(this);
}
}
}
}
@Slf4j(topic = "c.test8")
public class test8 {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2,1000,TimeUnit.MICROSECONDS,10);
for(int i = 0;i < 5;++i){
int j = i;
threadPool.execute(()->{
log.warn("正在执行任务 {}",j);
});
}
System.out.printf("所有任务执行完毕!");
}
}
上面代码测试中:
- 线程池大小:2
- 任务队列大小:10
- 任务数:5
执行结果:
[main] WARN c.ThreadPool - {任务chapter8.test8$$Lambda$1/940060004@783e6358获得线程池直接获得线程资源,没有进入过阻塞队列}
[main] WARN c.ThreadPool - {任务chapter8.test8$$Lambda$1/940060004@5315b42e获得线程池直接获得线程资源,没有进入过阻塞队列}
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$1/940060004@5d624da6任务到阻塞队列
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$1/940060004@1e67b872任务到阻塞队列
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$1/940060004@60addb54任务到阻塞队列
[Thread-0] WARN c.test8 - 正在执行任务 0
[Thread-0] WARN c.test8 - 正在执行任务 2
[Thread-0] WARN c.test8 - 正在执行任务 3
[Thread-0] WARN c.test8 - 正在执行任务 4
[Thread-1] WARN c.test8 - 正在执行任务 1
所有任务执行完毕!
- 可以看到2个任务得到迅速执行,超过线程池大小的3个任务需要先进入任务队列然后才能被分配线程资源执行
并发的任务数过多,阻塞队列都放不下了的问题?
代码不足之处:当并发的任务数 > 任务队列容量+线程池大小,新的任务无法添加到阻塞队列,会造成添加任务提供线程陷入长时间的阻塞状态。
- 线程池大小:2
- 任务队列大小:3
- 任务数:10
[main] WARN c.ThreadPool - {任务chapter8.test8$$Lambda$1/1338668845@7cd84586获得线程池直接获得线程资源,没有进入过阻塞队列}
[main] WARN c.ThreadPool - {任务chapter8.test8$$Lambda$1/1338668845@66a29884获得线程池直接获得线程资源,没有进入过阻塞队列}
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$1/1338668845@cc34f4d任务到阻塞队列
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$1/1338668845@17a7cec2任务到阻塞队列
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$1/1338668845@65b3120a任务到阻塞队列
[main] WARN c.BlockQueue - 阻塞队列满了
[Thread-0] WARN c.test8 - 正在执行任务 0
[Thread-0] WARN c.test8 - 正在执行任务 2
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$1/1338668845@79fc0f2f任务到阻塞队列
[main] WARN c.BlockQueue - 阻塞队列满了
[Thread-0] WARN c.test8 - 正在执行任务 3
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$1/1338668845@50040f0c任务到阻塞队列
[main] WARN c.BlockQueue - 阻塞队列满了
[Thread-0] WARN c.test8 - 正在执行任务 4
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$1/1338668845@2dda6444任务到阻塞队列
[main] WARN c.BlockQueue - 阻塞队列满了
[Thread-0] WARN c.test8 - 正在执行任务 5
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$1/1338668845@5e9f23b4任务到阻塞队列
[Thread-1] WARN c.test8 - 正在执行任务 1
[main] WARN c.BlockQueue - 阻塞队列满了
[Thread-0] WARN c.test8 - 正在执行任务 6
[Thread-1] WARN c.test8 - 正在执行任务 7
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$1/1338668845@4783da3f任务到阻塞队列
[Thread-0] WARN c.test8 - 正在执行任务 8
[Thread-1] WARN c.test8 - 正在执行任务 9
所有任务执行完毕!
-
阻塞队列满了,会导致调用put方法的线程陷入到阻塞状态。
-
考虑改造put方法,让任务提供方在等待一定时间后放弃等待。
public void put(T element){
lock.lock();
try{
while(queue.size() == this.capacity){
try {
log.warn("阻塞队列满了",element);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(element);
log.warn("已经加入{}任务到阻塞队列",element);
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
将put方法改造为offer方法
/*有超时时间的将任务添加到阻塞队列,如果超时返回false表示任务添加阻塞队列失败*/
boolean offer(T element,TimeUnit unit,long timeout){
lock.lock();
try{
long nanos = unit.toNanos(timeout);
while(queue.size() == this.capacity){
try {
if(nanos <= 0)
return false;
// log.warn("阻塞队列满了,还需等待{}",element);
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(element);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
1-3 线程的拒绝策略以及策略模式
1-3-1常见的拒绝策略
1)调用者死等
2)调用者带超时等待
3)调用者放弃任务执行。
4)调用者抛出异常
5)调用者自己执行任务
1-3-2 拒绝策略通过策略模式的实现
基本思想:线程池向外提供函数式接口,调用者可以在函数式接口中实现自己想要的策略。
ackage chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/*定义一个类时需要考虑这个类有哪些属性以及相关的方法*/
@Slf4j(topic = "c.BlockQueue")
class BlockQueue<T>{
public BlockQueue(int capacity) {
this.capacity = capacity;
}
/*队列属性的定义*/
// 01 定义任务队列,Java中LinkList也是双端队列,但ArrayDeque效能会好点
private Deque<T> queue = new ArrayDeque<T>();
// 02 锁机制,线程池执行具体的任务时需要加锁,避免任务的重复执行
private ReentrantLock lock = new ReentrantLock();
// 定义条件变量,生产者/消费者都需要阻塞等待,但二者阻塞条件不同。
// 03 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 04 消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
// 05 容量
private int capacity;
/*队列方法的定义*/
// 阻塞获取
public T take(){
lock.lock();
try{
while(queue.isEmpty()){
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
public T poll(long timeout, TimeUnit unit){
lock.lock();
try{
long nanos = unit.toNanos(timeout);
while(queue.isEmpty()){
try {
if(nanos <= 0) // 所有等待时间已经耗尽,返回null
return null;
// 这里考虑了虚假唤醒的情况,awaitNanos返回时间减去了已经等待时间。
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
public void put(T element){
lock.lock();
try{
while(queue.size() == this.capacity){
try {
log.warn("阻塞队列满了");
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(element);
log.warn("已经加入{}任务到阻塞队列",element);
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
/*有超时时间的将任务添加到阻塞队列,如果超时返回false表示任务添加阻塞队列失败*/
boolean offer(T element,TimeUnit unit,long timeout){
lock.lock();
try{
long nanos = unit.toNanos(timeout);
while(queue.size() == this.capacity){
try {
if(nanos <= 0)
return false;
// log.warn("阻塞队列满了,还需等待{}",element);
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(element);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
public int getCapacity(){
lock.lock();
try{
return queue.size();
}finally {
lock.unlock();
}
}
public void tryput(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try{
/*这里改用if语句判断,将队列满的情况交给策略去处理*/
if(queue.size() == capacity){
rejectPolicy.Reject(this,task);
}else{
queue.add(task);
log.warn("已经加入{}任务到阻塞队列",task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}
/*定义拒绝策略的函数式接口*/
@FunctionalInterface
interface RejectPolicy<T>{
void Reject(BlockQueue<T> queue, T task);
}
@Slf4j(topic = "c.ThreadPool")
class ThreadPool{
// 任务队列
private BlockQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
private RejectPolicy<Runnable> rejectPolicy;
// 核心线程数
private int coreSize;
// 获取任务的超时时间
private long timeout;
private TimeUnit timeUnit;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapacity,RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
public void execute(Runnable task){
/*
当前运行线程的数目 < 线程池的大小,则分配线程运行task,否则将task加入阻塞队列
*/
synchronized (workers){
if(workers.size() < coreSize){
Worker worker = new Worker(task);
log.warn("{任务{}获得线程池直接获得线程资源,没有进入过阻塞队列}",task);
workers.add(worker);
worker.start();
}else{
// taskQueue.put(task); // 这里采用的拒绝策略是死等
/*传入task,以及所采用的拒绝策略*/
taskQueue.tryput(rejectPolicy,task);
}
}
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task){
this.task = task;
}
@Override
public void run(){
// 01 当任务为空时,执行任务
// 02 当任务执行完毕时,再接着从任务队列中获取任务执行。
while ((task !=null) || (task = taskQueue.take()) != null){
try{
task.run();
}catch(Exception e){
e.printStackTrace();
}finally {
task = null;
}
}
synchronized (workers){
workers.remove(this);
}
}
}
}
@Slf4j(topic = "c.test8")
public class test8 {
public static void main(String[] args) {
// 采用策略模式处理阻塞队列满时对线程的处理
ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MICROSECONDS,2,((queue, task) -> {
// 01 死等策略
// queue.put(task);
// 02 带有超时机制的等待
// queue.offer(task,TimeUnit.MILLISECONDS,1000);
// 03 什么都不做
// log.warn("放弃执行 {}",task);
// 04 让调用者抛出异常,调用者线程抛出异常,会导致调用者后续的任务也无法得到执行
// throw new RuntimeException("任务执行失败,任务提供线程抛出异常");
// 05 让调用者自己执行任务
// task.run();
}));
for(int i = 0;i < 5;++i){
int j = i;
threadPool.execute(()->{
log.warn("正在执行任务 {}",j);
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
System.out.printf("主线程执行完毕!
");
}
}
拒绝策略1:死等策略
[main] WARN c.ThreadPool - {任务chapter8.test8$$Lambda$2/186370029@1e80bfe8获得线程池直接获得线程资源,没有进入过阻塞队列}
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$2/186370029@6f539caf任务到阻塞队列
[Thread-0] WARN c.test8 - 正在执行任务 0
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$2/186370029@79fc0f2f任务到阻塞队列
[main] WARN c.BlockQueue - 阻塞队列满了
[Thread-0] WARN c.test8 - 正在执行任务 1
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$2/186370029@2dda6444任务到阻塞队列
[main] WARN c.BlockQueue - 阻塞队列满了
[Thread-0] WARN c.test8 - 正在执行任务 2
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$2/186370029@5e9f23b4任务到阻塞队列
主线程执行完毕!
[Thread-0] WARN c.test8 - 正在执行任务 3
[Thread-0] WARN c.test8 - 正在执行任务 4
拒绝策略2:带有超时机制的等待
[main] WARN c.ThreadPool - {任务chapter8.test8$$Lambda$2/186370029@1e80bfe8获得线程池直接获得线程资源,没有进入过阻塞队列}
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$2/186370029@6f539caf任务到阻塞队列
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$2/186370029@79fc0f2f任务到阻塞队列
[Thread-0] WARN c.test8 - 正在执行任务 0
[Thread-0] WARN c.test8 - 正在执行任务 1
主线程执行完毕!
[Thread-0] WARN c.test8 - 正在执行任务 2
[Thread-0] WARN c.test8 - 正在执行任务 4
- 超过等待时间的任务会放弃执行,上面的任务3就没有得到执行。
拒绝策略3:什么都不做
[main] WARN c.ThreadPool - {任务chapter8.test8$$Lambda$2/186370029@1e80bfe8获得线程池直接获得线程资源,没有进入过阻塞队列}
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$2/186370029@6f539caf任务到阻塞队列
[Thread-0] WARN c.test8 - 正在执行任务 0
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$2/186370029@79fc0f2f任务到阻塞队列
[main] WARN c.test8 - 放弃执行 chapter8.test8$$Lambda$2/186370029@50040f0c
[main] WARN c.test8 - 放弃执行 chapter8.test8$$Lambda$2/186370029@2dda6444
主线程执行完毕!
[Thread-0] WARN c.test8 - 正在执行任务 1
[Thread-0] WARN c.test8 - 正在执行任务 2
拒绝策略4:调用者线程主动抛出异常,停止执行
[main] WARN c.ThreadPool - {任务chapter8.test8$$Lambda$2/186370029@1e80bfe8获得线程池直接获得线程资源,没有进入过阻塞队列}
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$2/186370029@6f539caf任务到阻塞队列
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$2/186370029@79fc0f2f任务到阻塞队列
Exception in thread "main" [Thread-0] WARN c.test8 - 正在执行任务 0
java.lang.RuntimeException: 任务执行失败,任务提供线程抛出异常
at chapter8.test8.lambda$main$0(test8.java:225)
at chapter8.BlockQueue.tryput(test8.java:129)
at chapter8.ThreadPool.execute(test8.java:182)
at chapter8.test8.main(test8.java:231)
[Thread-0] WARN c.test8 - 正在执行任务 1
[Thread-0] WARN c.test8 - 正在执行任务 2
- 调用者线程抛出异常会导致该线程后续可能需要执行的任务无法执行。
拒绝策略5:调用者线程自己执行程序
[main] WARN c.ThreadPool - {任务chapter8.test8$$Lambda$2/186370029@1e80bfe8获得线程池直接获得线程资源,没有进入过阻塞队列}
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$2/186370029@6f539caf任务到阻塞队列
[Thread-0] WARN c.test8 - 正在执行任务 0
[main] WARN c.BlockQueue - 已经加入chapter8.test8$$Lambda$2/186370029@79fc0f2f任务到阻塞队列
[main] WARN c.test8 - 正在执行任务 3
[main] WARN c.test8 - 正在执行任务 4
[Thread-0] WARN c.test8 - 正在执行任务 1
主线程执行完毕!
[Thread-0] WARN c.test8 - 正在执行任务 2
- 可以看到这种策略模式下,调用线程(Main线程)在线程池资源不够的情况下自己完成了任务。
2 ThreadPoolExecutor常用方法(JUC线程池实现)
2-1 线程池的状态如何表示?
概述:采用int类型变量保存线程池状态,其中高3位表示线程池状态,低29位表示线程数量。
状态名 | 高三位 | 接受新任务 | 处理阻塞队列任务 | 说明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | |
SHUTDOWN | 000 | N | Y | 一种比较温和停止的方式,线程池不会接收新的任务,但是会处理完阻塞队列中的任务。 |
STOP | 001 | N | N | 暴力停止,采用interrupt打断正在执行的任务,放弃一切任务的执行 |
TIDYING(tidy:整洁的,整齐的) | 010 | 过渡状态,所有任务都执行完毕,即将进入终结状态 | ||
TERMINATED | 011 | 终结状态 |
注意点:
1)区别shutdown与stop。
2)从状态数值上:TERMINATED > TIDYING>STOP>SHUTDOWN>RUNNING(符号位是1)
为什么将线程池状态与线程池线程数量放在一个变量中?
原因:目的是为了减少CAS操作,一个变量只需要一个原子变量,仅需要一次CAS原子操作赋值。
// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }
2-2 线程池对象的常用的构造方法
2-2-1 7个参数的核心构造方法(重要)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
-
corePoolSize 核心线程数目 (最多保留的线程数)
-
maximumPoolSize 最大线程数目
有2种类型的线程:核心线程与救急线程
核心线程数目+救急线程数目 = 最大线程数目 !!!!!!
-
keepAliveTime 生存时间 - 针对救急线程
-
unit 时间单位 - 针对救急线程
当任务的数目过多的时候:
核心线程资源--->阻塞队列--->救急线程资源----->拒绝策略
救急线程:用于应对突然增加的任务数(波峰),当任务数降低回归平稳,救急线程会被销毁。为了避免救急线程占用过多的资源,所以需要设置其生存时间。
-
workQueue 阻塞队列(task排队等待的地方)
-
threadFactory 线程工厂 (为线程创建提供名字)
-
handler 拒绝策略
核心线程与救急线程的区别,为什么需要救急线程?
区别:
- 生命长度:核心线程与线程池是共同存在的,救急线程在任务执行完后会生存时间限制。时间一到,就会被销毁。
- 使用时机:线程池优先使用核心线程,救急线程只有在阻塞队列满了之后才会调用。
救急线程主要用于应对较短时间内突然增多的任务数而设置的一种机制。
关于救急线程具体应用详细描述:
1)线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
2)当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排
队,直到有空闲的线程。
3)如果队列选择了有界队列(有容量限制的队列),那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
4)如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。
JDK中的拒绝策略(重要)
JDK中提供了四种拒绝策略:
名称 | 说明 |
---|---|
AbortPolicy | 调用者抛出 RejectedExecutionException 异常,这是默认策略 |
CallerRunsPolicy | 让调用者运行任务 |
DiscardPolicy | 放弃本次任务 |
DiscardOldestPolicy | 放弃队列中最早的任务,本任务取而代之 |
其他框架一般会实现自己的拒绝策略,典型的如下:
框架名称 | 实现策略 | 补充说明 |
---|---|---|
Dubbo | 抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息 | 通过信息记录定位问题。 |
Netty | 创建一个新线程来执行任务 | |
ActiveMQ | 带超时等待(60s)尝试放入队列 | 阻塞队列满了,等待60s,60s之后阻塞队列依旧满了,则放弃该任务 |
PinPoint | 使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略 |
2-2-2 构造函数newFixedThreadPool(只有固定数量核心线程的线程池,无阻塞上限)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
特点
- 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
- 阻塞队列是无界的,可以放任意数量的任务
应用场景:适用于任务量已知,相对耗时的任务
实例
package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j(topic = "c.test1")
public class test1 {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.execute(()->{
log.warn("1");
});
pool.execute(()->{
log.warn("2");
});
pool.execute(()->{
log.warn("3");
});
}
}
运行结果:
- 核心线程1,2会被用于执行三个任务
- 核心线程执行完任务后不会自己主动停止。
[pool-1-thread-2] WARN c.test1 - 2
[pool-1-thread-1] WARN c.test1 - 1
[pool-1-thread-2] WARN c.test1 - 3
线程的名称是通过线程工厂创建的:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
-----------------------------------------------------------------------------------------
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
- 可以看到的线程名称的格式
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
如何通过线程工厂自定义线程名称?
package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j(topic = "c.test1")
public class test1 {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2,new ThreadFactory(){
private AtomicInteger t = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"mypool_t"+t.getAndIncrement());
}
});
pool.execute(()->{
log.warn("1");
});
pool.execute(()->{
log.warn("2");
});
pool.execute(()->{
log.warn("3");
});
}
}
执行结果:
[mypool_t1] WARN c.test1 - 1
[mypool_t1] WARN c.test1 - 3
[mypool_t2] WARN c.test1 - 2
2-2-3 构造函数之newCachedThreadPool(只有救急线程的线程池)
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
特点
- 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s。全部都是救急线程,
- 队列采用 SynchronousQueue 实现,容量为0,没有线程来取是放不进去的
- 队列采用了 SynchronousQueue (同步队列)实现特点是,它没有容量,没有线程来取是放不进去的
[ˈsɪŋkrənəs] 同步的
应用场景
- 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线
程。 适合任务数比较密集,但每个任务执行时间较短的情况。
同步队列的测试
package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.SynchronousQueue;
@Slf4j(topic = "c.test2")
public class test2 {
public static void main(String[] args) {
SynchronousQueue<Integer> integers = new SynchronousQueue<>();
new Thread(() -> {
try {
log.warn("putting {} ", 1);
integers.put(1);
log.warn("{} putted...", 1);
log.warn("putting...{} ", 2);
integers.put(2);
log.warn("{} putted...", 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1").start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
try {
log.warn("taking {}", 1);
integers.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
try {
log.warn("taking {}", 2);
integers.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t3").start();
}
}
- 可以看到同步队列只有有线程来取任务,才能将任务放入到队列中。(同步任务发放者与任务获取者)
2-2-4 构造函数之newSingleThreadExecutor(单线程执行器)
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
应用场景
- 希望多个任务排队执行(串行执行)。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
2-4-1 单线程线程池与自己创建单线程执行任务的区别?
- 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,
- 线程池还会新建一个线程,保证池的正常工作
2-4-2 newFixedThreadPool(1) 与 newSingleThreadExecutor()的区别
- newFixedThreadPool(1)直接返回线程池对象,可以对返回的对象进行强制类型转化后,调用 setCorePoolSize 等方法对线程池的核心线程数进行修改 。
- FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因
此不能调用 ThreadPoolExecutor 中特有的方法 ,直接目的是为了避免其他程序利用返回的对象修改线程池的参数,保证这个线程池是单线程的。
2-2-5 newScheduledThreadPool(带有调度功能的线程池)
对任务的执行提供了以下功能:
- 延时执行任务的功能
- 循环执行任务的功能
实例:
package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j(topic = "c.test10")
public class test12 {
public static void main(String[] args) {
/*
ScheduledExecutorService提供2类功能:1)延时执行任务 2)定时执行任务。
*/
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
// 克服了Timer的缺点:1)之前的任务的睡眠会影响后续任务的执行。2)之前任务的异常会导致后续任务无法执行。
pool.schedule(()->{
log.warn("task1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try{
int i = 1/0;
}catch(Exception e){
e.printStackTrace();
}
},1, TimeUnit.SECONDS);
pool.schedule(()->{
log.warn("task2");
},1, TimeUnit.SECONDS);
}
}
任务循环执行的API
1)scheduleAtFixedRate
2)scheduleWithFixedDelay
区别:2)确保了上次任务执行结束到下一次任务开始的时间间隔是固定的。1)会收到任务执行时间长短影响。
package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j(topic = "c.test11")
public class test11 {
public static void main(String[] args) {
/*
ScheduledExecutorService提供2类功能:1)延时执行任务 2)定时执行任务。
*/
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
/*延时1s,之后每隔1s执行一次,由于前一个任务上一次执行完毕。
才能执行下一个任务,当每个任务执行时间较长,会影响到任务的时间间隔*/
pool.scheduleAtFixedRate(()->{
log.warn("running...");
},1,1,TimeUnit.SECONDS);
/*这里的间隔是严格的上个任务执行到下个任务开始的间隔,注意进行区分*/
pool.scheduleWithFixedDelay(()->{
log.warn("running...");
},1,1,TimeUnit.SECONDS);
}
}
2-2-6 任务执行过程中的异常处理方式
方法1:通过try/catch捕获异常
方式2:通过Future返回任务执行的异常
package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j(topic = "c.test12")
public class test12 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/*01 任务中执行的异常,如果不做任何处理,根本无法感知到,但是这种异常会造成任务失败*/
ExecutorService pool1 = Executors.newFixedThreadPool(1);
pool1.submit(()->{
log.warn("task1");
int i = 1/0;
});
/*02 通过try/catch语句捕获异常*/
ScheduledExecutorService pool2 = Executors.newScheduledThreadPool(2);
pool2.schedule(()->{
log.warn("task2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try{
int i = 1/0;
}catch(Exception e){
e.printStackTrace();
}
},1, TimeUnit.SECONDS);
/*03 Future返回异常,如果任务执行过程中没有异常返回true*/
ExecutorService pool3 = Executors.newFixedThreadPool(1);
Future<Boolean> f = pool3.submit(()->{
log.warn("task3");
int i = 1/0;
return true;
});
log.warn("result {}",f.get());
}
}
执行结果
[pool-1-thread-1] WARN c.test12 - task1
[pool-3-thread-1] WARN c.test12 - task3
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at chapter8.test12.main(test12.java:37)
Caused by: java.lang.ArithmeticException: / by zero
at chapter8.test12.lambda$main$2(test12.java:33)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
[pool-2-thread-1] WARN c.test12 - task2
java.lang.ArithmeticException: / by zero
at chapter8.test12.lambda$main$1(test12.java:23)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
2-3 线程池任务的提交相关API学习
- 提交任务本质上就是将任务put到线程池中。
/* 01 单纯的执行任务 */
void execute(Runnable command);
/*02 执行任务,用返回值 Future 获得任务执行结果
进一步讲,线程提供者需要获取到线程池中线程执行完任务后返回的结果,
*/
<T> Future<T> submit(Callable<T> task);
/*03 提交任务集合中所有任务
invoke:调用;激活
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/*04 提交任务集合中所有任务,带超时时间!!!*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/*05 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
/*06 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,TimeoutException;
2-3-1 任务提供线程如何获取线程池的线程任务执行完的结果?
- 利用juc提供的FutureTask类(本质上是保护性暂停模式)
实例
package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j(topic = "c.test3")
public class test3 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
// FutureTask是采用保护性暂停模式实现线程之间结果的获取
FutureTask<String> future = (FutureTask<String>) pool.submit(new Callable<String> (){
@Override
public String call() throws Exception {
log.warn("running");
Thread.sleep(1000);
return "ok";
}
});
// 主线程在线程池中的任务执行完成前会处于阻塞状态。
// 通过get方法得到执行结果
log.warn("{}",future.get());
}
}
执行结果:
[pool-1-thread-1] WARN c.test3 - running
[main] WARN c.test3 - ok
2-3-2 invokeAll的使用
package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
@Slf4j(topic = "c.test4")
public class test4 {
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
List<Future<String>> futures = pool.invokeAll(Arrays.asList(
()->{
log.warn("begin1");
Thread.sleep(1000);
return "1";
},
()->{
log.warn("begin2");
Thread.sleep(2000);
return "2";
},
()->{
log.warn("begin3");
Thread.sleep(3000);
return "3";
}
));
futures.forEach(f->{
try {
log.warn("{}", f.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
}
}
执行结果
[pool-1-thread-2] WARN c.test4 - begin2
[pool-1-thread-1] WARN c.test4 - begin1
[pool-1-thread-1] WARN c.test4 - begin3
[main] WARN c.test4 - 1
[main] WARN c.test4 - 2
[main] WARN c.test4 - 3
- 可以看到invokeAll会获取集中的所有执行结果
2-3-3 invokeAllAny的使用
作用:找到一个最先执行完的任务并返回该任务的结果,其他的任务则去取消执行。
package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.*;
@Slf4j(topic = "c.test6")
public class test6 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService pool = Executors.newFixedThreadPool(3);
String result = pool.invokeAny(Arrays.asList(
() -> {
log.warn("begin1");
Thread.sleep(1000);
log.warn("end1");
return "1";
},
() -> {
log.warn("begin2");
Thread.sleep(2000);
log.warn("end2");
return "2";
},
() -> {
log.warn("begin3");
Thread.sleep(3000);
log.warn("end3");
return "3";
}
));
log.warn("{}", result);
}
}
执行结果
[pool-1-thread-1] WARN c.test6 - begin1
[pool-1-thread-2] WARN c.test6 - begin2
[pool-1-thread-3] WARN c.test6 - begin3
[pool-1-thread-1] WARN c.test6 - end1
[main] WARN c.test6 - 1
分析:三个任务都执行了,但只返回了最先结束的任务(线程1)的结果。
2-3 线程池关闭的相关API学习
- JDK源码ThreadPoolExecutor.java
2-3-1 API概述
public void shutdown()
public List<Runnable> shutdownNow()
boolean isShutdown(); /*不在 RUNNING 状态的线程池,此方法就返回 true*/
boolean isTerminated(); /*线程池状态是否是 TERMINATED*/
/*调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,利用该方法让调用线程调用shutdown后在一定时间内阻塞目的是为了等待所有任务运行结束,如果在等待时间内所有任务都没有执行完毕,那么调用线程依旧会执行*/
/* Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.*/
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
2-3-2 shutdown()
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); /*修改线程池状态为SHUTDOWN*/
interruptIdleWorkers(); /*打断空闲线程*/
onShutdown(); /*hook for ScheduledThreadPoolExecutor*/
} finally {
mainLock.unlock();
}
tryTerminate(); /*尝试终结(没有运行的线程立即终结,运行线程也不会等)*/
}
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行 (调用线程调用该函数后会继续执行下面的代码)
实例
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j(topic = "c.test7")
public class test7 {
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<Integer> result1 = pool.submit(()->{
log.warn("task 1 runing");
Thread.sleep(1000);
return 1;
});
Future<Integer> result2 = pool.submit(()->{
log.warn("task 2 runing");
Thread.sleep(1000);
return 2;
});
Future<Integer> result3 = pool.submit(()->{
log.warn("task 3 runing");
Thread.sleep(1000);
return 3;
});
pool.shutdown();
log.warn("shut down thread pool");
// pool.awaitTermination(3, TimeUnit.SECONDS);
// Future<Integer> result4 = pool.submit(()->{
// log.warn("task 4 runing");
// Thread.sleep(1000);
// return 4;
// });
}
}
执行结果
- 可以看到调用shutdown后,依旧会将任务执行完毕
[pool-1-thread-1] WARN c.test7 - task 1 runing
[main] WARN c.test7 - shut down thread pool
[pool-1-thread-2] WARN c.test7 - task 2 runing
[pool-1-thread-2] WARN c.test7 - task 3 runing
如果在shutdown之后提交任务
- 会抛出异常,拒绝任务的执行
[pool-1-thread-1] WARN c.test7 - task 1 runing
[main] WARN c.test7 - shut down thread pool
[pool-1-thread-2] WARN c.test7 - task 2 runing
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1b2c6ec2 rejected from java.util.concurrent.ThreadPoolExecutor@4edde6e5[Shutting down, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
at chapter8.test7.main(test7.java:33)
[pool-1-thread-1] WARN c.test7 - task 3 runing
2-3-3 shutdownNow()
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP); // 修改线程池状态
interruptWorkers(); // 打断所有线程
tasks = drainQueue(); // 获取队列中剩余任务
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
线程池状态变为 STOP
- 不会接收新任务
- 用 interrupt 的方式中断正在执行的任务,会将队列中的任务。
总结:可以看到shutdownNow()属于特别“紧急”的操作,中断所有任务,而shutdown()会执行完已经提交的任务。
3 设计模式-工作线程
3-1 定义
定义:让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务(有限的资源处理无限的任务)
分类:也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。
注意点:不同的任务类型应该使用不同的线程池,这样能够避免饥饿,提高效率(分工能够提高效率)。
3-2 线程池的饥饿问题(区别于之前的饥饿问题)
定义:任务的执行由于无法获得足够的线程,造成任务无法推进,陷入卡住状态。
发生的场景?
- 固定大小的线程池或者单线程线程池容易发生,带有缓冲的线程池不会有饥饿现象。
3-2-1 线程池饥饿的实例
package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.*;
@Slf4j(topic = "c.test9")
public class test9 {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
/*模拟一位客人点餐*/
executorService.execute(() -> {
log.warn("处理点餐...");
Future<String> f = executorService.submit(() -> {
log.warn("做菜");
return cooking();
});
try {
log.warn("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
executorService.execute(() -> {
log.warn("处理点餐...");
Future<String> f = executorService.submit(() -> {
log.warn("做菜");
return cooking();
});
try {
log.warn("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}
执行结果
[pool-1-thread-2] WARN c.test9 - 处理点餐...
[pool-1-thread-1] WARN c.test9 - 处理点餐...
程序卡住不动了
实例分析:线程池的大小固定为2,二个客人点餐已经占用了线程池2个线程,2个客人的做菜还需要2个线程,
但是无法做菜,当前已经占用的2个线程被线程池回收,大小固定为2的线程池无法完成2个客人的task。
3-2-1 解决线程池饥饿问题的策略?
- 加大线程池大小,可以解决,但是当任务数更多的时候,依旧有可能发生类似的问题。
- 将不同类型任务使用不同的线程池(将点餐与做菜任务分开)
代码实现
- 将点餐与做菜划分为二种类型的任务。
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.*;
@Slf4j(topic = "c.test9")
public class test9 {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
ExecutorService waiterPool = Executors.newFixedThreadPool(1);
ExecutorService cookPool = Executors.newFixedThreadPool(1);
/*模拟一位客人点餐*/
waiterPool.execute(() -> {
log.warn("处理点餐...");
Future<String> f = cookPool.submit(() -> {
log.warn("做菜");
return cooking();
});
try {
log.warn("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
waiterPool.execute(() -> {
log.warn("处理点餐...");
Future<String> f = cookPool.submit(() -> {
log.warn("做菜");
return cooking();
});
try {
log.warn("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}
执行结果
[pool-1-thread-1] WARN c.test9 - 处理点餐...
[pool-2-thread-1] WARN c.test9 - 做菜
[pool-1-thread-1] WARN c.test9 - 上菜: 地三鲜
[pool-1-thread-1] WARN c.test9 - 处理点餐...
[pool-2-thread-1] WARN c.test9 - 做菜
[pool-1-thread-1] WARN c.test9 - 上菜: 宫保鸡丁
3-3 线程池的大小该如何确定
3-3-1 线程池大小的影响
过小:导致程序不能充分地利用系统资源、容易导致饥饿 。
过大:导致更多的线程上下文切换,占用更多内存
3-3-2 线程池大小的确定
CPU 密集型运算
- 通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费
I/O 密集型运算
场景:执行 I/O 操作时、远程RPC 调用时,包括进行数据库操作时,可以利用多线程提高它的利用率。
线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间
例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式
4 * 100% * 100% / 50% = 8
例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式
4 * 100% * 100% / 10% = 40
参考资料
20210404