线程池异常处理之重启线程处理任务
本文记录一下在使用线程池过程中,如何处理 while(true)
循环长期运行的任务,在业务处理逻辑中,如果抛出了运行时异常时怎样重新提交任务。
这种情形在Kafka消费者中遇到,当为每个Consumer开启一个线程时, 在线程的run方法中会有while(true)
循环中消费Topic数据。
本文会借助Google Guava包中的com.google.common.util.concurrent.ThreadFactoryBuilder
类创建线程工厂,因为它能不仅很方便地为线程池设置一个易读的名称,而且很方便地设置线程执行过程中出现异常时 用来处理异常的 异常处理器,示例如下:
MyExceptionHandler exceptionHandler = new MyExceptionHandler();
//设置线程名称
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-%d")
//设置异常处理器
.setUncaughtExceptionHandler(exceptionHandler).build();
当线程执行过程中出现了异常,MyExceptionHandler#uncaughtException(...)
方法就会由JVM调用。在java.lang.ThreadGroup#uncaughtException
方法注释提到:由于每个线程都隶属于某个线程组,如果该线程所属的线程组有父线程组,则调用父线程组中指定的异常处理器;若没有父线程组,则判断 有没有 为线程自定义 异常处理器,而在本文中,定义了自己的异常处理器:MyExceptionHandler
,因此线程执行异常时就会调用MyExceptionHandler#uncaughtException(...)
创建好了线程工厂,接下来就是创建线程池了。
CustomThreadPoolExecutor threadPoolExecutor = new CustomThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue,threadFactory);
CustomThreadPoolExecutor 继承ThreadPoolExecutor
扩展线程池的功能:若线程执行某任务失败时 需要重新提交该任务,可以重写CustomThreadPoolExecutor#afterExecute
方法,在该方法中实现提交任务。
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
//若线程执行某任务失败了,重新提交该任务
if (t != null) {
Runnable task = r;
System.out.println("restart task...");
execute(task);
}
}
}
如果在new ThreadPoolExecutor时未传入 ThreadFactory参数,如下:
BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue);
其实是调用Executors.defaultThreadFactory()
创建默认的ThreadFactory:
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
它为每个创建的线程设置了名字:"pool-xxx-thread-xxx"。而采用默认的ThreadFactory时相应的默认的异常处理器执行逻辑是由java.lang.ThreadGroup#uncaughtException
方法来处理的,其中处理异常的相关源码如下:
else if (!(e instanceof ThreadDeath)) {
System.err.print("Exception in thread ""
+ t.getName() + "" ");
e.printStackTrace(System.err);
}
如果线程执行过程中抛出的错误 不是 ThreadDeath对象,那么只是简单地:打印线程名称,并将堆栈信息记录到控制台中,任务结束。如果是一个ThreadDeath对象,看ThreadDeath类的源码注释可知:异常处理器不会被调用,程序不会输出任何日志信息。(有木有碰到这种情况,线程池中的线程不知不觉地消失了……)
The ThreadGroup#uncaughtException top-level error handler does not print out a message if ThreadDeath is never caught.
在本文的示例程序CustomThreadPoolExecutorTest.java中,为了模拟在while(true)
循环中抛出异常,定义一个 Boolean 变量 stop 使得线程执行一段时间抛出一个异常:也即先让test线程运行一段时间,然后主线程设置 stop 变量的值,使得test线程抛出运行时异常。(完整代码可参考文末)
if (stop) {
throw new RuntimeException("running encounter exception");
}
线程池提交 while(true)
循环任务:
threadPoolExecutor.execute(()->{
//提交的是一个while(true)任务,正常运行时这类任务不会结束
while (true) {
System.out.println("start processing");
try {
//模拟任务每次执行耗时1000ms
Thread.sleep(1000);
} catch (InterruptedException e) {
//ignore
}
System.out.println("finish processing");
if (stop) {
throw new RuntimeException("running encounter exception");
}
}
});
threadPoolExecutor.execute
提交了一个任务,这会耗费一个线程来执行该任务,由于任务是个while(true)
循环,正常情况下该任务不会终止。换句话说,这个任务会"永久"占用线程池中的一个线程。因此,对于while(true)
循环的任务需要注意:
创建线程池new ThreadPoolExecutor(...)
时,指定的 corePoolSize 不能小于 需要提交的任务个数,否则有些任务不能立即启动,线程池需要增加线程(最大增加到maximumPoolSize 个线程)来处理。如果 maximumPoolSize 小于需要提交的任务个数,由于每个任务永久地占用一个线程执行,那么有些任务就只能一直堆积在taskQueue 任务队列中了
而在本示例中,main 线程通过设置 stop 变量让 test 线程抛出异常,自定义的异常处理器MyExceptionHandler就会处理该异常,并且在该任务执行“完成”后,JVM会调用线程池的afterExecute(...)方法,又重新提交该任务。
总结
这篇文章总结了本人在使用JAVA线程池中的一些理解,写代码以线程池方式提交任务,程序跑一段时间,没有数据输出了,好像暂停了,看堆栈信息线程莫名其妙地消失了,或者阻塞在任务队列上拿不到Task了……因此需要明白线程池底层执行的机制。
- 在实现Kafka消费者过程中,每个消费者一个线程,使用线程池来管理线程、提交任务。但总过一段时间后Kafka Broker Rebalance,看后台日志是Kafka Consumer在解析一些消息时抛出了运行时异常。这样线程池就结束了这个任务,由于没有重写
afterExecute()
方法 当任务出现异常时重新提交任务。因此,这意味着永久丢失了一个消费者线程。而少了一个消费者,Kafka就发生了Rebalance。 - 尽量使用线程池来管理线程,而不是自己 new Thread(),一方面是采用线程池可方便地为每个线程设置合理的名称,这样便于debug。另一方面,通过
implements Thread.UncaughtExceptionHandler
自定义线程运行时异常处理器,可方便地打印出线程异常日志。 - 可继承
ThreadPoolExecutor
扩展线程池功能,比如在任务执行完成后,执行一些额外的操作。关于如何扩展线程池,ElasticSearch源码中线程池模块很值得借鉴。 - 上文中提到的异常处理器 和 向线程池提交任务的拒绝策略
RejectedExecutionHandler
是两回事。另外,为了图方便,直接在main方法中创建线程池了,实际应用中肯定不能这样。这里给出的代码只是Examples。
最后给出一个思考问题:针对需要长期运行的任务,比如每隔一段时间从Redis中读取若干条数据。是提交一个Runnable任务,这个Runnable任务里是个while(true)
循环读取数据:
executor.execute(()->{
while (true) {
//读若干条数据
read();
sleep(1000);
}
});
还是:在一个外部while循环中,不断地向 taskQueue 任务队列中提交任务呢?
while (true) {
executor.execute(()->{
read();
});
sleep(1000);
}
CustomThreadPoolExecutorTest.java 完整代码:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
public class CustomThreadPoolExecutorTest {
private static volatile boolean stop = false;
public static void main(String[] args)throws InterruptedException {
BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
//定义 线程执行过程中出现异常时的 异常处理器
MyExceptionHandler exceptionHandler = new MyExceptionHandler();
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-%d")
.setUncaughtExceptionHandler(exceptionHandler).build();
CustomThreadPoolExecutor threadPoolExecutor = new CustomThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue,threadFactory);
threadPoolExecutor.execute(()->{
//提交的是一个while(true)任务,正常运行时这类任务不会结束
while (true) {
System.out.println("start processing");
try {
//模拟任务每次执行耗时1000ms
Thread.sleep(1000);
} catch (InterruptedException e) {
//ignore
}
System.out.println("finish processing");
if (stop) {
throw new RuntimeException("running encounter exception");
}
}
});
Thread.sleep(2000);
//模拟 test- 线程 在执行任务过程中抛出异常
stop = true;
Thread.sleep(1000);
stop = false;
}
private static class MyExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println(String.format("thread name %s, msg %s", t.getName(), e.getMessage()));
}
}
}
ThreadPoolExecutorTest.java 测试线程在执行过程中抛出ThreadDeath对象:
import java.util.concurrent.*;
public class ThreadPoolExecutorTest {
private static volatile boolean stop = false;
public static void main(String[] args) throws InterruptedException{
BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.HOURS, taskQueue);
executor.execute(()->{
while (true) {
System.out.println("start processing");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//ignore
}
System.out.println("finish processing");
if (stop) {
throw new ThreadDeath();
// throw new RuntimeException("runtime exception");
}
}
});
Thread.sleep(3000);
stop = true;
Thread.sleep(2000);
executor.execute(()->{
//能够继续提交任务执行
System.out.println("continue submit runnable task,is All thread in thread pool dead?");
});
}
}