zoukankan      html  css  js  c++  java
  • spring-cloud-netflix-hystrix 原理源码分析

    spring-cloud-netflix-hystrix 原理源码分析:

      本文主要针对 spring-cloud-dependencies   Hoxton.SR4版本, spring-cloud-starter-netflix-hystrix 源码的解析。

      对于未接触过 hystrix 的小伙伴可以参考 https://www.cnblogs.com/wuzhenzhao/p/9473073.html 进行一些基础知识的了解。

      本文主要从以下几个点来分析:

    1. 手写实现简易版 Hystrix 体验。
    2. RXJava 基础知识体验。
    3. Hystrix 源码流程分析。

    手写实现简易版 Hystrix 体验:

      继上文的博客链接,我们知道了Hystrix 提供了多种方式让我们实现服务降级。我们可以通过注解 @HystrixCommand、或者继承 HystrixCommand 来实现降级,以及一些请求合并等操作。

      我们需要知道的是,当我们采用 @HystrixCommand 注解来实现服务降级,在Hystrix 的内部是采用 AOP 的方式进行拦截处理请求的,我们这里就先来实现一下简易版的 Hystrix 来体会一下,主要分为以下步骤

    1. 定义自己的@HystrixCommand 注解。
    2. 实现拦截请求的处理逻辑。
    3. 测试调用。

    1.自定义注解 @WuzzHystrixCommand

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface WuzzHystrixCommand {
    
        /**
         * 默认超时时间
         *
         * @return
         */
        int timeout() default 1000;
    
        /**
         * 回退方法
         *
         * @return
         */
        String fallback() default "";
    
    }

    2.编写切面类,实现简易的逻辑处理

    @Component
    @Aspect
    public class WuzzHystrixCommandAspect {
        //线程池的处理,基于这个线程池的处理统计可以达到 THREAD 资源限流
        ExecutorService executorService = Executors.newFixedThreadPool(10);
    
        //注解切点
        @Pointcut(value = "@annotation(com.wuzz.demo.custom.hystrix.WuzzHystrixCommand)")
        public void pointCut() {
        }
    
        //环绕通知
        @Around(value = "pointCut()&&@annotation(hystrixCommand)")
        public Object doPointCut(ProceedingJoinPoint joinPoint, WuzzHystrixCommand hystrixCommand) throws InterruptedException, ExecutionException, TimeoutException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
            int timeout = hystrixCommand.timeout();
            //前置的判断逻辑
            Future future = executorService.submit(() -> {
                try {
                    return joinPoint.proceed(); //执行目标方法
                } catch (Throwable throwable) {
                    throwable.printStackTrace();
                }
                return null;
            });
            Object result;
            try {// 使用 future 来实现超时
                result = future.get(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                e.printStackTrace();
                future.cancel(true);
                //
                if (StringUtils.isBlank(hystrixCommand.fallback())) {
                    throw e;
                }
                //调用fallback
                result = invokeFallback(joinPoint, hystrixCommand.fallback());
            }
            return result;
        }
    
        private Object invokeFallback(ProceedingJoinPoint joinPoint, String fallback) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
            //获取被代理的方法的参数和Method
            MethodSignature signature = (MethodSignature) joinPoint.getSignature();
            Method method = signature.getMethod();
            Class<?>[] parameterTypes = method.getParameterTypes();
            //得到fallback方法
            try {
                Method fallbackMethod = joinPoint.getTarget().getClass().getMethod(fallback, parameterTypes);
                fallbackMethod.setAccessible(true);
                //完成反射调用
                return fallbackMethod.invoke(joinPoint.getTarget(), joinPoint.getArgs());
            } catch (Exception e) {
                e.printStackTrace();
                throw e;
            }
        }
    }

    3. 编写测试,调用:

    @WuzzHystrixCommand(fallback = "customFallback", timeout = 3000)
        @GetMapping("/custom/hystrix/test")
        public String test() {
            Map map = new HashMap<>();
            map.put("id", 666);
            return restTemplate.getForObject(REST_URL_PREFIX + "/hello?id={id}", String.class, map);
    }
    
    public String customFallback() {
            return "custom 请求被降级";
    }

      正常得调用是没有问题的,这个时候我们把服务提供方的服务接口里  sleep 3秒来模仿调用超时,在访问接口:

      相信小伙伴们有了一些心得了,只不过Hystrix里面得实现是很复杂的 ,没有我们这么简单。

    RXJava 基础知识体验:

      上文的博文连接中也讲到了 RXJava的简单例子,这里由于马上我们要去看 Hystrix的源码了,我们这里写一个类似于源码中的例子,这样来帮助我们更容易理解。

    public class RxJavaDemo {
    
        // ReactiveX Java  响应式编程框架(android)
        // Java stream() java8
        //观察者模式
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            final String[] datas = new String[]{"登录"};
    
            final Action0 onComplated = new Action0() {
                @Override
                public void call() {
                    System.out.println("on Complated");
                }
            };
            //老师(被观察者)
            Observable<String> observable = Observable.defer(new Func0<Observable<String>>() {
                @Override
                public Observable<String> call() {
                    Observable observable1 = Observable.from(datas);
                    return observable1.doOnCompleted(onComplated);
                }
            });
            //学生(观察者)
            Observer observer = new Observer() {
                @Override
                public void onCompleted() {
                    System.out.println("Observer: onCompleted");
                }
    
                @Override
                public void onError(Throwable throwable) {
                    System.out.println("Observer: onError");
                }
    
                @Override
                public void onNext(Object o) {
                    System.out.println("on Next:" + o);
                }
            };
    //        observable.subscribe(observer); //建立订阅关系
    
            String s = observable.toBlocking().toFuture().get();//建立订阅关系
            System.out.println(s);
        }
    }

      写这个例子的目的主要是想说明,我们可能对于 RXJava 的 API可能不是很熟悉,但是我们一定要知道对于 Observable 实例来说, call 方法才是关键,而 observable.toBlocking().toFuture().get() 是用于获取执行结果的。在 Hystrix的源码中能看到。了解一下,我们直接进入Hystrix的源码

    Hystrix 源码流程分析:

      需要注意的是,Hystrix用到了RxJava这个框架,它是一个响应式编程框架,在Android里面用得比较多,所以很多同学对它不是很了解。如果不了解的话,看Hystrix的源码就会有点困难。

      Hystrix的数据统计是采用的滑动窗口,关于滑动窗口我这里就不深入研究了,又兴趣的同学可以参考我另外一篇博客, Sentinel 限流原理 进行了解,也可以直接访问 滑动窗口在线演示地址

      Hystrix熔断的@HystrixCommand注解,是通过HystrixCommandAspect这个切面来处理的。其中我们关注@Around注解声明的方法,它针对于请求合并,以及降级的注解进行代理。这里我们重点针对HystrixCommand这个注解进行详细分析。

    @Aspect
    public class HystrixCommandAspect {
    
        private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;
    
        static {
            META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
                    .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
                    .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
                    .build();
        }
       // 熔断降级切点
        @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
        public void hystrixCommandAnnotationPointcut() {
        }
       // 请求合并切点
        @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
        public void hystrixCollapserAnnotationPointcut() {
        }
    
       // 环绕通知
        @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
        public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
         // 获取目标方法信息
            Method method = getMethodFromTarget(joinPoint);
            Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
            if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
                throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                        "annotations at the same time");
            }
            MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
         // 获取元数据,比如调用方法,HystrixProperty注解数据、方法参数等
            MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
         //获取调用者,它持有一个命令对象,并且可以在合适的时候通过这个命令对象完成具体的业务逻辑
            HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
            ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                    metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
    
            Object result;
            try {
            //是否是响应式的(由于我们这些都是同步的会走这个逻辑)
                if (!metaHolder.isObservable()) {
                    result = CommandExecutor.execute(invokable, executionType, metaHolder);
                } else {
                    result = executeObservable(invokable, executionType, metaHolder);
                }
            } catch (HystrixBadRequestException e) {
                throw e.getCause();
            } catch (HystrixRuntimeException e) {
                throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
            }
            return result;
        }
        // ......
    }

      然后进入 CommandExecutor#execute 方法这个方法主要用来执行命令,从代码中可以看出这里有三个执行类型,分别是同步、异步、以及响应式。其中,响应式又分为Cold Observable(observable.toObservable()) 和 HotObservable(observable.observe())默认的executionType=SYNCHRONOUS ,同步请求。

     

    1. execute():同步执行,返回一个单一的对象结果,发生错误时抛出异常。
    2. queue():异步执行,返回一个 Future 对象,包含着执行结束后返回的单一结果。
    3. observe():这个方法返回一个 Observable 对象,它代表操作的多个结果,但是已经被订阅者消费掉了。
    4. toObservable():这个方法返回一个 Observable 对象,它代表操作的多个结果,需要咱们自己手动订阅并消费掉。

      接着调用HystrixCommand.execute()方法,这个方法中,首先调用queue(),这个方法会返回一个future对象。

    public R execute() {
            try {
                return queue().get();
            } catch (Exception e) {
                throw Exceptions.sneakyThrow(decomposeException(e));
            }
    }

      queue这个方法中,返回了一个Future对象,这个future对象的实现是f,f是以匿名内部类,它是Java.util.concurrent中定一个的一个异步带返回值对象。当调用queue().get()方法时,最终是委派给了delegate.get 方法。

    public Future<R> queue() {
            /*
             * The Future returned by Observable.toBlocking().toFuture() does not implement the
             * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
             * thus, to comply with the contract of Future, we must wrap around it.
             */
            final Future<R> delegate = toObservable().toBlocking().toFuture();
            
            final Future<R> f = new Future<R>() {
    
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    
            // ......省略代码
                @Override
                public R get() throws InterruptedException, ExecutionException {
                    return delegate.get();
                }
    
                @Override
                public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                    return delegate.get(timeout, unit);
                }
                
            };
    
            // ...省略代码
            return f;
    }

      在上述代码中,重点来了,构建了一个 java.util.concurrent.Future ,然后调用 get的时候委派给 delegate,而 delegate来自于 toObservable().toBlocking().toFuture(); 这正是我们上面例子里面得代码。所以我们现在的重点应该放在  toObservable() 方法中:

      AbstractCommand.toObservable :通过Observable定义一个被观察者,这个被观察者会被toObservable().toBlocking().toFuture() ,实际上就是返回可获得 run() 抽象方法执行结果的Future 。 run() 方法由子类实现,执行正常的业务逻辑。在下面这段代码中,当存在subscriber时,便会调用Func0#call() 方法,而这个subscriber是在 toBlocking() 中被订阅的。到这还是我们上面的例子里面的代码。该方法主要做了以下几件事:

    • 创建一些命令供后续的回调使用

    • 调用 isRequestCachingEnabled(); 判断请求结果缓存功能是否开启,如果开启并且命中了缓存,则会以Observable形式返回一个缓存结果

    • 创建执行命令的Observable: hystrixObservable,

    • 当缓存处于开启状态并且没有命中缓存时,则创建一个“订阅了执行命令的Observable”:HystrixCommandResponseFromCache

    • 创建存储到缓存的Observable: HystrixCachedObservable当缓存特性没有开启时,则返回执行命令的Observable。

      • 将toCache添加到缓存中,返回获取缓存的Observable:fromCache

      • 如果添加失败: fromCache!=null, 则调用 toCache.unsubscribe() 方法,取消HystrixCachedObservable 的订阅

      • 如果添加成功,则调用 toCache.toObservable(); 获得缓存Observable

    • 当缓存特性没有开启时,则返回执行命令的Observable。

    public Observable<R> toObservable() {
            final AbstractCommand<R> _cmd = this;
            //会在Observable结束前触发回调该call方法,无论是正常还是异常终止
            //doOnCompleted handler already did all of the SUCCESS work
            //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
            final Action0 terminateCommandCleanup = new Action0() {
    
                @Override
                public void call() {
                    if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                        handleCommandEnd(false); //user code never ran
                    } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                        handleCommandEnd(true); //user code did run
                    }
                }
            };
    
            //取消订阅时的监听会进行回调该 call方法
            //mark the command as CANCELLED and store the latency (in addition to standard cleanup)
            final Action0 unsubscribeCommandCleanup = new Action0() {
                @Override
                public void call() {
                    if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
                        if (!_cmd.executionResult.containsTerminalEvent()) {
                            _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                            try {
                                executionHook.onUnsubscribe(_cmd);
                            } catch (Throwable hookEx) {
                                logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
                            }
                            _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                                    .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                        }
                        handleCommandEnd(false); //user code never ran
                    } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
                        if (!_cmd.executionResult.containsTerminalEvent()) {
                            _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                            try {
                                executionHook.onUnsubscribe(_cmd);
                            } catch (Throwable hookEx) {
                                logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
                            }
                            _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                                    .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                        }
                        handleCommandEnd(true); //user code did run
                    }
                }
            };
    
            final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                        return Observable.never(); // 立即终止整个流程。
                    }//返回执行命令的Observable
                    return applyHystrixSemantics(_cmd);
                }
            };
    
            final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
                @Override
                public R call(R r) {
                    R afterFirstApplication = r;
    
                    try {
                        afterFirstApplication = executionHook.onComplete(_cmd, r);
                    } catch (Throwable hookEx) {
                        logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
                    }
    
                    try {
                        return executionHook.onEmit(_cmd, afterFirstApplication);
                    } catch (Throwable hookEx) {
                        logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
                        return afterFirstApplication;
                    }
                }
            };
    
            final Action0 fireOnCompletedHook = new Action0() {
                @Override
                public void call() {
                    try {
                        executionHook.onSuccess(_cmd);
                    } catch (Throwable hookEx) {
                        logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);
                    }
                }
            };
    
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    /* this is a stateful object so can only be used once */
                    // CAS保证命令只执行一次
                    if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                        IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                        //TODO make a new error type for this
                        throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
                    }
                    // 命令开始时间戳
                    commandStartTimestamp = System.currentTimeMillis();
                    // 打印日志
                    if (properties.requestLogEnabled().get()) {
                        // log this command execution regardless of what happened
                        if (currentRequestLog != null) {
                            currentRequestLog.addExecutedCommand(_cmd);
                        }
                    }
                    // 缓存开关,缓存KEY(这个是Hystrix中请求缓存功能,hystrix支持将一个请求结果缓存起来,
                    // 下一个具有相同key的请求将直接从缓存中取出结果,减少请求开销)
                    final boolean requestCacheEnabled = isRequestCachingEnabled();
                    final String cacheKey = getCacheKey();
    
                    /* try from cache first */
                    if (requestCacheEnabled) {//如果开启了缓存机制,则从缓存中获取结果
                        HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                        if (fromCache != null) {
                            isResponseFromCache = true;
                            return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                        }
                    }
                    // 声明执行命令的Observable
                    Observable<R> hystrixObservable =
                            Observable.defer(applyHystrixSemantics)
                                    .map(wrapWithAllOnNextHooks);
    
                    Observable<R> afterCache;
    
                    // put in cache  保存请求结果到缓存中
                    if (requestCacheEnabled && cacheKey != null) {
                        // wrap it for caching
                        HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                        HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                        if (fromCache != null) {
                            // another thread beat us so we'll use the cached value instead
                            toCache.unsubscribe();
                            isResponseFromCache = true;
                            return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                        } else {
                            // we just created an ObservableCommand so we cast and return it
                            afterCache = toCache.toObservable();
                        }
                    } else {
                        afterCache = hystrixObservable;
                    }
    
                    return afterCache
                            //会在Observable结束前触发回调,无论是正常还是异常终止
                            .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                            //取消订阅时的监听
                            .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                            //Observable正常终止时的监听
                            .doOnCompleted(fireOnCompletedHook);
                }
            });
        }

      所以在 AbstractCommand#toObservable 方法里,我们只需要看这个返回的 Observable 对象的 call 方法即可,而在这里 默认没有开启缓存的话就是 :

    Observable<R> hystrixObservable =
                            Observable.defer(applyHystrixSemantics)
                                    .map(wrapWithAllOnNextHooks);

      那么我们主要来看 applyHystrixSemantics ,在该方法上面定义了

    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                        return Observable.never();
                    }
                    return applyHystrixSemantics(_cmd);
                }
    };

      假设缓存特性未开启或者未命中缓存,那么代码将执行 applyHystrixSemantics 。传入的_cmd是一个GenericCommand(可以断点看看),最终执行这个command中的run方法,本质就是完成对queryOrder方法的代理。

      circuitBreaker.allowRequest() 如果为true,表示当前不处于熔断状态,正常执行,否则,调用 handleShortCircuitViaFallback 实现服务降级,如果我们配置了fallback方法,则会获得我们配置的fallback执行。执行路径为 : handleShortCircuitViaFallback ->getFallbackOrThrowException ->getFallbackObservable->HystrixCommand.getFallbackObservable->GenericCommand.getFallback();

      如果当前hystrix处于未熔断状态,则

    • getExecutionSemaphore 判断当前策略是否为信号量(TryableSemaphoreNoOp/TryableSemaphoreActual),如果是,则调用 tryAcquire 来获取信号量。如果当前信号量满了,则调用 handleSemaphoreRejectionViaFallback 方法。
    • 调用 executeCommandAndObserve 获取命令执行Observable。
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
            // mark that we're starting execution on the ExecutionHook
            // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
            executionHook.onStart(_cmd);
    
            /* determine if we're allowed to execute */
         // 判断是否处于熔断状态
    
            if (circuitBreaker.allowRequest()) {
                final TryableSemaphore executionSemaphore = getExecutionSemaphore();
                final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
                final Action0 singleSemaphoreRelease = new Action0() {//操作命令
                    @Override
                    public void call() {
                        if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                            executionSemaphore.release();
                        }
                    }
                };
    
                final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                    @Override
                    public void call(Throwable t) {
                        eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                    }
                };
            // 是否开启信号量资源隔离,未配置走 com.netflix.hystrix.AbstractCommand.TryableSemaphoreNoOp#tryAcquire 默认返回通过
                if (executionSemaphore.tryAcquire()) {
                    try {
                        /* used to track userThreadExecutionTime */
                        executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                        return executeCommandAndObserve(_cmd) // 执行命令,以下三个是回调,可以不看
                                .doOnError(markExceptionThrown)
                                .doOnTerminate(singleSemaphoreRelease)
                                .doOnUnsubscribe(singleSemaphoreRelease);
                    } catch (RuntimeException e) {
                        return Observable.error(e);
                    }
                } else {
               // 走服务降级方法
                    return handleSemaphoreRejectionViaFallback();
                }
            } else {
                return handleShortCircuitViaFallback();
            }
    }

      我们县来看一下执行失败进入降级的逻辑,这里我们直接进入到 HystrixCommand#getFallbackObservable

    @Override
    final protected Observable<R> getFallbackObservable() {
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    try {
                        return Observable.just(getFallback());
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                }
            });
    }

      这里的  getFallback 我们应该熟悉了,因为通过集成 HystrixCommand 类来实现熔断降级的时候我们重写了这个方法,而通过注解的话是通过 GenericCommand进行代理实现得,我们Debug一下,看看该类的 getFallback 方法做了什么:

      可以发现他拿到了我们配置在注解上的方法,这一点是不是跟上文的手写是一个道理呢? 然后进行调用获取结果返回。

      好了,回到 AbstractCommand#applyHystrixSemantics ,接下去我们按照正常逻辑走到  AbstractCommand#executeCommandAndObserve,主要做了以下三件事情

    1. 定义不同的回调,doOnNext、doOnCompleted、onErrorResumeNext、doOnEach。
    2. 调用executeCommandWithSpecifiedIsolation获得执行命令的Observable
    3. 若执行命令超时特性开启,调用 Observable.lift 方法实现执行命令超时功能。
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
            final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
         // Action和Func都是定义的一个动作,Action是无返回值,Func是有返回值
    
         // doOnNext中的回调。即命令执行之前执行的操作
            final Action1<R> markEmits = new Action1<R>() {
                @Override
                public void call(R r) {
                    if (shouldOutputOnNextEvents()) {
                        executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                        eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
                    }
                    if (commandIsScalar()) {
                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                        eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                        executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                        circuitBreaker.markSuccess();
                    }
                }
            };
         // doOnCompleted中的回调。命令执行完毕后执行的操作
            final Action0 markOnCompleted = new Action0() {
                @Override
                public void call() {
                    if (!commandIsScalar()) {
                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                        eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                        executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                        circuitBreaker.markSuccess();
                    }
                }
            };
         // onErrorResumeNext中的回调。命令执行失败后的回退逻辑
            final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
                @Override
                public Observable<R> call(Throwable t) {
                    Exception e = getExceptionFromThrowable(t);
                    executionResult = executionResult.setExecutionException(e);
                    if (e instanceof RejectedExecutionException) {
                        return handleThreadPoolRejectionViaFallback(e);
                    } else if (t instanceof HystrixTimeoutException) {
                        return handleTimeoutViaFallback();
                    } else if (t instanceof HystrixBadRequestException) {
                        return handleBadRequestByEmittingError(e);
                    } else {
                        /*
                         * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                         */
                        if (e instanceof HystrixBadRequestException) {
                            eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                            return Observable.error(e);
                        }
    
                        return handleFailureViaFallback(e);
                    }
                }
            };
         // doOnEach中的回调。`Observable`每发射一个数据都会执行这个回调,设置请求上下文
            final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
                @Override
                public void call(Notification<? super R> rNotification) {
                    setRequestContextIfNeeded(currentRequestContext);
                }
            };
    
            Observable<R> execution;
         // 是否开启超时降级
            if (properties.executionTimeoutEnabled().get()) {
                execution = executeCommandWithSpecifiedIsolation(_cmd)
                        .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
            } else {
                execution = executeCommandWithSpecifiedIsolation(_cmd);
            }
         // 发射
            return execution.doOnNext(markEmits)
                    .doOnCompleted(markOnCompleted)
                    .onErrorResumeNext(handleFallback)
                    .doOnEach(setRequestContext);
    }

      executeCommandWithSpecifiedIsolation:这个方法首先是根据当前不同的资源隔离策略执行不同的逻辑,THREAD、SEMAPHORE:

    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
         // 是否开启 THREAD 资源隔离降级
            if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
                // ......省略代码
            } else { // 否则进入这里
                return Observable.defer(new Func0<Observable<R>>() {
                    @Override
                    public Observable<R> call() {
                        executionResult = executionResult.setExecutionOccurred();
                        if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                            return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                        }
    
                        metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                        // semaphore isolated
                        // store the command that is being run
                        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                        try {
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                    // 真正的执行
                            return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                        } catch (Throwable ex) {
                            //If the above hooks throw, then use that as the result of the run method
                            return Observable.error(ex);
                        }
                    }
                });
            }
    }

      这里就不展开实现细节,我们直接看执行的方法 getUserExecutionObservable 。然后会执行 HystrixCommand#getExecutionObservable

    @Override
        final protected Observable<R> getExecutionObservable() {
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    try {
                        return Observable.just(run());
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                }
            }).doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    // Save thread on which we get subscribed so that we can interrupt it later if needed
                    executionThread.set(Thread.currentThread());
                }
            });
        }

      又看到熟悉的代码   ,这个 run() 方法在通过集成 HystrixCommand 类来实现熔断降级的时候我们重写了这个方法,是真正的执行方法。

      这里最终调用的是run方法,通过Observable.just, just是RxJava中的一个操作符,它可以接受一个或者多个参数来创建一个Observable对象。而这个run()方法是一个抽象方法,在HystrixCommand中并没有实现,而是在子类中实现,而此时传递的cmd=GenricCommand正好实现了HystrixCommand,重写了run方法。

    @Override
        protected Object run() throws Exception {
            LOGGER.debug("execute command: {}", getCommandKey().name());
            return process(new Action() {
                @Override
                Object execute() {
                    return getCommandAction().execute(getExecutionType());
                }
            });
    }

      大家有没有发现,这里的实现和我们前面自定义的 HystrixCommandService 实现是一样的,同样是集成HystrixCommand,重写run方法。这里也是如此。

    • 首先调用 getCommandAction() 方法获取 CommandAction ,我们的示例中获取到的是MethodExecutionAction 。
    • 然后调用 MethodExecutionAction.execute 方法,传入 ExecutionType 参数,我们的示例中传入的是 ExecutionType.SYNCHRONOUS 。

      拿到我们的真实方法进行调用返回。下面附上整个过程的流程图:

  • 相关阅读:
    mysql分组统计后将结果顺序排列(union实现)
    mysql格式化日期
    yaf框架安装
    如何通过PHP将excel的数据导入MySQL中
    yii日志保存机制
    安装PyInstaller打包python
    python正则表达式详解
    Python中类的定义与使用
    例子 使用sqlite3 数据库建立数据方式
    python操作轻量级数据库
  • 原文地址:https://www.cnblogs.com/wuzhenzhao/p/13726372.html
Copyright © 2011-2022 走看看