  • Hystrix Source Code Analysis (Part 1)

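    Entry-point source for @HystrixCommand

    Adding @HystrixCommand to a method is enough to put it under Hystrix's control, so my guess was that AOP is used to intercept methods carrying the annotation. After some searching I found the AOP implementation in HystrixCommandAspect: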

    @Aspect
    public class HystrixCommandAspect {
        
         @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
        public void hystrixCommandAnnotationPointcut() {
        }
    
        @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
        public void hystrixCollapserAnnotationPointcut() {
        }
    
        // Advice that runs around the annotated methods
        @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
        public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {
            Method method = AopUtils.getMethodFromTarget(joinPoint);
            Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint});
            if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
                throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time");
            } else {
                HystrixCommandAspect.MetaHolderFactory metaHolderFactory = (HystrixCommandAspect.MetaHolderFactory)META_HOLDER_FACTORY_MAP
                    .get(HystrixCommandAspect.HystrixPointcutType.of(method));
                
                MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
                // Build the HystrixCommand implementation
                HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
                ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() 
                    ?metaHolder.getCollapserExecutionType() 
                    : metaHolder.getExecutionType();
    
                try {
                    Object result;
                    if (!metaHolder.isObservable()) {
                        result = CommandExecutor.execute(invokable, executionType, metaHolder);
                    } else {
                        result = this.executeObservable(invokable, executionType, metaHolder);
                    }
    
                    return result;
                } catch (...) {
                 	...
                } 
            }
        }
    }
    

    HystrixCommandAspect intercepts methods annotated with @HystrixCommand or @HystrixCollapser. The key line in methodsAnnotatedWithHystrixCommand is HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder), whose purpose is to construct a HystrixCommand. First, look at how HystrixInvokable and HystrixCommand are related:

    [Figure: class diagram relating HystrixInvokable and HystrixCommand]

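    The diagram shows that HystrixCommand is a subtype of HystrixInvokable. One of the approaches in my earlier Hystrix getting-started demo was to extend HystrixCommand directly. So what happens here should be that a HystrixCommand is built from the parameters set on @HystrixCommand and then executed with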

    CommandExecutor.execute(invokable, executionType, metaHolder)

    Hystrix getting-started demo: https://www.cnblogs.com/dabenxiang/p/13676116.html
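
    The flow described in this section (intercept a call to an annotated method, run it, and react when it fails) can be sketched without Hystrix or AspectJ. The sketch below swaps in a JDK dynamic proxy for the AspectJ aspect, so it is not how javanica is implemented. The annotation @Guarded, the GreetingService interface and every other name are hypothetical and exist only for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class GuardedDemo {
    public static void main(String[] args) {
        GreetingService service = GuardedHandler.wrap(new GreetingServiceImpl());
        System.out.println(service.greet("bob")); // normal path
        System.out.println(service.greet(""));    // target throws, fallback runs
    }
}

// Hypothetical stand-in for an annotation such as @HystrixCommand.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Guarded {
    String fallbackMethod();
}

// Hypothetical service interface; JDK proxies can only wrap interfaces.
interface GreetingService {
    String greet(String name);
}

class GreetingServiceImpl implements GreetingService {
    @Guarded(fallbackMethod = "greetFallback")
    @Override
    public String greet(String name) {
        if (name.isEmpty()) {
            throw new IllegalArgumentException("empty name");
        }
        return "hello " + name;
    }

    public String greetFallback(String name) {
        return "hello stranger";
    }
}

// Plays the role of the aspect: looks up the annotation on the target method,
// runs the call, and switches to the fallback method if the call throws.
class GuardedHandler implements InvocationHandler {
    private final Object target;

    GuardedHandler(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Resolve the method on the implementation class, where the annotation lives.
        Method impl = target.getClass().getMethod(method.getName(), method.getParameterTypes());
        Guarded guarded = impl.getAnnotation(Guarded.class);
        try {
            return impl.invoke(target, args);
        } catch (InvocationTargetException e) {
            if (guarded == null) {
                throw e.getCause(); // not annotated: propagate the original failure
            }
            Method fallback = target.getClass()
                    .getMethod(guarded.fallbackMethod(), method.getParameterTypes());
            return fallback.invoke(target, args);
        }
    }

    static GreetingService wrap(GreetingService target) {
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[] {GreetingService.class},
                new GuardedHandler(target));
    }
}
```

    The real aspect does more than this: as the code above shows, it builds a MetaHolder and a HystrixInvokable first and hands execution to CommandExecutor.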

    Building a GenericCommand

    Let's look at what mainly happens when a HystrixCommand is built.

    The create method called by HystrixCommandFactory.getInstance().create(metaHolder):

    public class HystrixCommandFactory {
        private static final HystrixCommandFactory INSTANCE = new HystrixCommandFactory();
    
        private HystrixCommandFactory() {
        }
    
        public static HystrixCommandFactory getInstance() {
            return INSTANCE;
        }
    
        public HystrixInvokable create(MetaHolder metaHolder) {
            Object executable;
            // Check whether the method carries the HystrixCollapser annotation
            if (metaHolder.isCollapserAnnotationPresent()) {
                executable = new CommandCollapser(metaHolder);
            } else if (metaHolder.isObservable()) {
                executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
            } else {
                // This is the branch taken in our case.
                executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
            }
    
            return (HystrixInvokable)executable;
        }
     }
    

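    Because our annotation is not HystrixCollapser and our method is not observable, a GenericCommand is built. GenericCommand is a subclass of HystrixCommand (it also appears in the class diagram above), so the constructor chain is GenericCommand -> AbstractHystrixCommand -> HystrixCommand -> AbstractCommand. To understand how a GenericCommand is built, the constructor of AbstractCommand is the main thing to read: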

    abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    	
        
        // Constructor
        protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, 
                                  HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, 
                                  HystrixThreadPool threadPool,
                                  HystrixCommandProperties.Setter commandPropertiesDefaults, 
                                  HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
                                  HystrixCommandMetrics metrics,
                                  TryableSemaphore fallbackSemaphore, 
                                  TryableSemaphore executionSemaphore,
                                  HystrixPropertiesStrategy propertiesStrategy, 
                                  HystrixCommandExecutionHook executionHook) {
    
            this.commandGroup = initGroupKey(group);
            this.commandKey = initCommandKey(key, getClass());
            this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
            // Resolve the thread pool key (the pool itself is created by initThreadPool further down)
            this.threadPoolKey = initThreadPoolKey(threadPoolKey, 
                                                   this.commandGroup, 		
                                                   this.properties.executionIsolationThreadPoolKeyOverride().get());
            this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
            // Initialize the circuit breaker
            this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), 
                                                     circuitBreaker, this.commandGroup, 
                                                     this.commandKey, this.properties, this.metrics);
            this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
    
            //Strategies from plugins
            this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
            this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, 
                                                                               this.metrics, this.circuitBreaker, 
                                                                               this.properties);
            this.executionHook = initExecutionHook(executionHook);
    
            this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
            this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
    
            /* fallback semaphore override if applicable */
            this.fallbackSemaphoreOverride = fallbackSemaphore;
    
            /* execution semaphore override if applicable */
            this.executionSemaphoreOverride = executionSemaphore;
        }
    }
    
    

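    The points to focus on here are initThreadPool(), which initializes the thread pool, and initCircuitBreaker(), which initializes the circuit breaker. Thread pool initialization is analyzed first, then circuit breaker initialization.

    initThreadPool: thread pool initialization

    The code related to initThreadPool is as follows: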

    private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, 
                                                          HystrixCommandGroupKey groupKey, String 
                                                          threadPoolKeyOverride) {
        if (threadPoolKeyOverride == null) {
            // we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
            // If no threadPoolKey was supplied, fall back to groupKey.name()
            if (threadPoolKey == null) {
                /* use HystrixCommandGroup if HystrixThreadPoolKey is null */
                return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
            } else {
                return threadPoolKey;
            }
        } else {
            // we have a property defining the thread-pool so use it instead
            return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
        }
    }
    
       private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor,
                                                       HystrixThreadPoolKey threadPoolKey, 
                                                       HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
            if (fromConstructor == null) {
                // get the default implementation of HystrixThreadPool
                return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
            } else {
                return fromConstructor;
            }
        }
    
    /* package */static class Factory {
        // Thread pools are stored in a ConcurrentHashMap
    	final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
    
        // Look up the HystrixThreadPool by threadPoolKey: return the cached one if present, otherwise create it
      	static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
    
            String key = threadPoolKey.name();
            
                HystrixThreadPool previouslyCached = threadPools.get(key);
                if (previouslyCached != null) {
                    return previouslyCached;
                }
    
                synchronized (HystrixThreadPool.class) {
                    if (!threadPools.containsKey(key)) {
                        threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
                    }
                }
                return threadPools.get(key);
            }
    }
    

    The code above does three things:

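    • initThreadPoolKey() builds a HystrixThreadPoolKey. If the executionIsolationThreadPoolKeyOverride property is set, it takes precedence; otherwise threadPoolKey is used when present, and groupKey is used when it is absent. The resulting key is passed to initThreadPool.
    • initThreadPool asks HystrixThreadPool.Factory for the HystrixThreadPool matching that threadPoolKey, unless a pool was supplied through the constructor.
    • HystrixThreadPool.Factory.getInstance looks up the HystrixThreadPool by threadPoolKey: if the factory already holds one it is returned directly, otherwise a HystrixThreadPoolDefault is created. Leaving the property override aside, threadPoolKey.name() here equals threadPoolKey == null ? groupKey : threadPoolKey.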
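
    The precedence applied by initThreadPoolKey can be restated with plain strings. This is only a sketch of the decision order read off the code above; ThreadPoolKeyResolver and the key names are hypothetical and are not part of Hystrix.

```java
class ThreadPoolKeyResolver {

    // Same order as initThreadPoolKey: property override first,
    // then the explicit thread pool key, then the group key.
    static String resolve(String threadPoolKey, String groupKey, String threadPoolKeyOverride) {
        if (threadPoolKeyOverride != null) {
            return threadPoolKeyOverride;
        }
        if (threadPoolKey != null) {
            return threadPoolKey;
        }
        return groupKey;
    }

    public static void main(String[] args) {
        // Hypothetical key names, chosen only for the demonstration.
        System.out.println(resolve(null, "orderGroup", null));
        System.out.println(resolve("orderPool", "orderGroup", null));
        System.out.println(resolve("orderPool", "orderGroup", "overridePool"));
    }
}
```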

    HystrixThreadPoolDefault is built as follows:

          public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
                this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
                HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
                this.queueSize = properties.maxQueueSize().get();
    
                this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                        concurrencyStrategy.getThreadPool(threadPoolKey, properties),
                        properties);
                this.threadPool = this.metrics.getThreadPool();
                this.queue = this.threadPool.getQueue();
    
                /* strategy: HystrixMetricsPublisherThreadPool */
                HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
            }
    
        public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
            final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
    
            final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
            final int dynamicCoreSize = threadPoolProperties.coreSize().get();
            final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
            final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
            final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
    
            // Check whether the maximum size is allowed to differ from the core size
            if (allowMaximumSizeToDivergeFromCoreSize) {
                final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
                if (dynamicCoreSize > dynamicMaximumSize) {
                    return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
                } else {
                    return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
                }
            } else {
                // Maximum pool size equals the core size
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            }
        }
    

    As getThreadPool() shows, the thread pool is ultimately created with new ThreadPoolExecutor.
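
    The sizing branches of getThreadPool() can be tried out with only the JDK. The sketch below mirrors the core/maximum logic quoted above. The PoolSizing class is hypothetical, the thread factory is omitted, and the queue choice (a SynchronousQueue for a non-positive maxQueueSize, a bounded LinkedBlockingQueue otherwise) is an assumption of this sketch, because getBlockingQueue is not shown in the quoted code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizing {

    static ThreadPoolExecutor build(int coreSize, int maximumSize, boolean allowMaxToDiverge,
                                    int keepAliveMinutes, int maxQueueSize) {
        // The maximum may exceed the core size only when divergence is allowed
        // and the configured maximum is not smaller than the core size.
        int effectiveMax = (allowMaxToDiverge && coreSize <= maximumSize) ? maximumSize : coreSize;

        // Assumption of this sketch: non-positive queue size means direct hand-off.
        BlockingQueue<Runnable> workQueue = maxQueueSize <= 0
                ? new SynchronousQueue<>()
                : new LinkedBlockingQueue<>(maxQueueSize);

        return new ThreadPoolExecutor(coreSize, effectiveMax, keepAliveMinutes,
                TimeUnit.MINUTES, workQueue);
    }

    public static void main(String[] args) {
        ThreadPoolExecutor diverging = build(10, 20, true, 1, -1);
        ThreadPoolExecutor fixed = build(10, 20, false, 1, 5);
        System.out.println(diverging.getMaximumPoolSize());
        System.out.println(fixed.getMaximumPoolSize());
        diverging.shutdown();
        fixed.shutdown();
    }
}
```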

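    To summarize: the key resolved from threadPoolKey and groupKey is used to look up the thread pool in the factory, and a pool is created if none exists yet. So two HystrixCommands with the same threadPoolKey use the same thread pool, and when no threadPoolKey is set, two commands with the same groupKey also share one.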
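
    The sharing behaviour in this summary comes down to a keyed cache. The sketch below is a minimal stand-in: PoolRegistry is hypothetical, the pool size is arbitrary, and it uses ConcurrentHashMap.computeIfAbsent rather than the get-then-synchronized pattern in the quoted Factory code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PoolRegistry {
    private static final ConcurrentHashMap<String, ExecutorService> POOLS =
            new ConcurrentHashMap<>();

    // Return the pool cached under the resolved key, creating it on first use.
    static ExecutorService getInstance(String resolvedKey) {
        return POOLS.computeIfAbsent(resolvedKey, k -> Executors.newFixedThreadPool(10));
    }

    // Shut down and forget every cached pool.
    static void shutdownAll() {
        POOLS.values().forEach(ExecutorService::shutdown);
        POOLS.clear();
    }

    public static void main(String[] args) {
        ExecutorService first = getInstance("orderGroup");
        ExecutorService second = getInstance("orderGroup");
        ExecutorService other = getInstance("userGroup");
        System.out.println(first == second); // same key, same pool
        System.out.println(first == other);  // different key, different pool
        shutdownAll();
    }
}
```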

    initCircuitBreaker: circuit breaker initialization

    private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                 HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        if (enabled) {
            if (fromConstructor == null) {
                // get the default implementation of HystrixCircuitBreaker
                return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
            } else {
                return fromConstructor;
            }
        } else {
            return new NoOpCircuitBreaker();
        }
    }
    

      public static class Factory {
    
            private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand =
                new ConcurrentHashMap<String, HystrixCircuitBreaker>();
    
            public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, 
                                                            HystrixCommandGroupKey group, HystrixCommandProperties 
                                                            properties, 
                                                            HystrixCommandMetrics metrics) {
               
                HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
                if (previouslyCached != null) {
                    return previouslyCached;
                }
                HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), 
                                        new HystrixCircuitBreakerImpl(key, group, properties, metrics));
                
                if (cbForCommand == null) {
                    // this means the putIfAbsent step just created a new one so let's retrieve and return it
                    return circuitBreakersByCommand.get(key.name());
                } else {
                    return cbForCommand;
                }
            }
     }
    
     /* package */static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
            private final HystrixCommandProperties properties;
            private final HystrixCommandMetrics metrics;
    
            /* track whether this circuit is open/closed at any given point in time (default to false==closed) */
            private AtomicBoolean circuitOpen = new AtomicBoolean(false);
    
            /* when the circuit was marked open or was last allowed to try a 'singleTest' */
            private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
    
            protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, 
                                                HystrixCommandProperties properties, 
                                                HystrixCommandMetrics metrics) {
                this.properties = properties;
                this.metrics = metrics;
            }
           
    }
    

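    Structurally, circuit breaker initialization is much like thread pool initialization: a ConcurrentHashMap in the factory class manages the circuit breakers, keyed by the name of the HystrixCommandKey. I don't have a firm grasp of the finer initialization details yet, so I'm skipping them for now. The statistics for failure rate and request count are set up inside this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);, and I don't fully understand the RxJava code there, so I'll come back to it later.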
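
    The per-command caching in HystrixCircuitBreaker.Factory follows the common get-then-putIfAbsent idiom, which can be sketched with JDK classes alone. SimpleBreaker and BreakerRegistry are hypothetical names; the breaker here only holds an open/closed flag and does none of the metrics work of the real implementation.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

class BreakerRegistry {
    private static final ConcurrentHashMap<String, SimpleBreaker> BREAKERS =
            new ConcurrentHashMap<>();

    static SimpleBreaker getInstance(String commandKey) {
        SimpleBreaker cached = BREAKERS.get(commandKey);
        if (cached != null) {
            return cached;
        }
        SimpleBreaker created = new SimpleBreaker();
        // putIfAbsent returns null when our instance was stored,
        // or the instance another thread stored first.
        SimpleBreaker raced = BREAKERS.putIfAbsent(commandKey, created);
        return raced == null ? created : raced;
    }

    public static void main(String[] args) {
        getInstance("getOrder").trip();
        System.out.println(getInstance("getOrder").isOpen()); // state shared per command key
        System.out.println(getInstance("getUser").isOpen());  // other keys are unaffected
    }
}

// Hypothetical breaker: only tracks whether the circuit is open (false == closed).
class SimpleBreaker {
    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);

    boolean isOpen() {
        return circuitOpen.get();
    }

    void trip() {
        circuitOpen.set(true);
    }
}
```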

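    Conclusion

    This post covered one thing: AOP intercepts @HystrixCommand and then builds a GenericCommand from the @HystrixCommand configuration. The next post looks at what happens inside CommandExecutor.execute.

    A flow chart of the whole Hystrix process that I drew myself:

    [Figure: Hystrix execution flow chart]

    High-resolution flow chart: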

    https://gitee.com/gzgyc/blogimage/raw/master/hstrix执行流程图.jpg

  • Original article: https://www.cnblogs.com/dabenxiang/p/13764076.html