zoukankan      html  css  js  c++  java
  • 使用Hystrix的插件机制,解决在使用线程隔离时,threadlocal的传递问题

    背景

    在我们的项目中,比较广泛地使用了ThreadLocal,比如,在filter层,根据token,取到用户信息后,就会放到一个ThreadLocal变量中;在后续的业务处理中,就会直接从当前线程,来获取该ThreadLocal变量,然后获取到其中的用户信息,非常的方便。

    但是,hystrix 这个组件一旦引入的话,如果使用线程隔离的方式,我们的业务逻辑就被分成了两部分,如下:

    public class SimpleHystrixCommand extends HystrixCommand<String> {
    
    	private TestService testService;
    
    	public SimpleHystrixCommand(TestService testService) {
    		super(setter());
    		this.testService = testService;
        }
        @Override
    	protected String run() throws Exception {
        	....
        }
        ...
    }
    

    首先,我们定义了一个Command,这个Command,最终就会丢给hystrix的线程池中去运行。那,我们的controller层,会怎么写呢?

        @RequestMapping("/")
        public String hystrixOrder () {
            SessionUtils.getSessionVOFromRedisAndPut2ThreadLocal();
            // 1
            SimpleHystrixCommand simpleHystrixCommand = new SimpleHystrixCommand(testService);
            // 2
            String res = simpleHystrixCommand.execute();
            return res;
        }
    
    • 上面的1处,new了一个HystrixCommand,这一步,还是在当前线程执行的;
    • 2处,在执行execute的过程中,最终就会把这个command,丢到线程池中,然后,command中的业务逻辑,就在线程池的线程中执行了。

    所以,这中间,是有线程切换的,执行1时,当前线程里的ThreadLocal数据,在执行业务方法的时候,线程变了,也就取不到ThreadLocal数据了。

    思路及实现

    源码

    如果没时间,可以直接看源码:

    https://gitee.com/ckl111/all-simple-demo-in-work-1/tree/master/hystrix-thread-local-demo

    从setter入手

    一开始,我的思路是,看看能不能把hystrix的默认线程池给换掉,因为构建HystrixCommand时,支持使用Setter的方式去配置。

    如下:

    com.netflix.hystrix.HystrixCommand.Setter    
    final public static class Setter {
    		// 1
            protected final HystrixCommandGroupKey groupKey;
            // 2
            protected HystrixCommandKey commandKey;
            // 3
            protected HystrixThreadPoolKey threadPoolKey;
            // 4
            protected HystrixCommandProperties.Setter commandPropertiesDefaults;
            // 5
            protected HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults;
            
        }
    
    • 1处,设置命令组

    • 2处,设置命令的key

    • 3处,设置线程池的key;hystrix会根据这个key,在一个map中,来查找对应的线程池,如果找不到,则创建一个,并放到map中。

      com.netflix.hystrix.HystrixThreadPool.Factory
          
      final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
      
      
    • 4处,命令的相关属性,包括是否降级,是否熔断,是否允许请求合并,命令执行的最大超时时长,以及metric等实时统计信息

    • 5处,线程池的相关属性,比如核心线程数,最大线程数,队列长度等

    怎么样,可以设置的属性很多,是吧,但是,并没有让我们控制线程池的创建相关的,也没办法替换其默认线程池。

    ok,那不用setter的方式,行不行呢?

    从构造器入手

    HystrixCommand 的构造函数,看看能不能传入自定义的线程池呢?

    经过我一开始不仔细的观察,发现有一个构造函数可以传入HystrixThreadPool,ok,就是它了。但是,后面仔细一看,竟然是 package权限,我的子类,和HystrixCommand当然不是一个package下的,所以,访问不了这个构造器。

    虽然,可以使用反射,但是,咱们还是守规矩点好了,再看看有没有其他入口。

    寻找扩展口

    仔细观察下,看看线程池什么时候创建的?

    入口在下图,每次new一个HystrixCommand,最终都会调用父类的构造函数:

    上图所示处,initThreadPool里面,会去创建线程池,需要注意的是,这里的第一个实参,threadPool,是构造函数的第5个形参,目前来看,传进来的都是null。为啥说这个,我们接着看:

        private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
            if (fromConstructor == null) {
                //1 get the default implementation of HystrixThreadPool
                return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
            } else {
                return fromConstructor;
            }
        }
    

    上面我们说了,第一个实参,总是null,所以,会走这里的1处。

    com.netflix.hystrix.HystrixThreadPool.Factory#getInstance
        
    static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
                String key = threadPoolKey.name();
    
                //1 this should find it for all but the first time
                HystrixThreadPool previouslyCached = threadPools.get(key);
                if (previouslyCached != null) {
                    return previouslyCached;
                }
    
                //2 if we get here this is the first time so we need to initialize
                synchronized (HystrixThreadPool.class) {
                    if (!threadPools.containsKey(key)) {
                        // 3
                        threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
                    }
                }
                return threadPools.get(key);
            }
    
    • 1处,会查找缓存,就是前面说的,去map中,根据线程池的key,查找对应的线程池
    • 2处,没找到,则进行创建
    • 3处,new HystrixThreadPoolDefault,创建线程池

    我们接着看3处:

            public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
                // 1
                this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
                // 2
                HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
                // 3
                this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                        concurrencyStrategy.getThreadPool(threadPoolKey, properties),
                        properties);
                // 4
                this.threadPool = this.metrics.getThreadPool();
                ...
            }
    
    • 1处,获取线程池的默认配置,这个就和我们前面说的那个Setter里的类似

    • 2处,从HystrixPlugins.getInstance()获取一个HystrixConcurrencyStrategy类型的对象,保存到局部变量 concurrencyStrategy

    • 3处,初始化metrics,这里的第二个参数,是concurrencyStrategy.getThreadPool来获取的,这个操作,实际上就会去创建线程池。

      com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy#getThreadPool
          
      public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
              final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
      		...
              final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
              final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
      
      		...
              // 1 
              return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
              }
          }
      

      上面的1处,会去创建线程池。但是,这里直接就是要了 jdk 的默认线程池类来创建,这还怎么搞?类型都定死了。没法扩展了。。。

    发现hystrix的插件机制

    但是,回过头来,又仔细看了看,这个getThreadPool 是 HystrixConcurrencyStrategy类的一个方法,这个方法也是个实例方法。

    方法不能改,那,实例能换吗?再看看前面的代码:

    ok,那接着分析:

        public HystrixConcurrencyStrategy getConcurrencyStrategy() {
            if (concurrencyStrategy.get() == null) {
                //1 check for an implementation from Archaius first
                Object impl = getPluginImplementation(HystrixConcurrencyStrategy.class);
                concurrencyStrategy.compareAndSet(null, (HystrixConcurrencyStrategy) impl);
            }
            return concurrencyStrategy.get();
        }
    

    1处,根据这个类,获取实现,感觉有点戏。

        private <T> T getPluginImplementation(Class<T> pluginClass) {
            // 1
            T p = getPluginImplementationViaProperties(pluginClass, dynamicProperties);
            if (p != null) return p;    
            // 2
            return findService(pluginClass, classLoader);
        }
    
    • 1处,从一个动态属性中获取,后来经查,发现是如果集成了Netflix Archaius就可以动态获取属性,类似于一个配置中心

    • 2处,如果前面没找到,就是要 JDK 的SPI机制。

          private static <T> T findService(
                  Class<T> spi, 
                  ClassLoader classLoader) throws ServiceConfigurationError {
              
              ServiceLoader<T> sl = ServiceLoader.load(spi,
                      classLoader);
              for (T s : sl) {
                  if (s != null)
                      return s;
              }
              return null;
          }
      

      那就好说了。SPI ,我们自定义一个实现,就可以替换掉默认的了,hystrix做的还是不错,扩展性可以。

    现在知道可以自定义HystrixConcurrencyStrategy了,那要怎么自定义呢?

    这个类,是个抽象类,大体有如下几个方法:

    getThreadPool
        
    getBlockingQueue(int maxQueueSize) 
        
    Callable<T> wrapCallable(Callable<T> callable)
        
    getRequestVariable(final HystrixRequestVariableLifecycle<T> rv)     
    

    说是抽象类,但其实并没有需要我们实现的方法,所有方法都有默认实现,我们只需要重写需要覆盖的方法即可。

    我这里,看重了第三个方法:

    /**
     * Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution.
     * <p>
     * This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).
     * <p>
     * <b>Default Implementation</b>
     * <p>
     * Pass-thru that does no wrapping.
     * 
     * @param callable
     *            {@code Callable<T>} to be executed via a {@link ThreadPoolExecutor}
     * @return {@code Callable<T>} either as a pass-thru or wrapping the one given
     */
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        return callable;
    }
    

    方法注释如上,我简单说下,在执行前,提供一个机会,让你去wrap这个callable,即最终要丢到线程池执行的那个callable。

    我们可以wrap一下原有的callable,在执行前,把当前线程的threadlocal变量存下来,即为A,然后设置到callable里面去;在callable执行的时候,就可以使用我们的A中的threadlocal来替换掉worker线程中的。

    多说无益,这里直接看代码:

    // 0
    public class MyHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
    
        @Override
        public <T> Callable<T> wrapCallable(Callable<T> callable) {
            /**
             * 1 获取当前线程的threadlocalmap
             */
            Object currentThreadlocalMap = getCurrentThreadlocalMap();
    
            Callable<T> finalCallable = new Callable<T>() {
                // 2
                private Object callerThreadlocalMap = currentThreadlocalMap;
    			// 3
                private Callable<T> targetCallable = callable;
    
                @Override
                public T call() throws Exception {
                    /**
                     * 4 将工作线程的原有线程变量保存起来
                     */
                    Object oldThreadlocalMapOfWorkThread = getCurrentThreadlocalMap();
                    /**
                     *5 将本线程的线程变量,设置为caller的线程变量
                     */
                    setCurrentThreadlocalMap(callerThreadlocalMap);
    
                    try {
                        // 6
                        return targetCallable.call();
                    }finally {
                        // 7
                        setCurrentThreadlocalMap(oldThreadlocalMapOfWorkThread);
                        log.info("restore work thread's threadlocal");
                    }
    
                }
            };
    
            return finalCallable;
        }
    
    • 0处,自定义了一个类,继承HystrixConcurrencyStrategy,准备覆盖其默认的wrap方法
    • 1处,获取外部线程的threadlocal
    • 2处,3处,这里已经是处于匿名内部类了,定义了2个field,分别存放1中的外部线程的threadlocal,以及要wrap的callable
    • 4处,此时已经处于run方法的执行逻辑了:保存worker线程的自身的线程局部变量
    • 5处,使用外部线程的threadlocal覆盖自身的
    • 6处,调用真正的业务逻辑
    • 7处,恢复为线程自身的threadlocal

    获取线程的threadlocal的代码:

        private Object getCurrentThreadlocalMap() {
            Thread thread = Thread.currentThread();
            try {
                Field field = Thread.class.getDeclaredField("threadLocals");
                field.setAccessible(true);
                Object o = field.get(thread);
                return o;
            } catch (NoSuchFieldException | IllegalAccessException e) {
                log.error("{}",e);
            }
            return null;
        }
    

    设置线程的threadlocal的代码:

    private void setCurrentThreadlocalMap(Object newThreadLocalMap) {
        Thread thread = Thread.currentThread();
        try {
            Field field = Thread.class.getDeclaredField("threadLocals");
            field.setAccessible(true);
            field.set(thread,newThreadLocalMap);
    
        } catch (NoSuchFieldException | IllegalAccessException e) {
            log.error("{}",e);
        }
    }
    

    插件机制的相关资料

    https://github.com/Netflix/Hystrix/wiki/Plugins

    运行效果

    controller代码

    @RequestMapping("/")
    public String hystrixOrder () {
        // 1
        SessionUtils.getSessionVOFromRedisAndPut2ThreadLocal();
        // 2
        SimpleHystrixCommand simpleHystrixCommand = new SimpleHystrixCommand(testService);
        String res = simpleHystrixCommand.execute();
        return res;
    }
    
    • 1处,设置ThreadLocal变量

          public static UserVO getSessionVOFromRedisAndPut2ThreadLocal() {
              UserVO userVO = new UserVO();
              userVO.setUserName("test user");
      
              RequestContextHolder.set(userVO);
              log.info("set thread local:{} to context",userVO);
      
              return userVO;
          }
      
    • 2处,new了一个HystrixCommand,然后execute执行

    command中代码

    public class SimpleHystrixCommand extends HystrixCommand<String> {
    
    	private TestService testService;
    
    	public SimpleHystrixCommand(TestService testService) {
    		super(setter());
    		this.testService = testService;
        }
    
        @Override
    	protected String run() throws Exception {
            // 1
    		String s = testService.getResult();
    		log.info("get thread local:{}",s);
    
    		/**
    		 * 如果睡眠时间,超过2s,会降级
    		 * {@link #getFallback()}
    		 */
    		int millis = new Random().nextInt(3000);
    		log.info("will sleep {} millis",millis);
    		Thread.sleep(millis);
    
    		return s;
    	}
    

    重点看1处代码:

        public String getResult() {
            UserVO userVO = RequestContextHolder.get();
            log.info("I am  hystrix pool thread,try to get threadlocal:{}",userVO);
    
            return userVO.toString();
        }
    

    如上所示,会去获取ThreadLocal变量,并打印。

    spi配置

    在resourcesMETA-INFservices目录下,创建文件:

    com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy

    内容为下面一行:

    com.learn.hystrix.utils.MyHystrixConcurrencyStrategy

    执行效果

    访问:http://localhost:8080/

    2020-05-09 17:26:11.134  INFO 7452 --- [nio-8080-exec-2] com.learn.hystrix.utils.SessionUtils     : set thread local:UserVO(userName=test user) to context
    2020-05-09 17:26:11.143  INFO 7452 --- [x-member-pool-2] com.learn.hystrix.service.TestService    : I am  hystrix pool thread,try to get threadlocal:UserVO(userName=test user)
    2020-05-09 17:26:11.143  INFO 7452 --- [x-member-pool-2] c.l.h.command.SimpleHystrixCommand       : get thread local:UserVO(userName=test user)
    2020-05-09 17:26:11.144  INFO 7452 --- [x-member-pool-2] c.l.h.command.SimpleHystrixCommand       : will sleep 126 millis
    2020-05-09 17:26:11.281  INFO 7452 --- [x-member-pool-2] c.l.h.u.MyHystrixConcurrencyStrategy     : restore work thread's threadlocal
    

    可以看到,已经发生了线程切换,在worker线程也取到了。

    大家如果发现日志中出现了[ HystrixTimer-1] 线程的身影,不用担心,那只是因为我们的线程超时了,所以timer线程检测到了之后,去执行一个callable任务,那个runnable就是前面被我们包装过的那个callable。(这块超时的机制,todo吧,下次再讲)

    总结

    hystrix的插件机制,不止可以扩展上面这一个类,还有几个别的类也是可以的。大家直接参考:

    https://github.com/Netflix/Hystrix/wiki/Plugins

    代码demo,我放在了:

    https://gitee.com/ckl111/all-simple-demo-in-work-1/tree/master/hystrix-thread-local-demo

    可供参考的文章:
    https://www.jianshu.com/p/f30892335057

  • 相关阅读:
    温故vue对vue计算属性computed的分析
    bootStrap Table 如何使用
    css 的一些知识点的整理
    css 宽高自适应的div 元素 如何居中 垂直居中
    BOM,Dom 回顾
    DOM
    字符串的一些常用方法 string
    js if for 详解 获取元素方式 及一些js 基础知识
    Java入门1
    python字符串
  • 原文地址:https://www.cnblogs.com/grey-wolf/p/12859084.html
Copyright © 2011-2022 走看看