zoukankan      html  css  js  c++  java
  • 使用CompletableFuture+ExecutorService+Logback的多线程测试

    1. 环境

    Java: jdk1.8.0_144

    2. 背景

    Java多线程执行任务时,Logback输出的主线程和各个子线程的业务日志需要区分时,可以根据线程池和执行的线程来区分,但若要把它们联系起来只能根据时间线,既麻烦又无法保证准确性。

    2018-10-27 23:09:22 [INFO][com.lxp.tool.log.LogAndCatchExceptionRunnableTest][main][testRun][38] -> test start
    2018-10-27 23:09:22 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-1][lambda$getRunnable$0][16] -> This is runnable.
    2018-10-27 23:09:22 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-2][lambda$getRunnable$0][16] -> This is runnable.
    2018-10-27 23:09:22 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-1][lambda$getRunnable$0][16] -> This is runnable.
    2018-10-27 23:09:22 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-2][lambda$getRunnable$0][16] -> This is runnable.
    2018-10-27 23:09:22 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-1][lambda$getRunnable$0][16] -> This is runnable.
    2018-10-27 23:09:22 [INFO][com.lxp.tool.log.LogAndCatchExceptionRunnableTest][main][testRun][48] -> test finish
    
    

    org.slf4j.MDC类提供了一个极好的解决方案,它可以为各个线程设置独有的上下文,当有必要时也可以把主线程的上下文复制给子线程,此时子线程可以拥有主线程+子线程的信息,在子线程退出前恢复到主线程上下文,如此一来,日志信息可以极大地便利定位问题,org.slf4j.MDC类在线程上下文切换上的应用记录本文的目的之一。
    另一个则是过去一直被自己忽略的多线程时退出的问题,任务需要多线程执行有两种可能场景

    • 多个任务互相独立,某个任务失败并不应该影响其它的任务继续执行
    • 多个子任务组成一个完整的主任务,若某个子任务失败它应该直接退出,不需要等所有子任务完成

    3. org.slf4j.MDC类在线程上下文切换时的应用

    3.1 实现包装线程

    • AbstractLogWrapper
    public class AbstractLogWrapper<T> {
        private final T job;
        private final Map<?, ?> context;
    
        public AbstractLogWrapper(T t) {
            this.job = t;
            this.context = MDC.getCopyOfContextMap();
        }
    
        public void setLogContext() {
            if (this.context != null) {
                MDC.setContextMap(this.context);
            }
        }
    
        public void clearLogContext() {
            MDC.clear();
        }
    
        public T getJob() {
            return this.job;
        }
    }
    
    • LogRunnable
    public class LogRunnable extends AbstractLogWrapper<Runnable> implements Runnable {
        public LogRunnable(Runnable runnable) {
            super(runnable);
        }
    
        @Override
        public void run() {
            // 把主线程上下文复到子线程
            this.setLogContext();
            try {
                getJob().run();
            } finally {
                // 恢复主线程上下文
                this.clearLogContext();
            }
        }
    }
    
    • LogAndCatchExceptionRunnable
    public class LogAndCatchExceptionRunnable extends AbstractLogWrapper<Runnable> implements Runnable {
        private static final Logger LOGGER = LoggerFactory.getLogger(LogAndCatchExceptionRunnable.class);
    
        public LogAndCatchExceptionRunnable(Runnable runnable) {
            super(runnable);
        }
    
        @Override
        public void run() {
            // 把主线程上下文复到子线程
            this.setLogContext();
            try {
                getJob().run();
            } catch (Exception e) { // Catch所有异常阻止其继续传播
                LOGGER.error(e.getMessage(), e);
            } finally {
                // 恢复主线程上下文
                this.clearLogContext();
            }
        }
    }
    

    3.2 配置%X输出当前线程相关联的NDC

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
        <appender name="stdot" class="ch.qos.logback.core.ConsoleAppender">
            <layout class="ch.qos.logback.classic.PatternLayout">
                <pattern>%d{yyyy-MM-dd HH:mm:ss} [%p][%c][%t][%M][%L] %replace(Test_Method=%X{method} runn-able=%X{runn_able}){'.+=( |$)', ''} -> %m%n</pattern>
            </layout>
        </appender>
        <root level="debug">
            <appender-ref ref="stdot"/>
        </root>
    </configuration>
    

    3.3 配置线程相关信息并测试

    class RunnabeTestHelper {
        private static final Logger LOGGER = LoggerFactory.getLogger(RunnabeTestHelper.class);
        private static final String RUNNABLE = "runn_able";
    
        static Runnable getRunnable() {
            return () -> {
                MDC.put(RUNNABLE, String.valueOf(System.currentTimeMillis()));
                LOGGER.info("This is runnable.");
            };
        }
    }
    
    • 测试方法
        @Test
        public void testRun() {
            try {
                MDC.put("method", "testRun");
                LOGGER.info("test start");
                LogAndCatchExceptionRunnable logRunnable = spy(new LogAndCatchExceptionRunnable(RunnabeTestHelper.getRunnable()));
                Set<String> set = new HashSet<>();
                doAnswer(invocation -> set.add(invocation.getMethod().getName())).when(logRunnable).setLogContext();
                doAnswer(invocation -> set.add(invocation.getMethod().getName())).when(logRunnable).clearLogContext();
    
                List<CompletableFuture<Void>> futures = IntStream.rangeClosed(0, 4).mapToObj(index -> CompletableFuture.runAsync(logRunnable, executorService)).collect(Collectors.toList());
                futures.forEach(CompletableFuture::join);
                assertEquals("[setLogContext, clearLogContext]", set.toString());
                LOGGER.info("test finish");
            } finally {
                MDC.clear();
            }
        }
    
    • 测试结果
    2018-11-01 01:08:04 [INFO][com.lxp.tool.log.LogRunnableTest][main][testRun][41]  -> test start
    2018-11-01 01:08:05 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-1][lambda$getRunnable$0][16] Test_Method=testRun runn-able=1541005685003 -> This is runnable.
    2018-11-01 01:08:05 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-1][lambda$getRunnable$0][16] Test_Method=testRun runn-able=1541005685004 -> This is runnable.
    2018-11-01 01:08:05 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-1][lambda$getRunnable$0][16] Test_Method=testRun runn-able=1541005685004 -> This is runnable.
    2018-11-01 01:08:05 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-2][lambda$getRunnable$0][16] Test_Method=testRun runn-able=1541005685003 -> This is runnable.
    2018-11-01 01:08:05 [INFO][com.lxp.tool.log.RunnabeTestHelper][pool-1-thread-2][lambda$getRunnable$0][16] Test_Method=testRun runn-able=1541005685005 -> This is runnable.
    2018-11-01 01:08:05 [INFO][com.lxp.tool.log.LogRunnableTest][main][testRun][50]  -> test finish
    

    4. 多线程执行子线程出现异常时的处理

    class RunnabeTestHelper {
        private static final Logger LOGGER = LoggerFactory.getLogger(RunnabeTestHelper.class);
    
        static Runnable getRunnable(AtomicInteger counter) {
            return () -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    LOGGER.error(e.getMessage(), e);
                }
                if (counter.incrementAndGet() == 2) {
                    throw new NullPointerException();
                }
                LOGGER.info("This is {} runnable.", counter.get());
            };
        }
    
        static Runnable getRunnableWithCatchException(AtomicInteger counter) {
            return () -> {
                try {
                    Thread.sleep(1000);
                    if (counter.incrementAndGet() == 2) {
                        throw new NullPointerException();
                    }
                    LOGGER.info("This is {} runnable.", counter.get());
                } catch (Exception e) {
                    LOGGER.error("error", e);
                }
            };
        }
    }
    

    4.1 选择一:放充执行未执行的其它子线程

    • 调用LogRunnable,允许子线程的异常继续传播
        @Test
        public void testRunnableWithoutCatchException() {
            Logger logger = Mockito.mock(Logger.class);
            AtomicInteger counter = new AtomicInteger(0);
            List<CompletableFuture<Void>> futures = IntStream.rangeClosed(0, 4).mapToObj(index -> CompletableFuture.runAsync(new LogRunnable(RunnabeTestHelper.getRunnable(counter)), executorService)).collect(Collectors.toList());
            try {
                futures.forEach(CompletableFuture::join);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            // 由于子线程的异常导致主线程退出,并不是所有任务都得到执行机会
            assertEquals(2, counter.get());
            verify(logger, Mockito.times(1)).error(anyString(), any(Throwable.class));
        }
    

    4.2 选择二:执行完所有无异常的子线程

    • 调用LogRunnable,在线程内部阻止异常扩散
        @Test
        public void testRunnableWithCatchException() {
            AtomicInteger counter = new AtomicInteger(0);
            List<CompletableFuture<Void>> futures = IntStream.rangeClosed(0, 4).mapToObj(index -> CompletableFuture.runAsync(new LogRunnable(RunnabeTestHelper.getRunnableWithCatchException(counter)), executorService)).collect(Collectors.toList());
            futures.forEach(CompletableFuture::join);
            // 由于子线程的异常被阻止,所有线程都得到执行机会
            assertEquals(5, counter.get());
        }
    
    • 调用LogAndCatchExceptionRunnable,在包装类阻止异常扩散
        @Test
        public void testRunnableWithoutCatchException() {
            AtomicInteger counter = new AtomicInteger(0);
            List<CompletableFuture<Void>> futures = IntStream.rangeClosed(0, 4).mapToObj(index -> CompletableFuture.runAsync(new LogAndCatchExceptionRunnable(RunnabeTestHelper.getRunnable(counter)), executorService)).collect(Collectors.toList());
            futures.forEach(CompletableFuture::join);
            // 由于子线程的异常被阻止,所有线程都得到执行机会
            assertEquals(5, counter.get());
        }
    
        @Test
        public void testRunnableWithCatchException() {
            AtomicInteger counter = new AtomicInteger(0);
            List<CompletableFuture<Void>> futures = IntStream.rangeClosed(0, 4).mapToObj(index -> CompletableFuture.runAsync(new LogAndCatchExceptionRunnable(RunnabeTestHelper.getRunnableWithCatchException(counter)), executorService)).collect(Collectors.toList());
            futures.forEach(CompletableFuture::join);
            // 由于子线程的异常被阻止,所有线程都得到执行机会
            assertEquals(5, counter.get());
        }
    
  • 相关阅读:
    生成函数初步
    Lg 8月赛(构造+交互)
    wqs 二分学习笔记
    FastAPI 学习之路(十六)Form表单
    线性代数入门
    Oracle-PDB拔插
    MySQL-audit审计插件
    MySQL-用户与权限管理
    MySQL-存储引擎
    MySQL-逻辑结构
  • 原文地址:https://www.cnblogs.com/hiver/p/9863883.html
Copyright © 2011-2022 走看看