  • Passing ThreadLocal values through to Hystrix threads

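    Hystrix uses the command pattern to wrap every call to an external service in a HystrixCommand or HystrixObservableCommand object, and under thread isolation (the default for HystrixCommand) it executes that object on a separate thread. Because the call runs on a different thread, whatever the calling thread stored in a ThreadLocal is not visible there.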
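
    The effect is easy to reproduce without Hystrix. The following is a minimal sketch in which a plain single-thread ExecutorService stands in for the Hystrix thread pool; the class name ThreadLocalLossDemo and the USER_ID variable are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; a plain ExecutorService stands in for the Hystrix thread pool.
final class ThreadLocalLossDemo {
    // Hypothetical per-thread context that holds a user id.
    static final ThreadLocal<String> USER_ID = new ThreadLocal<>();

    // Sets the value on the calling thread, then reads it back from a pool thread.
    static String readFromWorkerThread(String userId) {
        USER_ID.set(userId);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> readContext = USER_ID::get;
            return pool.submit(readContext).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            USER_ID.remove();
        }
    }

    public static void main(String[] args) {
        // The worker thread has its own, empty ThreadLocal slot.
        System.out.println("worker thread sees: " + readFromWorkerThread("alice")); // prints "worker thread sees: null"
    }
}
```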

    To pass the current thread's ThreadLocal values into the Hystrix thread, follow the steps below.

    Wrap the Callable task

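    import java.util.concurrent.Callable;

    public final class DelegatingUserContextCallable<V> implements Callable<V> {
        private final Callable<V> delegate;
        // User context captured on the calling thread (define the ThreadLocal context to suit your project)
        private final UserContext originalUserContext;
    
        public DelegatingUserContextCallable(Callable<V> delegate,
                                             UserContext userContext) {
            this.delegate = delegate;
            this.originalUserContext = userContext;
        }
    
        @Override
        public V call() throws Exception {
            // Remember what this Hystrix thread held, then put the caller's context into its ThreadLocal
            UserContext previousContext = UserContextHolder.getContext();
            UserContextHolder.setContext(originalUserContext);
            try {
                return delegate.call();
            }
            finally {
                // Clean up the ThreadLocal so the pooled thread does not keep this request's context.
                // Assumes setContext accepts whatever getContext returned; if your holder
                // has a clear/remove method, calling it here works as well.
                UserContextHolder.setContext(previousContext);
            }
        }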
    
        public static <V> Callable<V> create(Callable<V> delegate,
                                             UserContext userContext) {
            return new DelegatingUserContextCallable<V>(delegate, userContext);
        }
    }
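
    To see the wrapper pattern work end to end without a Hystrix dependency, here is a minimal sketch under simplifying assumptions: the context is a plain String, ContextHolder is a hypothetical stand-in for UserContextHolder (including a clear() method that the original holder may not have), and a single-thread ExecutorService plays the role of the Hystrix pool. It also checks that nothing is left behind on the pooled thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ContextPropagationDemo {
    // Returns what a wrapped task saw, followed by what an unwrapped task sees
    // afterwards on the same pool thread.
    static String[] run(String userId) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            ContextHolder.set(userId); // set on the caller thread
            Callable<String> readContext = ContextHolder::get;
            // The context is captured here, on the caller thread, before the task is submitted.
            String wrapped = pool.submit(new ContextCallable<>(readContext, ContextHolder.get())).get();
            String leftover = pool.submit(readContext).get();
            return new String[] {wrapped, leftover};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            ContextHolder.clear();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = run("alice");
        System.out.println(seen[0] + " / " + seen[1]); // prints "alice / null"
    }
}

// Hypothetical stand-in for the project's UserContextHolder; it stores a plain String.
final class ContextHolder {
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();
    static String get() { return CONTEXT.get(); }
    static void set(String value) { CONTEXT.set(value); }
    static void clear() { CONTEXT.remove(); }
}

// Same shape as DelegatingUserContextCallable, reduced to a String context.
final class ContextCallable<V> implements Callable<V> {
    private final Callable<V> delegate;
    private final String captured;

    ContextCallable(Callable<V> delegate, String captured) {
        this.delegate = delegate;
        this.captured = captured;
    }

    @Override
    public V call() throws Exception {
        ContextHolder.set(captured); // runs on the worker thread
        try {
            return delegate.call();
        } finally {
            ContextHolder.clear(); // pooled threads are reused, so leave nothing behind
        }
    }
}
```

    The second value is null because the wrapper clears the worker thread's slot in its finally block; without that cleanup, the next task on the same pooled thread would see the previous caller's context.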
    

    Implement the Hystrix concurrency strategy class

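    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    import com.netflix.hystrix.HystrixThreadPoolKey;
    import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
    import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariable;
    import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;
    import com.netflix.hystrix.strategy.properties.HystrixProperty;

    public class ThreadLocalAwareStrategy extends HystrixConcurrencyStrategy {
        
        // The simplest approach is to hold the existing concurrency strategy and extend its behavior
        private final HystrixConcurrencyStrategy existingConcurrencyStrategy;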
    
        public ThreadLocalAwareStrategy(
                HystrixConcurrencyStrategy existingConcurrencyStrategy) {
            this.existingConcurrencyStrategy = existingConcurrencyStrategy;
        }
    
        @Override
        public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
            return existingConcurrencyStrategy != null
                    ? existingConcurrencyStrategy.getBlockingQueue(maxQueueSize)
                    : super.getBlockingQueue(maxQueueSize);
        }
    
        @Override
        public <T> HystrixRequestVariable<T> getRequestVariable(
                HystrixRequestVariableLifecycle<T> rv) {
            return existingConcurrencyStrategy != null
                    ? existingConcurrencyStrategy.getRequestVariable(rv)
                    : super.getRequestVariable(rv);
        }
    
        @Override
        public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
                                                HystrixProperty<Integer> corePoolSize,
                                                HystrixProperty<Integer> maximumPoolSize,
                                                HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
                                                BlockingQueue<Runnable> workQueue) {
            return existingConcurrencyStrategy != null
                    ? existingConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize,
                    maximumPoolSize, keepAliveTime, unit, workQueue)
                    : super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
                    keepAliveTime, unit, workQueue);
        }
    
        @Override
        public <T> Callable<T> wrapCallable(Callable<T> callable) {
            // Hystrix calls this on the calling thread, so getContext() returns the caller's context
            Callable<T> withContext =
                    new DelegatingUserContextCallable<>(callable, UserContextHolder.getContext());
            return existingConcurrencyStrategy != null
                    ? existingConcurrencyStrategy.wrapCallable(withContext)
                    : super.wrapCallable(withContext);
        }
    }
    

    Register the new concurrency strategy with Hystrix and reset the plugins

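    import javax.annotation.PostConstruct;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;

    import com.netflix.hystrix.strategy.HystrixPlugins;
    import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
    import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
    import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
    import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
    import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;

    @Configuration
    public class ThreadLocalConfiguration {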
    
        @Autowired(required = false)
        private HystrixConcurrencyStrategy existingConcurrencyStrategy;
    
        @PostConstruct
        public void init() {
            // Keeps references of existing Hystrix plugins.
            HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
                    .getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
                    .getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
                    .getPropertiesStrategy();
            HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance()
                    .getCommandExecutionHook();
    
            // Registering a plugin that is already set throws IllegalStateException, so reset first.
            HystrixPlugins.reset();
    
            HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalAwareStrategy(existingConcurrencyStrategy));
            HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
            HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
            HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
            HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
        }
    }
    

    Add an interceptor

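    Hystrix is often used together with OpenFeign. If you then want to use the values held in the ThreadLocal, add an interceptor to the OpenFeign client; concrete code for this is easy to find online.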
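
    As one possible shape for such an interceptor, the sketch below copies a user id from the ThreadLocal context into a request header. It needs feign-core on the classpath. The UserContext and UserContextHolder classes here are hypothetical stand-ins for the project's own types, and the getUserId() accessor and the X-User-Id header name are assumptions made for illustration.

```java
import feign.RequestInterceptor;
import feign.RequestTemplate;

// Copies the user id from the ThreadLocal context into an outgoing request header.
final class UserContextFeignInterceptor implements RequestInterceptor {
    // Hypothetical header name; use whatever the downstream service expects.
    static final String USER_ID_HEADER = "X-User-Id";

    @Override
    public void apply(RequestTemplate template) {
        UserContext context = UserContextHolder.getContext();
        if (context != null && context.getUserId() != null) {
            template.header(USER_ID_HEADER, context.getUserId());
        }
    }
}

// Hypothetical context type; replace with the project's own UserContext.
final class UserContext {
    private final String userId;
    UserContext(String userId) { this.userId = userId; }
    String getUserId() { return userId; }
}

// Hypothetical holder; replace with the project's own UserContextHolder.
final class UserContextHolder {
    private static final ThreadLocal<UserContext> CONTEXT = new ThreadLocal<>();
    static UserContext getContext() { return CONTEXT.get(); }
    static void setContext(UserContext context) { CONTEXT.set(context); }
}
```

    With Spring Cloud OpenFeign, exposing the interceptor as a bean is normally enough for Feign clients to pick it up. When Hystrix wraps the Feign call, the interceptor runs on the Hystrix thread, so it only finds the context if the concurrency strategy above has carried it over.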

  • Original article: https://www.cnblogs.com/jason1990/p/13710858.html