zoukankan      html  css  js  c++  java
  • 线程池异常处理之重启线程处理任务

    线程池异常处理之重启线程处理任务

    本文记录一下在使用线程池过程中,如何处理 while(true)循环长期运行的任务,在业务处理逻辑中,如果抛出了运行时异常时怎样重新提交任务。

    这种情形在Kafka消费者中遇到,当为每个Consumer开启一个线程时, 在线程的run方法中会有while(true)循环中消费Topic数据。

    本文会借助Google Guava包中的com.google.common.util.concurrent.ThreadFactoryBuilder类创建线程工厂,因为它能不仅很方便地为线程池设置一个易读的名称,而且很方便地设置线程执行过程中出现异常时 用来处理异常的 异常处理器,示例如下:

     MyExceptionHandler exceptionHandler = new MyExceptionHandler();
    //设置线程名称
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-%d")
    //设置异常处理器
                    .setUncaughtExceptionHandler(exceptionHandler).build();
    

    当线程执行过程中出现了异常,MyExceptionHandler#uncaughtException(...)方法就会由JVM调用。在java.lang.ThreadGroup#uncaughtException方法注释提到:由于每个线程都隶属于某个线程组,如果该线程所属的线程组有父线程组,则调用父线程组中指定的异常处理器;若没有父线程组,则判断 有没有 为线程自定义 异常处理器,而在本文中,定义了自己的异常处理器:MyExceptionHandler,因此线程执行异常时就会调用MyExceptionHandler#uncaughtException(...)

    创建好了线程工厂,接下来就是创建线程池了。

    CustomThreadPoolExecutor threadPoolExecutor = new CustomThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue,threadFactory);
    

    CustomThreadPoolExecutor 继承ThreadPoolExecutor扩展线程池的功能:若线程执行某任务失败时 需要重新提交该任务,可以重写CustomThreadPoolExecutor#afterExecute方法,在该方法中实现提交任务。

    public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    
        public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            //若线程执行某任务失败了,重新提交该任务
            if (t != null) {
                Runnable task =  r;
                System.out.println("restart task...");
                execute(task);
            }
        }
    }
    

    如果在new ThreadPoolExecutor时未传入 ThreadFactory参数,如下:

    BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue);
    

    其实是调用Executors.defaultThreadFactory()创建默认的ThreadFactory:

            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
    

    它为每个创建的线程设置了名字:"pool-xxx-thread-xxx"。而采用默认的ThreadFactory时相应的默认的异常处理器执行逻辑是由java.lang.ThreadGroup#uncaughtException方法来处理的,其中处理异常的相关源码如下:

    else if (!(e instanceof ThreadDeath)) {
                    System.err.print("Exception in thread ""
                                     + t.getName() + "" ");
                    e.printStackTrace(System.err);
                }
    

    如果线程执行过程中抛出的错误 不是 ThreadDeath对象,那么只是简单地:打印线程名称,并将堆栈信息记录到控制台中,任务结束。如果是一个ThreadDeath对象,看ThreadDeath类的源码注释可知:异常处理器不会被调用,程序不会输出任何日志信息。(有木有碰到这种情况,线程池中的线程不知不觉地消失了……)

    The ThreadGroup#uncaughtException top-level error handler does not print out a message if ThreadDeath is never caught.

    在本文的示例程序CustomThreadPoolExecutorTest.java中,为了模拟在while(true)循环中抛出异常,定义一个 Boolean 变量 stop 使得线程执行一段时间抛出一个异常:也即先让test线程运行一段时间,然后主线程设置 stop 变量的值,使得test线程抛出运行时异常。(完整代码可参考文末)

    if (stop) {
        throw new RuntimeException("running encounter exception");
     }
    

    线程池提交 while(true)循环任务:

            threadPoolExecutor.execute(()->{
                //提交的是一个while(true)任务,正常运行时这类任务不会结束
                while (true) {
                    System.out.println("start processing");
                    try {
                        //模拟任务每次执行耗时1000ms
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        //ignore
                    }
                    System.out.println("finish processing");
    
                    if (stop) {
                        throw new RuntimeException("running encounter exception");
                    }
                }
            });
    

    threadPoolExecutor.execute提交了一个任务,这会耗费一个线程来执行该任务,由于任务是个while(true)循环,正常情况下该任务不会终止。换句话说,这个任务会"永久"占用线程池中的一个线程。因此,对于while(true)循环的任务需要注意:

    创建线程池new ThreadPoolExecutor(...)时,指定的 corePoolSize 不能小于 需要提交的任务个数,否则有些任务不能立即启动,线程池需要增加线程(最大增加到maximumPoolSize 个线程)来处理。如果 maximumPoolSize 小于需要提交的任务个数,由于每个任务永久地占用一个线程执行,那么有些任务就只能一直堆积在taskQueue 任务队列中了

    而在本示例中,main 线程通过设置 stop 变量让 test 线程抛出异常,自定义的异常处理器MyExceptionHandler就会处理该异常,并且在该任务执行“完成”后,JVM会调用线程池的afterExecute(...)方法,又重新提交该任务。

    总结

    这篇文章总结了本人在使用JAVA线程池中的一些理解,写代码以线程池方式提交任务,程序跑一段时间,没有数据输出了,好像暂停了,看堆栈信息线程莫名其妙地消失了,或者阻塞在任务队列上拿不到Task了……因此需要明白线程池底层执行的机制。

    1. 在实现Kafka消费者过程中,每个消费者一个线程,使用线程池来管理线程、提交任务。但总过一段时间后Kafka Broker Rebalance,看后台日志是Kafka Consumer在解析一些消息时抛出了运行时异常。这样线程池就结束了这个任务,由于没有重写afterExecute()方法 当任务出现异常时重新提交任务。因此,这意味着永久丢失了一个消费者线程。而少了一个消费者,Kafka就发生了Rebalance。
    2. 尽量使用线程池来管理线程,而不是自己 new Thread(),一方面是采用线程池可方便地为每个线程设置合理的名称,这样便于debug。另一方面,通过 implements Thread.UncaughtExceptionHandler自定义线程运行时异常处理器,可方便地打印出线程异常日志。
    3. 可继承ThreadPoolExecutor扩展线程池功能,比如在任务执行完成后,执行一些额外的操作。关于如何扩展线程池,ElasticSearch源码中线程池模块很值得借鉴。
    4. 上文中提到的异常处理器 和 向线程池提交任务的拒绝策略RejectedExecutionHandler是两回事。另外,为了图方便,直接在main方法中创建线程池了,实际应用中肯定不能这样。这里给出的代码只是Examples。

    最后给出一个思考问题:针对需要长期运行的任务,比如每隔一段时间从Redis中读取若干条数据。是提交一个Runnable任务,这个Runnable任务里是个while(true)循环读取数据:

            executor.execute(()->{
                while (true) {
                    //读若干条数据
                    read();
                    sleep(1000);
                }
            });
    

    还是:在一个外部while循环中,不断地向 taskQueue 任务队列中提交任务呢?

            while (true) {
                executor.execute(()->{
                    read();
                });
                sleep(1000);
            }
    

    CustomThreadPoolExecutorTest.java 完整代码:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.TimeUnit;
    
    public class CustomThreadPoolExecutorTest {
        private static volatile boolean stop = false;
        public static void main(String[] args)throws InterruptedException {
            BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
            //定义 线程执行过程中出现异常时的 异常处理器
            MyExceptionHandler exceptionHandler = new MyExceptionHandler();
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-%d")
                    .setUncaughtExceptionHandler(exceptionHandler).build();
            CustomThreadPoolExecutor threadPoolExecutor = new CustomThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue,threadFactory);
    
            threadPoolExecutor.execute(()->{
                //提交的是一个while(true)任务,正常运行时这类任务不会结束
                while (true) {
                    System.out.println("start processing");
                    try {
                        //模拟任务每次执行耗时1000ms
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        //ignore
                    }
                    System.out.println("finish processing");
    
                    if (stop) {
                        throw new RuntimeException("running encounter exception");
                    }
                }
            });
    
            Thread.sleep(2000);
            //模拟 test- 线程 在执行任务过程中抛出异常
            stop = true;
            Thread.sleep(1000);
            stop = false;
        }
    
        private static class MyExceptionHandler implements Thread.UncaughtExceptionHandler {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.out.println(String.format("thread name %s, msg %s", t.getName(), e.getMessage()));
            }
        }
    }
    

    ThreadPoolExecutorTest.java 测试线程在执行过程中抛出ThreadDeath对象:

    import java.util.concurrent.*;
    public class ThreadPoolExecutorTest {
        private static volatile boolean stop = false;
        public static void main(String[] args) throws InterruptedException{
            BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
            ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.HOURS, taskQueue);
            executor.execute(()->{
                while (true) {
                    System.out.println("start processing");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        //ignore
                    }
                    System.out.println("finish processing");
                    if (stop) {
                        throw new ThreadDeath();
    //                    throw new RuntimeException("runtime exception");
                    }
                }
            });
            Thread.sleep(3000);
            stop = true;
            Thread.sleep(2000);
    
            executor.execute(()->{
                //能够继续提交任务执行
                System.out.println("continue submit runnable task,is All thread in thread pool dead?");
            });
        }
    }
    

    参考资料:

    原文:https://www.cnblogs.com/hapjin/p/10240863.html

  • 相关阅读:
    Android xml text 预览属性
    GridView、ListView默认的点击背景修改
    java.util.ConcurrentModificationException
    Android 菜单定制使用小结
    Panel 中加载窗体
    git代理配置
    TableLayoutPanel 动态添加 行 列
    C# 左右补零
    Dart Map<> 添加 元素
    Mac 永久添加 环境变量方法
  • 原文地址:https://www.cnblogs.com/hapjin/p/10240863.html
Copyright © 2011-2022 走看看