zoukankan      html  css  js  c++  java
  • Hystrix工作流程解析

    搭建Hystrix源码阅读环境

    引入依赖

            <dependency>
                <groupId>com.netflix.hystrix</groupId>
                <artifactId>hystrix-core</artifactId>
                <version>1.5.12</version>
            </dependency>
    

    创建Command

    public class HelloCommand extends HystrixCommand<String> {
    
        public HelloCommand() {
            super(HystrixCommandGroupKey.Factory.asKey("test"));
        }
    
        @Override
        protected String run() throws Exception {
            Thread.sleep(800);
            return "sucess";
        }
    
        @Override
        protected String getFallback() {
            System.out.println("执行了回退方法");
            return "error";
        }
    
    }
    

    创建测试类

    public class CommandTest {
        public static void main(String[] args) {
            HelloCommand command = new HelloCommand();
            String result = command.execute();
            System.out.println(result);
        }
    }
    
    
    Hystrix工作流程

    file

    首先我们看一下上方的这张图,这个图完整的描述了Hystrix的工作流程:
    1.每次调用都会创建一个HystrixCommand
    2.执行execute或queue做同步异步调用
    3.判断熔断器是否打开,如果打开跳到步骤8,否则进入步骤4
    4.判断线程池/信号量是否跑满,如果跑满进入步骤8,否则进入步骤5
    5.调用HystrixCommand的run方法,如果调用超时进入步骤8
    6.判断是否调用成功,返回成功调用结果,如果失败进入步骤8
    7.计算熔断器状态,所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态
    8.降级处理逻辑,根据上方的步骤可以得出以下四种情况会进入降级处理:

    1. 熔断器打开
    2. 线程池/信号量跑满
    3. 调用超时
    4. 调用失败

    9.返回执行成功结果

    创建HystrixCommand

    接着我们结合源码看一下这个调用流程,直接执行测试类的main方法,可以看到入口就在execute方法上

        public R execute() {
            try {
                return queue().get();
            } catch (Exception e) {
                throw Exceptions.sneakyThrow(decomposeException(e));
            }
        }
    
    执行同步方法
    public Future<R> queue() {
            final Future<R> delegate = toObservable().toBlocking().toFuture();
            //省略。。。
    };
    

    接着看toObservable()方法

     public Observable<R> toObservable() {
            //省略。。。
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                        IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                        //TODO make a new error type for this
                        throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
                    }
    
                    commandStartTimestamp = System.currentTimeMillis();
    
                    if (properties.requestLogEnabled().get()) {
                        // log this command execution regardless of what happened
                        if (currentRequestLog != null) {
                            currentRequestLog.addExecutedCommand(_cmd);
                        }
                    }
    
                    final boolean requestCacheEnabled = isRequestCachingEnabled();
                    final String cacheKey = getCacheKey();
                    //如果开启请求缓存则查询缓存是否存在
                    if (requestCacheEnabled) {
                        HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                        if (fromCache != null) {
                            isResponseFromCache = true;
                            return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                        }
                    }
    
                    Observable<R> hystrixObservable =
                            Observable.defer(applyHystrixSemantics)
                                    .map(wrapWithAllOnNextHooks);
    
                    Observable<R> afterCache;
    
                    if (requestCacheEnabled && cacheKey != null) { 
                        HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                        HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                        if (fromCache != null) {
                            // another thread beat us so we'll use the cached value instead
                            toCache.unsubscribe();
                            isResponseFromCache = true;
                            return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                        } else {
                            // we just created an ObservableCommand so we cast and return it
                            afterCache = toCache.toObservable();
                        }
                    } else {
                        afterCache = hystrixObservable;
                    }
    
                    return afterCache
                            .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                            .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                            .doOnCompleted(fireOnCompletedHook);
                }
            });
        }
    

    在上面这个方法中会有一个缓存的判断,如果存在缓存的话直接返回结果,否则进入方法applyHystrixSemantics方法

    判断熔断器和线程池以及信号量
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
            executionHook.onStart(_cmd);
    
            /* determine if we're allowed to execute */
            //判断是否开启熔断器
            if (circuitBreaker.attemptExecution()) {
                //获取信号量实例
                final TryableSemaphore executionSemaphore = getExecutionSemaphore();
                final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
                final Action0 singleSemaphoreRelease = new Action0() {
                    @Override
                    public void call() {
                        if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                            executionSemaphore.release();
                        }
                    }
                };
    
                final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                    @Override
                    public void call(Throwable t) {
                        eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                    }
                };
                //尝试获取信号量
                if (executionSemaphore.tryAcquire()) {
                    try {
                        /* used to track userThreadExecutionTime */
                        executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                        return executeCommandAndObserve(_cmd)
                                .doOnError(markExceptionThrown)
                                .doOnTerminate(singleSemaphoreRelease)
                                .doOnUnsubscribe(singleSemaphoreRelease);
                    } catch (RuntimeException e) {
                        return Observable.error(e);
                    }
                } else {
                    //拒绝
                    return handleSemaphoreRejectionViaFallback();
                }
            } else {
                //失败
                return handleShortCircuitViaFallback();
            }
        }
    

    applyHystrixSemantics方法中,首先会判断是否开启熔断器,如果开启则直接进入失败处理的逻辑。否则会尝试获取信号量(如果使用的是线程池的模式则默认获取成功),获取成功进入executeCommandAndObserve方法

    判断超时
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
            final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
            //省略...
    
            //判断是否开启超时设置
            if (properties.executionTimeoutEnabled().get()) {
               //list添加超时操作
                execution = executeCommandWithSpecifiedIsolation(_cmd)
                        .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
            } else {
                execution = executeCommandWithSpecifiedIsolation(_cmd);
            }
    

    这里如果设置超时时间的话则会加上一个超时的处理,接着看executeCommandWithSpecifiedIsolation方法

    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
            if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
                return Observable.defer(new Func0<Observable<R>>() {
                    @Override
                    public Observable<R> call() {
                        executionResult = executionResult.setExecutionOccurred();
                        if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                            return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                        }
                        metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
    
                        if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                            return Observable.error(new RuntimeException("timed out before executing run()"));
                        }
                        if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                            HystrixCounters.incrementGlobalConcurrentThreads();
                            threadPool.markThreadExecution();
                            // store the command that is being run
                            endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                            executionResult = executionResult.setExecutedInThread();
    
                            try {
                                executionHook.onThreadStart(_cmd);
                                executionHook.onRunStart(_cmd);
                                executionHook.onExecutionStart(_cmd);
                                return getUserExecutionObservable(_cmd);
                            } catch (Throwable ex) {
                                return Observable.error(ex);
                            }
                        } else {
                            return Observable.empty();
                        }
                    }
                }).doOnTerminate(new Action0() {
                    @Override
                    public void call() {
                        if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                            handleThreadEnd(_cmd);
                        }
                        if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                        }
                    }
                }).doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                            handleThreadEnd(_cmd);
                        }
                        if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                        }
                    }
                }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                    @Override
                    public Boolean call() {
                        return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                    }
                }));
            } else {
                return Observable.defer(new Func0<Observable<R>>() {
                    @Override
                    public Observable<R> call() {
                        executionResult = executionResult.setExecutionOccurred();
                        if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                            return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                        }
    
                        metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                        // semaphore isolated
                        // store the command that is being run
                        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                        try {
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                        } catch (Throwable ex) {
                            //If the above hooks throw, then use that as the result of the run method
                            return Observable.error(ex);
                        }
                    }
                });
            }
        }
    

    这段代码比较长,具体的执行逻辑为:

    1. 进入方法会首先判断隔离策略,如果是使用的信号量模式则在当前线程上执行,否则进入下方的线程池逻辑
    2. 更改HystrixCommand的状态为USER_CODE_EXECUTED
    3. 判断HystrixCommand的超时状态,如果超时则抛出异常
    4. 更改当前command的线程执行状态为STARTED
    5. 调用getUserExecutionObservable执行具体的业务逻辑,也就是我们实现的那个run方法
    6. doOnTerminate:执行完毕后更改线程状态为TERMINAL
    7. doOnUnsubscribe:当Observable被取消订阅,更改线程状态为TERMINAL
    8. subscribeOn:指定scheduler

    原文地址

  • 相关阅读:
    node nmp 的关键信息
    PHP中定义常量的区别,define() 与 const
    mac电脑如何快速显示桌面及切换应用
    Mac拷贝/复制文件夹路径快捷键
    比 file_get_contents() 更优的 cURL 详解(附实例)
    PHP fopen/file_get_contents与curl性能比较
    在phpstorm中如何对比文件呢?
    PHP 基础篇
    MySQL 中视图和表的区别以及联系是什么?
    MAC将根目录文件夹的权限赋给用户
  • 原文地址:https://www.cnblogs.com/zhixiang-org-cn/p/11790390.html
Copyright © 2011-2022 走看看