zoukankan      html  css  js  c++  java
  • 为每个请求分配traceId的两种方式及父子线程本地变量传递

     需求背景

      有时候我们需要某个请求下的所有的traceId都是一致的,以获得统一解析的日志文件。便于排查问题。

      为每一个请求分配同一个traceId据我所知有两种方式:MDC和ThreadLocal,MDC的内部实现也是ThreadLocal,下面分别介绍这两种方式。

    一、MDC

      MDC(Mapped Diagnostic Contexts),翻译过来就是:映射的诊断上下文。意思是:在日志中(映射的)请求ID(requestId),可以作为我们定位(诊断)问题的关键字(上下文)。

      有了MDC工具,只要在接口或切面植入 put 和 remove 代码,就可以在定位问题时,根据映射的唯一 requestID 快速过滤出某次请求的所有日志。

      另外,当客户端请求到来时,可以将客户端id,ip,请求参数等信息保存在MDC中,同时在logback.xml中配置,那么会自动在每个日志条目中包含这些信息。

      slf4j的MDC机制其内部基于ThreadLocal实现,可参见ThreadLocal这篇博客:https://www.cnblogs.com/yangyongjie/p/10574591.html

      MDC类基本原理其实非常简单,其内部持有一个ThreadLocal实例,用于保存context数据,MDC提供了put/get/clear等几个核心接口,用于操作ThreadLocal中的数据;ThreadLocal中的K-V,可以在logback.xml中声明,即在layout中通过声明“%X{Key}”来打印MDC中保存的此key对应的value在日志中。

      在使用MDC时需要注意一些问题,这些问题通常也是ThreadLocal引起的,比如我们需要在线程退出之前清除(clear)MDC中的数据;在线程池中使用MDC时,那么需要在子线程退出之前清除数据;可以调用MDC.clear()方法。

      MDC部分源码:

    package org.slf4j;
    public class MDC {
      // 将上下文的值作为 MDC 的 key 放到线程上下的 map 中
      public static void put(String key, String val);
      // 通过 key 获取上下文标识
      public static String get(String key);
      // 通过 key 移除上下文标识
      public static void remove(String key);
      // 清除 MDC 中所有的 entry
      public static void clear();
    }

     1、请求没有子线程的情况下代码实现:

      1)使用Aop拦截请求

    /**
     * 为每一个的HTTP请求添加线程号
     *
     * @author yangyongjie
     * @date 2019/9/2
     * @desc
     */
    @Aspect
    @Component
    public class LogAspect {
    
        private static final String STR_THREAD_ID = "threadId";
    
        @Pointcut(value = "@annotation(org.springframework.web.bind.annotation.RequestMapping)")
        private void webPointcut() {
            // doNothing
        }
    
        /**
         * 为所有的HTTP请求添加线程号
         *
         * @param joinPoint
         * @throws Throwable
         */
        @Around(value = "webPointcut()")
        public void around(ProceedingJoinPoint joinPoint) throws Throwable {
            // 方法执行前加上线程号
            MDC.put(STR_THREAD_ID, UUID.randomUUID().toString().replaceAll("-", ""));
            // 执行拦截的方法
            joinPoint.proceed();
            // 方法执行结束移除线程号
            MDC.remove(STR_THREAD_ID);
        }
    }

      2)log4j日志配置

    log4j.appender.stdout.layout.ConversionPattern=[%-5p]%d{yyyy-MM-dd HH:mm:ss.SSS}[%t]%X{threadId}[%c:%L] - %m%n

      需要注意日志红色中字符串 threadId 需要和 日志拦截中MDC put的key是一样的。

     2、请求有子线程的情况

      slf4j的MDC机制其内部基于ThreadLocal实现,可参见Java基础下的 ThreadLocal这篇博客,https://www.cnblogs.com/yangyongjie/p/10574591.html。所以我们调用 MDC.put()方法传入

      的请求ID只在当前线程有效。所以,主线程中设置的MDC数据,在其子线程(线程池)中是无法获取的。那么主线程如何将MDC数据传递给子线程? 

      官方建议

        1)在父线程新建子线程之前调用MDC.getCopyOfContextMap()方法将MDC内容取出来传给子线程

        2)子线程在执行操作前先调用MDC.setContextMap()方法将父线程的MDC内容设置到子线程 

        

      代码实现

      1)使用Aop拦截请求,与上面相同

      2)log4j日志配置与上面相同

      3)装饰器模式装饰子线程,有两种方式:

        方式一:使用装饰器模式,对Runnable接口进行一层装饰,在创建MDCRunnable类对Runnable接口进行一层装饰。

    在创建MDCRunnable类时保存当前线程的MDC值,再执行run()方法

        装饰器MDCRunnable装饰Runnable:

    import org.slf4j.MDC;
    
    import java.util.Map;
    
    /**
     * 装饰器模式装饰Runnable,传递父线程的线程号
     *
     * @author yangyongjie
     * @date 2020/3/9
     * @desc
     */
    public class MDCRunnable implements Runnable {
    
        private Runnable runnable;
    
        /**
         * 保存当前主线程的MDC值
         */
        private final Map<String, String> mainMdcMap;
    
        public MDCRunnable(Runnable runnable) {
            this.runnable = runnable;
            this.mainMdcMap = MDC.getCopyOfContextMap();
        }
    
        @Override
        public void run() {
            // 将父线程的MDC值赋给子线程
            for (Map.Entry<String, String> entry : mainMdcMap.entrySet()) {
                MDC.put(entry.getKey(), entry.getValue());
            }
            // 执行被装饰的线程run方法
            runnable.run();
            // 执行结束移除MDC值
            for (Map.Entry<String, String> entry : mainMdcMap.entrySet()) {
                MDC.put(entry.getKey(), entry.getValue());
            }
        }
    
    }

      使用MDCRunnable代替Runnable:

            // 异步线程打印日志,用MDCRunnable装饰Runnable
            new Thread(new MDCRunnable(new Runnable() {
                @Override
                public void run() {
                    logger.debug("log in other thread");
                }
            })).start();
    
            // 异步线程池打印日志,用MDCRunnable装饰Runnable
            EXECUTOR.execute(new MDCRunnable(new Runnable() {
                @Override
                public void run() {
                    logger.debug("log in other thread pool");
                }
            }));
            EXECUTOR.shutdown();

        方式二:装饰线程池

    /**
     *  装饰ThreadPoolExecutor,将父线程的MDC内容传给子线程
     * @author yangyongjie
     * @date 2020/3/19
     * @desc
     */
    public class MDCThreadPoolExecutor extends ThreadPoolExecutor {
    
        private static final Logger LOGGER= LoggerFactory.getLogger(MDCThreadPoolExecutor.class);
    
        public MDCThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    
        @Override
        public void execute(final Runnable runnable) {
            // 获取父线程MDC中的内容,必须在run方法之前,否则等异步线程执行的时候有可能MDC里面的值已经被清空了,这个时候就会返回null
            final Map<String, String> context = MDC.getCopyOfContextMap();
            super.execute(new Runnable() {
                @Override
                public void run() {
                    // 将父线程的MDC内容传给子线程
                    MDC.setContextMap(context);
                    try {
                        // 执行异步操作
                        runnable.run();
                    } finally {
                        // 清空MDC内容
                        MDC.clear();
                    }
                }
            });
        }
    }

      用MDCThreadPoolExecutor 代替ThreadPoolExecutor :

    private static final MDCThreadPoolExecutor MDCEXECUTORS=new MDCThreadPoolExecutor(1,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(600), new CustomThreadFactory("mdcThreadPoolTest"), new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    // 打印日志,并且重启一个线程执行被拒绝的任务
                    LOGGER.error("Task:{},rejected from:{}", r.toString(), executor.toString());
                    // 直接执行被拒绝的任务,JVM另起线程执行
                    r.run();
                }
            });
    
            LOGGER.info("父线程日志");
            MDCEXECUTORS.execute(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info("子线程日志");
                }
            });

    二、ThreadLocal方式

    ThreadLocal可以用于在同一个线程内,跨类、跨方法传递数据。因此可以用来透传全局上下文

     1、没有子线程的情况

      1)创建线程的请求上下文

    /**
     * 线程上下文,一个线程内所需的上下文变量参数,使用ThreadLocal保存副本
     *
     * @author yangyongjie
     * @date 2019/9/12
     * @desc
     */
    public class ThreadContext {
        /**
         * 每个线程的私有变量,每个线程都有独立的变量副本,所以使用private static final修饰,因为都需要复制进入本地线程
         */
        private static final ThreadLocal<ThreadContext> THREAD_LOCAL = new ThreadLocal<ThreadContext>() {
            @Override
            protected ThreadContext initialValue() {
                return new ThreadContext();
            }
        };
    
        public static ThreadContext currentThreadContext() {
            /*ThreadContext threadContext = THREAD_LOCAL.get();
            if (threadContext == null) {
                THREAD_LOCAL.set(new ThreadContext());
                threadContext = THREAD_LOCAL.get();
            }
            return threadContext;*/
            return THREAD_LOCAL.get();
        }
    
        public static void remove() {
            THREAD_LOCAL.remove();
        }
    
        private String threadId;
    
        public String getThreadId() {
            return threadId;
        }
    
        public void setThreadId(String threadId) {
            this.threadId = threadId;
        }
    
        @Override
        public String toString() {
            return JacksonJsonUtil.toString(this);
        }
    }

      2)使用Aop拦截请求,给每个请求线程ThreadLocalMap添加本地变量ThreadContext 对象并为对象的threadId属性赋值。

    /**
     * 为每一个的HTTP请求添加线程号
     *
     * @author yangyongjie
     * @date 2019/9/2
     * @desc
     */
    @Aspect
    @Component
    public class LogAspect {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(LogAspect.class);
    
        @Pointcut(value = "@annotation(org.springframework.web.bind.annotation.RequestMapping)")
        private void webPointcut() {
            // doNothing
        }
    
        /**
         * 为所有的HTTP请求添加线程号
         *
         * @param joinPoint
         * @throws Throwable
         */
        @Around(value = "webPointcut()")
        public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
            // 方法执行前加上线程号,并将线程号放到线程本地变量中
            ThreadContext.currentThreadContext().setThreadId(StringUtil.uuid());
            // 执行拦截的方法
            Object result;
            try {
                result = joinPoint.proceed();
            } finally {
                // 方法执行结束移除线程号,并移除线程本地变量,防止内存泄漏
                ThreadContext.remove();
            }
            return result;
        }
    }

      3)获取线程号

    String threadId = ThreadContext.currentThreadContext().getThreadId();

     2、请求有子线程的情况

      首先了解一下InheritableThreadLocal<T>,它是对ThreadLocal<T> 的扩展和继承,它的数据ThreadLocal.ThreadLocalMap保存在Thread的inheritableThreadLocals变量中,同时如果我们在当前线程开启一个新线程,而且当前线程存在inheritableThreadLocals变量,那么子线程会copy一份当前线程(父线程)中的这个变量持有的值。

      源码:

      Thread类的inheritableThreadLocals 属性:

     ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

      Thread的构造方法会调用init方法,而init方法的inheritableThreadLocals参数默认为true:

        public Thread() {
            init(null, null, "Thread-" + nextThreadNum(), 0);
        }
    
        public Thread(Runnable target) {
            init(null, target, "Thread-" + nextThreadNum(), 0);
        }
    
         private void init(ThreadGroup g, Runnable target, String name,
                          long stackSize) {
            init(g, target, name, stackSize, null, true);
        }

      那么如果当前线程的inheritableThreadLocals变量不为空,就可以将当前线程的变量继续往下传递给它创建的子线程:

    private void init(ThreadGroup g, Runnable target, String name,
                          long stackSize, AccessControlContext acc,
                          boolean inheritThreadLocals) {
            if (name == null) {
                throw new NullPointerException("name cannot be null");
            }
    
            this.name = name;
    
            Thread parent = currentThread();
            SecurityManager security = System.getSecurityManager();
            if (g == null) {
                /* Determine if it's an applet or not */
    
                /* If there is a security manager, ask the security manager
                   what to do. */
                if (security != null) {
                    g = security.getThreadGroup();
                }
    
                /* If the security doesn't have a strong opinion of the matter
                   use the parent thread group. */
                if (g == null) {
                    g = parent.getThreadGroup();
                }
            }
    
            /* checkAccess regardless of whether or not threadgroup is
               explicitly passed in. */
            g.checkAccess();
    
            /*
             * Do we have the required permissions?
             */
            if (security != null) {
                if (isCCLOverridden(getClass())) {
                    security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
                }
            }
    
            g.addUnstarted();
    
            this.group = g;
            this.daemon = parent.isDaemon();
            this.priority = parent.getPriority();
            if (security == null || isCCLOverridden(parent.getClass()))
                this.contextClassLoader = parent.getContextClassLoader();
            else
                this.contextClassLoader = parent.contextClassLoader;
            this.inheritedAccessControlContext =
                    acc != null ? acc : AccessController.getContext();
            this.target = target;
            setPriority(priority);
            if (inheritThreadLocals && parent.inheritableThreadLocals != null)
                this.inheritableThreadLocals =
                    ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
            /* Stash the specified stack size in case the VM cares */
            this.stackSize = stackSize;
    
            /* Set thread ID */
            tid = nextThreadID();
        }

    这里是重点,当inheritThreadLocals 参数为true,且创建此线程的线程即parent(父线程)的inheritableThreadLocals不为空的时候,就会在创建当前线程的时候将父线程的 inheritableThreadLocals 复制到 此新建的子线程。                                    

    if (inheritThreadLocals && parent.inheritableThreadLocals != null)
                this.inheritableThreadLocals =
                    ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

    createInheritedMap()方法就是调用ThreadLocalMap的私有构造方法来产生一个实例对象,把父线程的不为null的线程变量都拷贝过来:

        /**
         * Factory method to create map of inherited thread locals.
         * Designed to be called only from Thread constructor.
         *
         * @param  parentMap the map associated with parent thread
         * @return a map containing the parent's inheritable bindings
         */
        static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
            return new ThreadLocalMap(parentMap);
        }
    
            /**
             * Construct a new map including all Inheritable ThreadLocals
             * from given parent map. Called only by createInheritedMap.
             *
             * @param parentMap the map associated with parent thread.
             */
            private ThreadLocalMap(ThreadLocalMap parentMap) {
                Entry[] parentTable = parentMap.table;
                int len = parentTable.length;
                setThreshold(len);
                table = new Entry[len];
    
                for (int j = 0; j < len; j++) {
                    Entry e = parentTable[j];
                    if (e != null) {
                        @SuppressWarnings("unchecked")
                        ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                        if (key != null) {
                            Object value = key.childValue(e.value);
                            Entry c = new Entry(key, value);
                            int h = key.threadLocalHashCode & (len - 1);
                            while (table[h] != null)
                                h = nextIndex(h, len);
                            table[h] = c;
                            size++;
                        }
                    }
                }
            }

    实际应用代码实现:

    /**
     * 线程上下文,一个线程内所需的上下文变量参数,使用InheritableThreadLocal保存副本,可以将副本传递给子线程
     * @author yangyongjie
     * @date 2020/3/20
     * @desc
     */
    public class InheritableThreadContext {
    
        private static final InheritableThreadLocal<ThreadContext> INHERITABLE_THREAD_LOCAL = new InheritableThreadLocal<ThreadContext>() {
            @Override
            protected ThreadContext initialValue() {
                return new ThreadContext();
            }
        };
    
        public static ThreadContext currentThreadContext() {
            /*ThreadContext threadContext = INHERITABLE_THREAD_LOCAL.get();
            if (threadContext == null) {
                INHERITABLE_THREAD_LOCAL.set(new ThreadContext());
                threadContext = INHERITABLE_THREAD_LOCAL.get();
            }
            return threadContext;*/
            return INHERITABLE_THREAD_LOCAL.get();
        }
    
        public static void remove() {
            INHERITABLE_THREAD_LOCAL.remove();
        }
    
        private String threadId;
    
        public String getThreadId() {
            return threadId;
        }
    
        public void setThreadId(String threadId) {
            this.threadId = threadId;
        }
    
        @Override
        public String toString() {
            return JacksonJsonUtil.toString(this);
        }
    }

    使用示例:

            String uuid = UUID.randomUUID().toString().replaceAll("-", "");
            // 当前线程的本地变量(为当前线程的inheritableThreadLocals属性赋值)
            InheritableThreadContext.currentThreadContext().setThreadId(uuid);
            LOGGER.info("[{}]父线程日志",InheritableThreadContext.currentThreadContext().getThreadId());
            // 创建子线程
            new Thread(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info("[{}]子线程日志",InheritableThreadContext.currentThreadContext().getThreadId());
                    // 移除子线程本地变量
                    InheritableThreadContext.remove();
                }
            }).start();
            // 线程池中的线程也是线程工厂通过new Thread创建的
            EXECUTORS.execute(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info("[{}]线程池子线程日志",InheritableThreadContext.currentThreadContext().getThreadId());
                    // 移除子线程本地变量
                    InheritableThreadContext.remove();
                }
            });
            // 移除父线程本地变量
            InheritableThreadContext.remove();

    运行结果:

    [INFO ]2020-03-20 13:53:24.056[main] - [4eb5063b1dd84448807cb94f7d4df87a]父线程日志
    [INFO ]2020-03-20 13:53:26.133[Thread-8] - [4eb5063b1dd84448807cb94f7d4df87a]子线程日志
    [INFO ]2020-03-20 13:53:26.137[From CustomThreadFactory-inheritable-worker-1] - [4eb5063b1dd84448807cb94f7d4df87a]线程池子线程日志

    不使用InheritableThreadLocal而使用ThreadLocal的结果(将上面使用示例中的InheritableThreadContext替换为ThreadContext):

    [INFO ]2020-03-20 13:57:54.592[main] - [926f164b97914d29a3638c945c125d44]父线程日志
    [INFO ]2020-03-20 13:57:56.214[Thread-8] - [null]子线程日志
    [INFO ]2020-03-20 13:57:56.215[From CustomThreadFactory-worker-1] - [null]线程池子线程日志

    注意:使用ThreadLocal和InheritableThreadLocal透传上下文时,需要注意线程间切换、异常传输时的处理,避免在传输过程中因处理不当而导致的上下文丢失。

    Spring中对InheritableThreadLocal的使用

      Spring中对一个request范围的数据进行传递时,如当有请求到来需要将 HttpServletRequest的值进行传递,用的就是InheritableThreadLocal。

      Spring的RequestContextHolder部分源码:

    public abstract class RequestContextHolder  {
    
        private static final ThreadLocal<RequestAttributes> requestAttributesHolder =
                new NamedThreadLocal<RequestAttributes>("Request attributes");
    
        private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder =
                new NamedInheritableThreadLocal<RequestAttributes>("Request context");
    
    
        /**
         * Reset the RequestAttributes for the current thread.
         */
        public static void resetRequestAttributes() {
            requestAttributesHolder.remove();
            inheritableRequestAttributesHolder.remove();
        }
    
        /**
         * Bind the given RequestAttributes to the current thread,
         * <i>not</i> exposing it as inheritable for child threads.
         * @param attributes the RequestAttributes to expose
         * @see #setRequestAttributes(RequestAttributes, boolean)
         */
        public static void setRequestAttributes(RequestAttributes attributes) {
            setRequestAttributes(attributes, false);
        }
    
        /**
         * Bind the given RequestAttributes to the current thread.
         * @param attributes the RequestAttributes to expose,
         * or {@code null} to reset the thread-bound context
         * @param inheritable whether to expose the RequestAttributes as inheritable
         * for child threads (using an {@link InheritableThreadLocal})
         */
        public static void setRequestAttributes(RequestAttributes attributes, boolean inheritable) {
            if (attributes == null) {
                resetRequestAttributes();
            }
            else {
                if (inheritable) {
                    inheritableRequestAttributesHolder.set(attributes);
                    requestAttributesHolder.remove();
                }
                else {
                    requestAttributesHolder.set(attributes);
                    inheritableRequestAttributesHolder.remove();
                }
            }
        }
    
        /**
         * Return the RequestAttributes currently bound to the thread.
         * @return the RequestAttributes currently bound to the thread,
         * or {@code null} if none bound
         */
        public static RequestAttributes getRequestAttributes() {
            RequestAttributes attributes = requestAttributesHolder.get();
            if (attributes == null) {
                attributes = inheritableRequestAttributesHolder.get();
            }
            return attributes;
        }
    }

    NamedInheritableThreadLocal继承了InheritableThreadLocal ,添加了一个name属性:

    public class NamedInheritableThreadLocal<T> extends InheritableThreadLocal<T> {
    
        private final String name;
    
    
        /**
         * Create a new NamedInheritableThreadLocal with the given name.
         * @param name a descriptive name for this ThreadLocal
         */
        public NamedInheritableThreadLocal(String name) {
            Assert.hasText(name, "Name must not be empty");
            this.name = name;
        }
    
        @Override
        public String toString() {
            return this.name;
        }
    
    }

    从RequestContextListener可以看到当请求事件到来时,需要将HttpServletRequest的值通过RequestContextHolder进行传递:

    public class RequestContextListener implements ServletRequestListener {
        ...
        public void requestInitialized(ServletRequestEvent requestEvent) {
            if (!(requestEvent.getServletRequest() instanceof HttpServletRequest)) {
                throw new IllegalArgumentException(
                        "Request is not an HttpServletRequest: " + requestEvent.getServletRequest());
            }
            HttpServletRequest request = (HttpServletRequest) requestEvent.getServletRequest();
            ServletRequestAttributes attributes = new ServletRequestAttributes(request);
            request.setAttribute(REQUEST_ATTRIBUTES_ATTRIBUTE, attributes);
            LocaleContextHolder.setLocale(request.getLocale());
            RequestContextHolder.setRequestAttributes(attributes);
        }
        ...
    }

     总结

      1)若是日志线程号需要通过线程上下文传递时,使用MDC加日志文件配置的方式;有子线程时,使用MDCRunnable装饰Runnable

      2)若是用户信息需要随着请求上下文传递时,使用ThreadLocal的方式;有子线程时,使用InheritableThreadLocal

      3)若是上面两个信息都需要传递,那就结合一起使用。日志使用MDC,其他信息使用ThreadLocal。

    END

  • 相关阅读:
    .NET反编译工具:de4dot
    Hadoop API:遍历文件分区目录,并根据目录下的数据进行并行提交spark任务
    hive优化之——控制hive任务中的map数和reduce数
    Spark:将RDD[List[String,List[Person]]]中的List[Person]通过spark api保存为hdfs文件时一直出现not serializable task,没办法找到"spark自定义Kryo序列化输入输出API"
    spark算子:combineByKey
    spark分区数,task数目,core数,worker节点个数,excutor数量梳理
    spark算子:partitionBy对数据进行分区
    算子:sample(false, 0.1)抽样数据
    hive:默认允许动态分区个数为100,超出抛出异常:
    Oracle ADDM性能诊断利器及报告解读
  • 原文地址:https://www.cnblogs.com/yangyongjie/p/12523567.html
Copyright © 2011-2022 走看看