  • hystrix 源码分析以及属性的配置





    package com.yang.xiao.hui.order.controller;
    import org.springframework.cloud.openfeign.FeignClient;
    import org.springframework.context.annotation.Primary;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    /**
     * Feign client for the service registered under the name "product";
     * {@code ProductHystrix} is declared as its fallback class.
     */
    // NOTE(review): this excerpt shows no HTTP mapping annotation on the method and no
    // closing brace for the interface - presumably lost from the listing; confirm
    // against the real project before copying.
    @FeignClient(name ="product",fallback =ProductHystrix.class)
    public interface ProductService {
        /** Returns the {@code Product} for the {@code id} path variable. */
        Product getProductInfo(@PathVariable("id") Integer id);

    /**
     * Fallback implementation of {@link ProductService}: instead of calling the remote
     * service it prints a notice and hands back a placeholder {@link Product}.
     */
    public class ProductHystrix implements ProductService {

        /**
         * Builds the placeholder product returned when the real call is not made.
         *
         * @param id the requested product id (not used by the fallback)
         * @return a new {@link Product} whose name marks it as a fallback result
         */
        @Override
        public Product getProductInfo(Integer id) {
            System.out.println("被熔断;了");
            final Product placeholder = new Product();
            placeholder.setName("熔断了。。。。。");
            return placeholder;
        }
    }




    根据 FeignClientsConfiguration 配置类中的 feignHystrixBuilder() 方法可以知道,要让 feign 与 hystrix 结合,需要在 application.yml 中配置属性 feign.hystrix.enabled=true

     1.2 debug调试feignClient客户端的创建:



    // Controller that delegates product lookups to the ProductService Feign client.
    // NOTE(review): the excerpt shows no class/method annotations, no injection
    // annotation on the field and no closing braces - presumably dropped from the
    // listing; confirm against the real project.
    public class OrderContorller {
        // Feign client used for the remote call.
        private ProductService productService;
        // Forwards the id path variable to the client and returns its result unchanged.
        public Product getProductInfo(@PathVariable Integer id){
            return productService.getProductInfo(id);









     1.3  debug调试feign调用第三方接口:



     可见,最终是进入了HystrixInvocationHandler类的invoke方法: 方法具体如下:

      public Object invoke(final Object proxy, final Method method, final Object[] args)
          throws Throwable {
        // early exit if the invoked method is from java.lang.Object
        // code is the same as ReflectiveFeign.FeignInvocationHandler
        if ("equals".equals(method.getName())) {
          try {
            Object otherHandler =
                args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
            return equals(otherHandler);
          } catch (IllegalArgumentException e) {
            return false;
        } else if ("hashCode".equals(method.getName())) {
          return hashCode();
        } else if ("toString".equals(method.getName())) {
          return toString();
    //这里创建一个command命令对象,里面实现了2个方法,一个是执行我们目标方法的run()方法,一个是执行目标方法失败后,调用我们的服务降级方法getFallcack() HystrixCommand
    <Object> hystrixCommand = new HystrixCommand<Object>(setterMethodMap.get(method)) { @Override protected Object run() throws Exception { try { return HystrixInvocationHandler.this.dispatch.get(method).invoke(args); } catch (Exception e) { throw e; } catch (Throwable t) { throw (Error) t; } } @Override protected Object getFallback() { if (fallbackFactory == null) { return super.getFallback(); } try { Object fallback = fallbackFactory.create(getExecutionException()); Object result = fallbackMethodMap.get(method).invoke(fallback, args); if (isReturnsHystrixCommand(method)) { return ((HystrixCommand) result).execute(); } else if (isReturnsObservable(method)) { // Create a cold Observable return ((Observable) result).toBlocking().first(); } else if (isReturnsSingle(method)) { // Create a cold Observable as a Single return ((Single) result).toObservable().toBlocking().first(); } else if (isReturnsCompletable(method)) { ((Completable) result).await(); return null; } else if (isReturnsCompletableFuture(method)) { return ((Future) result).get(); } else { return result; } } catch (IllegalAccessException e) { // shouldn't happen as method is public due to being an interface throw new AssertionError(e); } catch (InvocationTargetException | ExecutionException e) { // Exceptions on fallback are tossed by Hystrix throw new AssertionError(e.getCause()); } catch (InterruptedException e) { // Exceptions on fallback are tossed by Hystrix Thread.currentThread().interrupt(); throw new AssertionError(e.getCause()); } } }; if (Util.isDefault(method)) { return hystrixCommand.execute(); } else if (isReturnsHystrixCommand(method)) { return hystrixCommand; } else if (isReturnsObservable(method)) { // Create a cold Observable return hystrixCommand.toObservable(); } else if (isReturnsSingle(method)) { // Create a cold Observable as a Single return hystrixCommand.toObservable().toSingle(); } else if (isReturnsCompletable(method)) { return hystrixCommand.toObservable().toCompletable(); } else 
if (isReturnsCompletableFuture(method)) { return new ObservableCompletableFuture<>(hystrixCommand); } return hystrixCommand.execute(); //最终调用了命令行的execute方法 }

    小结:这里创建了一个command命令对象,里面实现了2个方法:一个是执行目标方法的run()方法,另一个是目标方法执行失败后调用服务降级逻辑的getFallback()方法;最后直接调用了命令对象的execute()方法。

     1.4 我们跟进命令对象的创建,看看有哪些属性:


      这里一大堆初始化属性,我们主要关注2个:HystrixCommandProperties.Setter 和 HystrixThreadPoolProperties.Setter,以及线程池的初始化

      我们看到线程池的初始化,有个threadPoolkey,该值为服务的名称,也就是@FeignClient注解的name 值,初始化时,同时传入了一个线程池的默认配置:










    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
            // mark that we're starting execution on the ExecutionHook
            // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
            /* determine if we're allowed to execute */
            if (circuitBreaker.allowRequest()) { //这里判断是否允许执行目标方法,如果不允许,那就执行fallbcak方法
                final TryableSemaphore executionSemaphore = getExecutionSemaphore();
                final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
                final Action0 singleSemaphoreRelease = new Action0() {
                    public void call() {
                        if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                    public void call(Throwable t) {
                        eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                if (executionSemaphore.tryAcquire()) {
                    try {
                        /* used to track userThreadExecutionTime */
                        executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                        return executeCommandAndObserve(_cmd)//这里是执行目标方法
                    } catch (RuntimeException e) {
                        return Observable.error(e);
                } else {
                    return handleSemaphoreRejectionViaFallback();//处理fallback方法
            } else {
                return handleShortCircuitViaFallback();//处理fallback方法

     重点三个方法:circuitBreaker.allowRequest()   /executeCommandAndObserve(_cmd)  / handleSemaphoreRejectionViaFallback()



            public boolean isOpen() {
                if (circuitOpen.get()) { //如果已经是打开的,那就返回true
                    // if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
                    return true;
                // we're closed, so let's see if errors have made us so we should trip the circuit open
                HealthCounts health = metrics.getHealthCounts(); //获取统计对象,每个command对象的key值就会有一个统计对象,也就是每个请求方法对应一个对象
                // check if we are past the statisticalWindowVolumeThreshold
                if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { //在一个时间窗口内,默认10s,请求总次数要达到配置的最小请求数(默认20次)
                    // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
                    return false;
                if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { /在一个时间窗口内,默认10s,请求失败率要达到最小配置百分比(默认50%)
                    return false;
                } else {
                    // our failure rate is too high, trip the circuit //执行到这里说明失败率过高
                    if (circuitOpen.compareAndSet(false, true)) {
                        // if the previousValue was false then we want to set the currentTime //即使之前是关闭状态也要将其改为打开状态
                        return true;
                    } else {
                        // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
                        // caused another thread to set it to true already even though we were in the process of doing the same
                        // In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
                        return true;



    public boolean allowSingleTest() {
                long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get(); //上次最近打开熔断器的时间
                // 1) if the circuit is open
                // 2) and it's been longer than 'sleepWindow' since we opened the circuit
               if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) { 
                    // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
                    // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
                    if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) { //更新最近打开时间,方便下一个5s进行判断
                        // if this returns true that means we set the time so we'll return true to allow the singleTest
                        // if it returned false it means another thread raced us and allowed the singleTest before we did
                        return true;
                return false;







    接着我们分析执行目标方法失败后,是如何调用getFallback方法的: handleSemaphoreRejectionViaFallback()





      1.5  如何修改默认的参数配置:例如想更改线程池的配置或者滑动窗口时间,而@FeignClient注解本身并没有提供修改hystrix配置的属性,那么我们回顾下ProductService接口的创建过程:




    package com.yang.xiao.hui.order.controller;
    import com.netflix.hystrix.HystrixCommand;
    import com.netflix.hystrix.HystrixCommandGroupKey;
    import com.netflix.hystrix.HystrixCommandKey;
    import com.netflix.hystrix.HystrixThreadPoolProperties;
    import feign.Feign;
    import feign.Target;
    import feign.hystrix.HystrixFeign;
    import feign.hystrix.SetterFactory;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Scope;
    import java.lang.reflect.Method;
    // Configuration class that supplies a Hystrix-enabled Feign builder. It is only
    // considered when HystrixCommand and HystrixFeign are on the classpath.
    // NOTE(review): this excerpt is truncated - the Setter chain after
    // "HystrixCommand.Setter", the closing braces, and any code that hands
    // setterFactory to the builder are not shown; confirm against the real project.
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass({ HystrixCommand.class, HystrixFeign.class })
    public class FeignConfig {
        // Conditional on the feign.hystrix.enabled property.
        @ConditionalOnProperty(name = "feign.hystrix.enabled")
        public Feign.Builder feignHystrixBuilder() {
            HystrixFeign.Builder builder = HystrixFeign.builder();
            // Factory that builds the HystrixCommand.Setter for each target/method pair.
            SetterFactory setterFactory= new SetterFactory(){
                public HystrixCommand.Setter create(Target<?> target, Method method) {
                    // Group key is the Feign target's name.
                    String groupKey = target.name();
                    // Command key is derived from the target type and the method.
                    String commandKey = Feign.configKey(target.type(), method);
                    return HystrixCommand.Setter
            return builder;



    思考: 上面的配置是对所有的服务都起效的,如果我们需要对不同的服务,如库存服务,客户服务,等有不同的配置,如何处理:





    package com.yang.xiao.hui.order.controller;
    import com.netflix.hystrix.HystrixCommand;
    import com.netflix.hystrix.HystrixCommandGroupKey;
    import com.netflix.hystrix.HystrixCommandKey;
    import com.netflix.hystrix.HystrixThreadPoolProperties;
    import feign.Feign;
    import feign.Target;
    import feign.hystrix.HystrixFeign;
    import feign.hystrix.SetterFactory;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Scope;
    import java.lang.reflect.Method;
    // Variant of the Feign/Hystrix configuration that chooses settings per service.
    // It is only considered when HystrixCommand and HystrixFeign are on the classpath.
    // NOTE(review): this excerpt is truncated - the body of the "product" branch, the
    // Setter chain after "HystrixCommand.Setter", the closing braces, and any code that
    // hands setterFactory to the builder are not shown; confirm against the real project.
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass({ HystrixCommand.class, HystrixFeign.class })
    public class FeignConfig {
        // Conditional on the feign.hystrix.enabled property.
        @ConditionalOnProperty(name = "feign.hystrix.enabled")
        public Feign.Builder feignHystrixBuilder() {
            HystrixFeign.Builder builder = HystrixFeign.builder();
            // Factory that builds the HystrixCommand.Setter for each target/method pair.
            SetterFactory setterFactory= new SetterFactory(){
                public HystrixCommand.Setter create(Target<?> target, Method method) {
                    // Group key is the Feign target's name.
                    String groupKey = target.name();
                    // Command key is derived from the target type and the method.
                    String commandKey = Feign.configKey(target.type(), method);
                    // Thread-pool settings: core size 100, max queue size 200.
                    HystrixThreadPoolProperties.Setter setter = HystrixThreadPoolProperties.Setter().withCoreSize(100).withMaxQueueSize(200);
                    if("product".equals(groupKey)){ // for the product service, use the configuration below
                    return HystrixCommand.Setter
            return builder;



    2.1 添加依赖:


    2.2 修改controller

     2.3.主启动类添加@EnableHystrix或@EnableCircuitBreaker  @EnableHystrix注解里面包含了@EnableCircuitBreaker所以他们功能一致

     2.4 源码分析:


















    总结: Hystrix 可以实现熔断、服务降级、限流三大功能:


         2. 服务降级的理解: 就是服务熔断后,执行fallback方法,这就是服务降级了

        3. 限流的理解:通过线程池的核心线程数和最大队列数(或信号量)限制并发请求数,超出限制的请求会被拒绝,并直接执行fallback方法







