  Hystrix: Caching, Circuit Breaking, Isolation, and Fallback

    1. Introduction

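      In the previous article we followed execution into HystrixCommand's queue method. The key call inside it is toObservable, which implements Hystrix's features on top of RxJava's publish/subscribe mechanism.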

    2. Caching

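      Caching is the first main step once HystrixCommand enters toObservable. The cache is read first, and on a hit the cached result is returned immediately. On a miss, the command's Observable is wrapped and stored in the cache, so later requests with the same cache key in the same request context reuse its result instead of executing again.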
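    The read-then-putIfAbsent flow can be illustrated without RxJava. The sketch below is a hypothetical, simplified model. `RequestCacheSketch` is not a Hystrix class, and a `CompletableFuture` stands in for the cached Observable. Only the first caller for a key runs the work. Every later caller reuses the winner's result, including a caller that loses the putIfAbsent race.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical model of a request-scoped cache; not Hystrix's HystrixRequestCache.
final class RequestCacheSketch<R> {
    private final ConcurrentHashMap<String, CompletableFuture<R>> cache = new ConcurrentHashMap<>();

    CompletableFuture<R> execute(String cacheKey, Supplier<R> run) {
        if (cacheKey == null) {
            // no cache key: caching is effectively disabled, always run
            return runInto(new CompletableFuture<>(), run);
        }
        // 1) read the cache first
        CompletableFuture<R> cached = cache.get(cacheKey);
        if (cached != null) {
            return cached; // cache hit
        }
        // 2) miss: try to publish our own entry; putIfAbsent never overwrites
        CompletableFuture<R> mine = new CompletableFuture<>();
        CompletableFuture<R> existing = cache.putIfAbsent(cacheKey, mine);
        if (existing != null) {
            return existing; // another caller won the race: reuse its result
        }
        // 3) we won the race: run the work and complete the shared entry
        return runInto(mine, run);
    }

    private static <T> CompletableFuture<T> runInto(CompletableFuture<T> target, Supplier<T> run) {
        try {
            target.complete(run.get());
        } catch (RuntimeException e) {
            target.completeExceptionally(e);
        }
        return target;
    }
}
```

    In Hystrix itself the cache is scoped to a HystrixRequestContext, and the cached object is a lazy HystrixCachedObservable. The loser of the race also unsubscribes its unused copy (toCache.unsubscribe()).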

        protected final HystrixRequestCache requestCache;
        protected AbstractCommand(···) {
            ···
            // get the request-cache instance for this command key
            this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
        }
            // in toObservable(): defer() means the body only runs when the Observable is subscribed
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    // set the command state: an instance may only be executed once
                    if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                        IllegalStateException ex = new IllegalStateException("This instance can only be executed once. "
                                + "Please instantiate a new instance.");
                        throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(),
                                getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
                    }
                    // command start timestamp
                    commandStartTimestamp = System.currentTimeMillis();
                    // is request logging enabled?
                    if (properties.requestLogEnabled().get()) {
                        // record the executed command
                        if (currentRequestLog != null) {
                            currentRequestLog.addExecutedCommand(_cmd);
                        }
                    }
                    // is request caching enabled? Decided by configuration and getCacheKey(); usually off
                    final boolean requestCacheEnabled = isRequestCachingEnabled();
                    // get the cacheKey: unless a subclass overrides getCacheKey(), this returns null
                    final String cacheKey = getCacheKey();

                    // caching enabled: try the cache first
                    if (requestCacheEnabled) {
                        // look up a HystrixCommandResponseFromCache
                        HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                        if (fromCache != null) {
                            isResponseFromCache = true;
                            // cache hit: return an Observable over the cached response
                            return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                        }
                    }
                    // cache miss, or caching disabled
                    Observable<R> hystrixObservable =
                            // deferred until subscription
                            Observable.defer(applyHystrixSemantics)
                                    .map(wrapWithAllOnNextHooks);

                    Observable<R> afterCache;

                    // caching enabled, but the cache missed
                    if (requestCacheEnabled && cacheKey != null) {
                        // wrap the command in a HystrixCachedObservable
                        HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                        // put it in the cache; putIfAbsent returns the existing entry if another thread got there first
                        HystrixCommandResponseFromCache<R> fromCache =
                                (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                        if (fromCache != null) {
                            // lost the race: discard our copy and use the cached one
                            toCache.unsubscribe();
                            isResponseFromCache = true;
                            return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                        } else {
                            // won the race: subscribers share our Observable
                            afterCache = toCache.toObservable();
                        }
                    } else {
                        // caching disabled
                        afterCache = hystrixObservable;
                    }

                    return afterCache
                            // fires when execution terminates, normally or with an error
                            .doOnTerminate(terminateCommandCleanup)
                            // fires when the subscription is cancelled
                            .doOnUnsubscribe(unsubscribeCommandCleanup)
                            // fires on successful completion
                            .doOnCompleted(fireOnCompletedHook);
                }
            });

    3. Circuit Breaker

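    Configuration

    circuitBreaker.enabled: whether the circuit breaker is used; default true.
    circuitBreaker.forceOpen: force the breaker open, rejecting every request; default false.
    circuitBreaker.forceClosed: force the breaker closed, allowing every request regardless of the error rate; default false.
    circuitBreaker.requestVolumeThreshold: the minimum number of requests within the rolling window (metrics.rollingStats.timeInMilliseconds, default 10 s) before the error rate is evaluated and the breaker may trip; default 20.
    circuitBreaker.errorThresholdPercentage: the error percentage within the rolling window (default 10 s) at or above which the breaker opens; default 50.
    circuitBreaker.sleepWindowInMilliseconds: how long the breaker keeps rejecting requests after it opens. Once this time has passed, a trial request may be sent. Default 5000.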
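    Together, requestVolumeThreshold and errorThresholdPercentage form the trip condition that isOpen() checks. The minimal sketch below covers just that rule, using the default values from the list above. `TripRule` and `shouldTrip` are hypothetical names, not Hystrix APIs.

```java
// Hypothetical helper mirroring the two threshold checks in isOpen(); not a Hystrix API.
final class TripRule {
    static final int DEFAULT_REQUEST_VOLUME_THRESHOLD = 20;   // circuitBreaker.requestVolumeThreshold
    static final int DEFAULT_ERROR_THRESHOLD_PERCENTAGE = 50; // circuitBreaker.errorThresholdPercentage

    static boolean shouldTrip(long totalRequests, int errorPercentage,
                              int requestVolumeThreshold, int errorThresholdPercentage) {
        // too few requests in the rolling window: never trip, whatever the error rate
        if (totalRequests < requestVolumeThreshold) {
            return false;
        }
        // trip once the error percentage reaches the threshold
        return errorPercentage >= errorThresholdPercentage;
    }

    static boolean shouldTrip(long totalRequests, int errorPercentage) {
        return shouldTrip(totalRequests, errorPercentage,
                DEFAULT_REQUEST_VOLUME_THRESHOLD, DEFAULT_ERROR_THRESHOLD_PERCENTAGE);
    }
}
```

    With the defaults, 19 failed requests out of 19 in the window do not open the breaker. By contrast, 10 failures out of 20 do.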

    The circuit breaker is also implemented in toObservable: step one is the cache, step two is the circuit breaker. First, its initialization:

        protected final HystrixCircuitBreaker circuitBreaker;
        protected AbstractCommand(···) {
            // initialize the circuit breaker
            this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), 
                circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);        
        }
        
        private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                                                                HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                                                                HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            // circuitBreaker.enabled = true
            if (enabled) { 
                if (fromConstructor == null) {
                    // create a HystrixCircuitBreakerImpl,
                    // one instance per commandKey
                    return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
                } else {
                    return fromConstructor;
                }
            } else {
                // NoOpCircuitBreaker:
                // allowRequest() always returns true,
                // isOpen() always returns false
                return new NoOpCircuitBreaker();
            }
        }
        
        static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
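            // configuration properties
            private final HystrixCommandProperties properties;
            // HystrixCommandMetrics collects and manages the command's metrics
            private final HystrixCommandMetrics metrics;
            // whether the breaker is open
            private AtomicBoolean circuitOpen = new AtomicBoolean(false);
            // when the breaker was opened or, in the half-open state, when a trial request was last let through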
            private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
            protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, 
                    HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
                this.properties = properties;
                this.metrics = metrics;
            }
        }

    toObservable also defines applyHystrixSemantics, a Func0 that is invoked later, when the deferred chain is subscribed:

            final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                        // the command has been unsubscribed: emit nothing
                        return Observable.never();
                    }
                    // the main execution path: circuit breaking, fallback, isolation, etc.
                    return applyHystrixSemantics(_cmd);
                }
            };

        private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
            // mark the start of execution; if this hook throws, the command fails fast
            // and the fallback is NOT invoked
            executionHook.onStart(_cmd);

            // if the circuit breaker allows the request, continue
            if (circuitBreaker.allowRequest()) {
                ···
            } else {
                // otherwise short-circuit: run the fallback with a "circuit open" exception
                return handleShortCircuitViaFallback();
            }
        }

            // HystrixCircuitBreakerImpl
            public boolean allowRequest() {
                // circuitBreaker.forceOpen=true: never allow
                if (properties.circuitBreakerForceOpen().get()) {
                    // the breaker is forced open
                    return false;
                }
                // circuitBreaker.forceClosed=true: always allow
                if (properties.circuitBreakerForceClosed().get()) {
                    // still call isOpen() so the open/closed state is updated from the statistics
                    isOpen();
                    // but ignore its result and let every request through
                    return true;
                }
                // no special configuration:
                // allow if the breaker is closed, or if it is half-open and a single trial request may go through
                return !isOpen() || allowSingleTest();
            }

    isOpen(): is the breaker open?

            public boolean isOpen() {
                // already open: report open
                if (circuitOpen.get()) {
                    return true;
                }
                // request counts for the rolling window
                HealthCounts health = metrics.getHealthCounts();

                // has the request volume in the window reached the threshold?
                if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                    // not enough requests yet: stay closed
                    return false;
                }
                // error rate below the threshold: stay closed
                if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                    return false;
                } else {
                    // error rate at or above the threshold: try to open the breaker
                    if (circuitOpen.compareAndSet(false, true)) {
                        // record the current time as the time the breaker opened
                        circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                        // the breaker is now open
                        return true;
                    } else {
                        // the CAS failed, so another thread opened the breaker first;
                        // the losing thread simply reports it as open
                        return true;
                    }
                }
            }

    allowSingleTest(): may a single trial request go through?

            public boolean allowSingleTest() {
                // when the breaker opened, or when a trial request was last let through
                long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
                // 1) the breaker is open, and
                // 2) now > that time + circuitBreaker.sleepWindowInMilliseconds
                if (circuitOpen.get() && System.currentTimeMillis() >
                        timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
                    // CAS the timestamp to "now"; only the winning thread gets the trial request
                    if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
                        // let exactly one request through
                        return true;
                    }
                }
                // reaching here means either:
                // 1. the sleep window has not elapsed yet, so reject, or
                // 2. the CAS was lost; only one request may pass, so the winning thread sends it
                return false;
            }

    When the breaker is open, the fallback is executed:

        private Observable<R> handleShortCircuitViaFallback() {
            // record the SHORT_CIRCUITED event: the breaker is open, so the fallback will run
            eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
            // create an exception describing the open circuit
            Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
            executionResult = executionResult.setExecutionException(shortCircuitException);
            try {
                // run the fallback
                return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
                        "short-circuited", shortCircuitException);
            } catch (Exception e) {
                return Observable.error(e);
            }
        }
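    Short-circuiting never calls run(). Instead it goes straight to the fallback with a "circuit open" exception, and throws if no usable fallback exists. The hypothetical plain-Java sketch below models only that decision. `ShortCircuitSketch` and its parameters are illustrative names, not Hystrix APIs.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch: run the command only if the breaker allows it, otherwise fall back.
final class ShortCircuitSketch {
    static <R> R execute(BooleanSupplier allowRequest, Supplier<R> run, Function<Exception, R> fallback) {
        if (!allowRequest.getAsBoolean()) {
            // breaker open: do not call run(), go straight to the fallback
            Exception shortCircuit = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
            if (fallback == null) {
                throw new IllegalStateException("short-circuited and no fallback available", shortCircuit);
            }
            return fallback.apply(shortCircuit);
        }
        return run.get();
    }
}
```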

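    4. Resource Isolation

    Isolation strategy

    execution.isolation.strategy: the isolation strategy, THREAD or SEMAPHORE; default THREAD. The official recommendation is THREAD for commands that make network calls. SEMAPHORE is recommended for pure in-memory work, or when concurrency is so high that the cost of creating threads becomes prohibitive.
    execution.timeout.enabled: whether HystrixCommand.run() is subject to a timeout; default true.
    execution.isolation.thread.timeoutInMilliseconds: the timeout, default 1000 ms. Despite the name, it currently applies to both thread and semaphore isolation.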

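    Thread pool configuration

    coreSize: number of core threads; default 10.
    allowMaximumSizeToDivergeFromCoreSize: whether maximumSize takes effect; default false, in which case the maximum pool size equals coreSize.
    maximumSize: maximum number of threads; default 10.
    maxQueueSize: length of the pool's waiting queue; default -1, which uses a SynchronousQueue. A value greater than 0 uses a LinkedBlockingQueue. This setting cannot be changed at runtime; changing it requires re-initializing the thread pool.
    queueSizeRejectionThreshold: only relevant when maxQueueSize > 0. Because maxQueueSize cannot be updated dynamically, this dynamically adjustable threshold is used to reject requests instead; default 5.
    keepAliveTimeMinutes: how long idle non-core threads are kept alive; default 1 minute.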
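    The sketch below shows how these settings could map onto a java.util.concurrent.ThreadPoolExecutor. It is an illustrative approximation, not Hystrix's own factory code, and `PoolSketch` is a hypothetical name. It makes two assumptions:

    - the queue choice follows the maxQueueSize rule above: a SynchronousQueue for values <= 0, otherwise a LinkedBlockingQueue of that capacity;
    - maximumSize is honored only when allowMaximumSizeToDivergeFromCoreSize is true.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical factory approximating how the thread-pool settings become a ThreadPoolExecutor.
final class PoolSketch {
    static BlockingQueue<Runnable> queueFor(int maxQueueSize) {
        // maxQueueSize <= 0 (default -1): direct hand-off, no waiting queue
        if (maxQueueSize <= 0) {
            return new SynchronousQueue<>();
        }
        // otherwise a bounded queue whose capacity cannot be changed later
        return new LinkedBlockingQueue<>(maxQueueSize);
    }

    static int actualMaximumSize(int coreSize, int maximumSize, boolean allowDiverge) {
        // without divergence the pool is fixed-size; with it, max is never below core
        return allowDiverge ? Math.max(coreSize, maximumSize) : coreSize;
    }

    static ThreadPoolExecutor create(int coreSize, int maximumSize, boolean allowDiverge,
                                     int maxQueueSize, long keepAliveMinutes) {
        return new ThreadPoolExecutor(coreSize, actualMaximumSize(coreSize, maximumSize, allowDiverge),
                keepAliveMinutes, TimeUnit.MINUTES, queueFor(maxQueueSize));
    }
}
```

    With the defaults (10, 10, false, -1, 1), this yields a fixed pool of 10 threads that hands tasks off directly, so a burst beyond 10 concurrent tasks is rejected rather than queued.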

    Semaphore configuration

    execution.isolation.semaphore.maxConcurrentRequests: maximum number of concurrent HystrixCommand.run() executions; default 10.

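    4.1 Semaphore isolation

    Resource isolation is the third step in toObservable. It only happens once circuitBreaker.allowRequest() has returned true, that is, when the breaker is closed or is letting a single trial request through: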

            if (circuitBreaker.allowRequest()) {
                // get the semaphore wrapper
                final TryableSemaphore executionSemaphore = getExecutionSemaphore();
                // flag: has the semaphore already been released?
                final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
                // action that releases the semaphore
                final Action0 singleSemaphoreRelease = new Action0() {
                    @Override
                    public void call() {
                        // try to flip the flag
                        if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                            // CAS succeeded (no contention, or we won the race):
                            // release the permit (count - 1)
                            executionSemaphore.release();
                        }
                    }
                };
                // action that records an exception event
                final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                    @Override
                    public void call(Throwable t) {
                        eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                    }
                };

                // try to acquire the semaphore:
                // with THREAD isolation the object is a TryableSemaphoreNoOp, which always returns true;
                // with SEMAPHORE isolation it is a TryableSemaphoreActual, which really checks the limit
                if (executionSemaphore.tryAcquire()) {
                    try {
                        // record the invocation start time
                        executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                        // continue with the rest of the flow
                        return executeCommandAndObserve(_cmd)
                                // fires on error
                                .doOnError(markExceptionThrown)
                                // fires when execution terminates
                                .doOnTerminate(singleSemaphoreRelease)
                                // fires on unsubscribe
                                .doOnUnsubscribe(singleSemaphoreRelease);
                    } catch (RuntimeException e) {
                        return Observable.error(e);
                    }
                } else {
                    // semaphore limit exceeded: run the fallback
                    return handleSemaphoreRejectionViaFallback();
                }
            }
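    Both doOnTerminate and doOnUnsubscribe can fire for the same execution. That is why the release action is guarded by a CAS on semaphoreHasBeenReleased. The small self-contained sketch below shows that the permit is returned exactly once. `CountingSemaphore` and `releaseOnce` are hypothetical names, and the counting logic copies TryableSemaphoreActual, shown further down.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical counting semaphore with the same logic as TryableSemaphoreActual.
final class CountingSemaphore {
    private final int permits;
    private final AtomicInteger count = new AtomicInteger(0);

    CountingSemaphore(int permits) { this.permits = permits; }

    boolean tryAcquire() {
        if (count.incrementAndGet() > permits) {
            count.decrementAndGet(); // over the limit: undo and reject
            return false;
        }
        return true;
    }

    void release() { count.decrementAndGet(); }

    int used() { return count.get(); }

    // Returns a release action that only releases on its first invocation,
    // like singleSemaphoreRelease guarded by semaphoreHasBeenReleased.
    Runnable releaseOnce() {
        AtomicBoolean released = new AtomicBoolean(false);
        return () -> {
            if (released.compareAndSet(false, true)) {
                release();
            }
        };
    }
}
```

    Without the guard, an execution that both terminates and is unsubscribed would return two permits. The semaphore would then admit more concurrent requests than maxConcurrentRequests.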

    Obtaining the semaphore

        // semaphore supplied through the constructor
        protected final TryableSemaphore executionSemaphoreOverride;

        protected AbstractCommand(···,
                // subclasses pass null here, so this field is normally null
                TryableSemaphore executionSemaphore) {
            ···
            this.executionSemaphoreOverride = executionSemaphore;
        }

        // static map executionSemaphorePerCircuit:
        // commandKey -> semaphore
        protected static final ConcurrentHashMap<String, TryableSemaphore> executionSemaphorePerCircuit
                = new ConcurrentHashMap<String, TryableSemaphore>();

        // obtain the semaphore
        protected TryableSemaphore getExecutionSemaphore() {
            // is the isolation strategy SEMAPHORE?
            if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
                // executionSemaphoreOverride is set in the constructor and is normally null
                if (executionSemaphoreOverride == null) {
                    // look in the static map first
                    TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
                    if (_s == null) {
                        // not there yet: create Hystrix's own TryableSemaphoreActual and store it;
                        // putIfAbsent never replaces an existing entry, which guards against concurrent creation
                        executionSemaphorePerCircuit.putIfAbsent(commandKey.name(),
                                new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
                        // return whichever TryableSemaphoreActual ended up in the map
                        return executionSemaphorePerCircuit.get(commandKey.name());
                    } else {
                        // already present: return it
                        return _s;
                    }
                } else {
                    // an override was supplied: use it
                    return executionSemaphoreOverride;
                }
            } else {
                // not semaphore isolation: return the no-op TryableSemaphoreNoOp
                return TryableSemaphoreNoOp.DEFAULT;
            }
        }
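    executionSemaphorePerCircuit is static and keyed by commandKey.name(). As a result, every command instance with the same key shares one semaphore, and therefore one concurrency limit. The sketch below is hypothetical:

    - `SemaphoreRegistry` is an illustrative name;
    - java.util.concurrent.Semaphore stands in for TryableSemaphoreActual;
    - ConcurrentHashMap.computeIfAbsent replaces the putIfAbsent-then-get pair with one atomic call.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Hypothetical registry: one semaphore per command key, shared by all instances of that command.
final class SemaphoreRegistry {
    private static final ConcurrentHashMap<String, Semaphore> PER_KEY = new ConcurrentHashMap<>();

    static Semaphore forKey(String commandKey, int maxConcurrentRequests) {
        // created once per key; later callers get the existing instance
        return PER_KEY.computeIfAbsent(commandKey, k -> new Semaphore(maxConcurrentRequests));
    }
}
```

    Hystrix passes a dynamic HystrixProperty<Integer>, so its limit can change at runtime. This sketch differs: it fixes the permit count when the semaphore for a key is first created.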

    Acquiring the semaphore: the semaphore-isolation implementation, TryableSemaphoreActual

        static class TryableSemaphoreActual implements TryableSemaphore {
            
            protected final HystrixProperty<Integer> numberOfPermits;
            // number of permits currently in use
            private final AtomicInteger count = new AtomicInteger(0);

            public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
                this.numberOfPermits = numberOfPermits;
            }

            @Override
            public boolean tryAcquire() {
                // take a permit
                int currentCount = count.incrementAndGet();
                // is the count now above the configured limit?
                if (currentCount > numberOfPermits.get()) {
                    // above the limit: undo the increment and reject
                    count.decrementAndGet();
                    return false;
                } else {
                    // within the limit: acquired
                    return true;
                }
            }

            @Override
            public void release() {
                // return a permit
                count.decrementAndGet();
            }

            @Override
            public int getNumberOfPermitsUsed() {
                // permits currently in use
                return count.get();
            }
    
        }

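    With thread-pool isolation, the "semaphore" is a no-op that always succeeds:

        // default implementation used when the strategy is not SEMAPHORE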
        static class TryableSemaphoreNoOp implements TryableSemaphore {
            public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();
            @Override
            public boolean tryAcquire() {
                return true;
            }
            @Override
            public void release() {}
            @Override
            public int getNumberOfPermitsUsed() {
                return 0;
            }
        }

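    4.2 Thread pool isolation

      Thread-pool isolation happens after the semaphore check, inside executeCommandAndObserve. This method is also the core method that schedules the task, that is, sends the request.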

    Initializing the thread pool

    protected final HystrixThreadPool threadPool;

    // constructor
    protected AbstractCommand(···,
                HystrixThreadPoolKey threadPoolKey,
                HystrixThreadPool threadPool,
                HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
            // determine the HystrixThreadPoolKey: a key passed in takes priority;
            // otherwise the HystrixCommandGroupKey is used (with Feign, that is the @FeignClient value)
            this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup,
                       this.properties.executionIsolationThreadPoolKeyOverride().get());
            // initialize the HystrixThreadPool for this threadPoolKey at construction time;
            // it is built from configuration as a HystrixThreadPoolDefault
            this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
    }

    // initialize the thread pool
    private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor,
                HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
            if (fromConstructor == null) {
                return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
            } else {
                return fromConstructor;
            }
    }

    Executing the request

        private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
            // the current thread's HystrixRequestContext (ThreadLocal-based)
            final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

            // action run on every onNext
            final Action1<R> markEmits = ···;

            // action run on completion
            final Action0 markOnCompleted = new Action0() {
                @Override
                public void call() {
                    if (!commandIsScalar()) {
                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(),
                                (int) latency, executionResult.getOrderedList());
                        eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                        // record the successful result
                        executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                        // tell the circuit breaker the call succeeded
                        circuitBreaker.markSuccess();
                    }
                }
            };

            // fallback handling
            final Func1<Throwable, Observable<R>> handleFallback = ···;

            // restore the request context on the current thread
            final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
                @Override
                public void call(Notification<? super R> rNotification) {
                    setRequestContextIfNeeded(currentRequestContext);
                }
            };

            // build the Observable
            Observable<R> execution;
            // is a timeout configured?
            if (properties.executionTimeoutEnabled().get()) {
                // execution.timeout.enabled=true: continue execution and apply
                // the HystrixObservableTimeoutOperator timeout logic
                execution = executeCommandWithSpecifiedIsolation(_cmd)
                        .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
            } else {
                // no timeout configured
                execution = executeCommandWithSpecifiedIsolation(_cmd);
            }

            return execution.doOnNext(markEmits)
                    // fires on completion
                    .doOnCompleted(markOnCompleted)
                    // on error, switch to a new Observable produced by the fallback
                    .onErrorResumeNext(handleFallback)
                    // called for every notification
                    .doOnEach(setRequestContext);
        }

    1) Continuing execution: executeCommandWithSpecifiedIsolation(_cmd)

        private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
            // is the isolation strategy THREAD?
            if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
                // deferred: marks that we are executing on a thread
                return Observable.defer(new Func0<Observable<R>>() {
                    @Override
                    public Observable<R> call() {
                        // record that execution occurred
                        executionResult = executionResult.setExecutionOccurred();
                        // CAS the command state from OBSERVABLE_CHAIN_CREATED to USER_CODE_EXECUTED
                        if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                            return Observable.error(
                                    new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                        }
                        // mark the command start in the metrics
                        metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
                        // already timed out?
                        if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                            // emit an error
                            return Observable.error(new RuntimeException("timed out before executing run()"));
                        }
                        // CAS the thread state from NOT_USING_THREAD to STARTED
                        if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                            // increment the global concurrent-thread counter
                            HystrixCounters.incrementGlobalConcurrentThreads();
                            // mark a thread execution on the pool
                            threadPool.markThreadExecution();
                            // remember the command currently running on this thread
                            endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                            executionResult = executionResult.setExecutedInThread();
                            try {
                                // hooks (mostly empty implementations by default)
                                executionHook.onThreadStart(_cmd);
                                executionHook.onRunStart(_cmd);
                                executionHook.onExecutionStart(_cmd);
                                // execute HystrixCommand.run()
                                return getUserExecutionObservable(_cmd);
                            } catch (Throwable ex) {
                                return Observable.error(ex);
                            }
                        } else {
                            // emit an error
                            return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                        }
                    }
                })
                // fires on termination: set the thread state to terminated
                .doOnTerminate(···)
                // fires on unsubscribe: set the thread state to unsubscribed
                .doOnUnsubscribe(···)
                // subscribe on the pool: this is where the thread pool's threads are used
                .subscribeOn(
                        // getScheduler returns Hystrix's own Scheduler
                        threadPool.getScheduler(new Func0<Boolean>() {
                            // after a timeout, should the executing thread be interrupted?
                            @Override
                            public Boolean call() {
                                return properties.executionIsolationThreadInterruptOnTimeout().get()
                                        && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                            }
                        }));
            } else {
                // not thread isolation: no thread-related events are needed
                return Observable.defer(new Func0<Observable<R>>() {
                    @Override
                    public Observable<R> call() {
                        // record that execution occurred
                        executionResult = executionResult.setExecutionOccurred();
                        // change the command state
                        if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                            return Observable.error(
                                    new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                        }
                        // mark the command as started
                        metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                        // remember the command currently running
                        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                        try {
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            // execute HystrixCommand.run()
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            // if the hooks above throw, use that as the result of the run method
                            return Observable.error(ex);
                        }
                    }
                });
            }
        }

    Subscribing via subscribeOn hands the task to the Scheduler returned by threadPool.getScheduler(). Here threadPool is a HystrixThreadPoolDefault instance.

    static class HystrixThreadPoolDefault implements HystrixThreadPool {
        private final HystrixThreadPoolProperties properties;
        private final ThreadPoolExecutor threadPool;
        @Override
        public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
            // update the ThreadPoolExecutor from the dynamic configuration
            touchConfig();
            // create the HystrixContextScheduler
            return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
        }
    }

    touchConfig(): update the ThreadPoolExecutor from the dynamic configuration

        private void touchConfig() {
            // core pool size
            final int dynamicCoreSize = properties.coreSize().get();
            // configured maximum pool size
            final int configuredMaximumSize = properties.maximumSize().get();
            // the actual maximum is derived from allowMaximumSizeToDivergeFromCoreSize,
            // coreSize and maximumSize together
            int dynamicMaximumSize = properties.actualMaximumSize();
            final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
            // whether the configured maximum is too low
            boolean maxTooLow = false;
            // divergence is allowed, but maximumSize < coreSize
            if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
                dynamicMaximumSize = dynamicCoreSize;
                maxTooLow = true;
            }
            if (threadPool.getCorePoolSize() != dynamicCoreSize || 
              (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
                if (maxTooLow) {
                    logger.error(...);
                }
                // apply the sizes to the ThreadPoolExecutor
                threadPool.setCorePoolSize(dynamicCoreSize);
                threadPool.setMaximumPoolSize(dynamicMaximumSize);
            }
            // keep-alive time for idle non-core threads
            threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
        }
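    touchConfig() calls setCorePoolSize before setMaximumPoolSize, and on newer JDKs that order can matter:

    - since Java 9, ThreadPoolExecutor.setCorePoolSize throws IllegalArgumentException when the new core size exceeds the current maximum pool size;
    - setMaximumPoolSize rejects a maximum below the current core size.

    The hypothetical helper below (`PoolResizer` is an illustrative name) orders the two calls so the pool never passes through core > max, whether it grows or shrinks.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: apply new core/max sizes without ever passing through core > max.
final class PoolResizer {
    static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newCore > newMax) {
            throw new IllegalArgumentException("core size must not exceed max size");
        }
        if (newMax >= pool.getCorePoolSize()) {
            // the new max still covers the current core: raise (or set) max first, then core
            pool.setMaximumPoolSize(newMax);
            pool.setCorePoolSize(newCore);
        } else {
            // shrinking below the current core: lower core first, then max
            pool.setCorePoolSize(newCore);
            pool.setMaximumPoolSize(newMax);
        }
    }
}
```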

    Creating the HystrixContextScheduler

        public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool,
                                       Func0<Boolean> shouldInterruptThread) {
            this.concurrencyStrategy = concurrencyStrategy;
            this.threadPool = threadPool;
            this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
        }

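    2) HystrixContextScheduler executes the command

      RxJava has two abstract classes, rx.Scheduler and rx.Scheduler.Worker. A Scheduler creates Workers, and a Worker does the actual scheduling of Actions.

    createWorker() takes the Worker created by ThreadPoolScheduler and wraps it in a HystrixContextSchedulerWorker: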

    public class HystrixContextScheduler extends Scheduler {
        private final HystrixConcurrencyStrategy concurrencyStrategy;
        private final Scheduler actualScheduler;
        private final HystrixThreadPool threadPool;
        public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, 
                  HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.concurrencyStrategy = concurrencyStrategy;
            this.threadPool = threadPool;
            this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
        }
        @Override
        public Worker createWorker() {
            return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
        }
    }

    HystrixContextScheduler.HystrixContextSchedulerWorker.schedule(rx.functions.Action0) schedules the Action. The Worker that actually runs it is HystrixContextScheduler.ThreadPoolWorker.

    private class HystrixContextSchedulerWorker extends Worker {
      private final Worker worker;
      private HystrixContextSchedulerWorker(Worker actualWorker) {
          this.worker = actualWorker;
      }
    
      @Override
      public Subscription schedule(Action0 action) {
          if (threadPool != null) {
              // reject if the dynamic queue-size limit has been reached
              if (!threadPool.isQueueSpaceAvailable()) {
                  throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
              }
          }
          // the actual scheduling is done by HystrixContextScheduler.ThreadPoolWorker.schedule
          return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
      }
    }

    HystrixThreadPool.HystrixThreadPoolDefault.isQueueSpaceAvailable checks whether there is still room under the dynamic queue limit:

    @Override
    public boolean isQueueSpaceAvailable() {
        if (queueSize <= 0) { // queueSize is the maxQueueSize setting, default -1
            return true;
        } else {
            // the ThreadPoolExecutor's queue length must be below the queueSizeRejectionThreshold limit
            return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
        }
    }
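    maxQueueSize fixes the queue's capacity when the pool is built. queueSizeRejectionThreshold therefore acts as a second, adjustable limit that is checked before submission. The hypothetical sketch below (`QueueGate` is an illustrative name) applies that check to a plain ThreadPoolExecutor. The threshold is passed as an IntSupplier to mimic a dynamic property, and the rejection mirrors the RejectedExecutionException thrown in HystrixContextSchedulerWorker.schedule.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.IntSupplier;

// Hypothetical gate mirroring isQueueSpaceAvailable() plus the rejection in schedule().
final class QueueGate {
    private final ThreadPoolExecutor pool;
    private final int maxQueueSize;               // fixed when the pool is built, like maxQueueSize
    private final IntSupplier rejectionThreshold; // may change at runtime, like queueSizeRejectionThreshold

    QueueGate(ThreadPoolExecutor pool, int maxQueueSize, IntSupplier rejectionThreshold) {
        this.pool = pool;
        this.maxQueueSize = maxQueueSize;
        this.rejectionThreshold = rejectionThreshold;
    }

    boolean isQueueSpaceAvailable() {
        if (maxQueueSize <= 0) {
            return true; // SynchronousQueue: the executor itself decides
        }
        return pool.getQueue().size() < rejectionThreshold.getAsInt();
    }

    void submit(Runnable task) {
        if (!isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
        pool.execute(task);
    }
}
```

    With a queue capacity of 10 and a threshold of 2, the third task that would have to wait is rejected, even though the queue itself still has room.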

    ThreadPoolWorker.schedule is where the task is actually submitted to the thread pool.

    private static class ThreadPoolWorker extends Worker {
        private final HystrixThreadPool threadPool;
        private final CompositeSubscription subscription = new CompositeSubscription();
        private final Func0<Boolean> shouldInterruptThread;
        public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.threadPool = threadPool;
            this.shouldInterruptThread = shouldInterruptThread;
        }
    
        @Override
        public Subscription schedule(final Action0 action) {
            if (subscription.isUnsubscribed()) {
                return Subscriptions.unsubscribed();
            }
            ScheduledAction sa = new ScheduledAction(action);
            subscription.add(sa);
            sa.addParent(subscription);
    
            ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
            // submit the task to the thread pool
            FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
            sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
            return sa;
        }
    }

    3) getUserExecutionObservable(_cmd): where the user's method is actually executed, via reflection

        private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
            Observable<R> userObservable;
    
            try {
                // build the Observable that calls run()
                userObservable = getExecutionObservable();
            } catch (Throwable ex) {
                userObservable = Observable.error(ex);
            }
            return userObservable
                    .lift(new ExecutionHookApplication(_cmd))
                    .lift(new DeprecatedOnRunHookApplication(_cmd));
        }
        
        final protected Observable<R> getExecutionObservable() {
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    try {
                        // run() is the run method of the command that was created
                        return Observable.just(run());
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                }
            }).doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    executionThread.set(Thread.currentThread());
                }
            });
        }
        //GenericCommand
        protected Object run() throws Exception {
            LOGGER.debug("execute command: {}", getCommandKey().name());
            return process(new Action() {
                @Override
                Object execute() {
                    // invoke the original object's method via reflection (MethodExecutionAction)
                    return getCommandAction().execute(getExecutionType());
                }
            });
        }
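getExecutionObservable relies on two properties of `Observable.defer`. First, `run()` is not called until something subscribes. Second, anything `run()` throws becomes an error notification instead of being thrown to the caller. The sketch below imitates both using only the JDK, with a `Supplier<CompletableFuture<R>>` in place of an Observable. `DeferredExecution` and `defer` are hypothetical names, and this is an analogy, not how RxJava implements `defer`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical JDK-only analogue of Observable.defer(() -> Observable.just(run())):
// nothing runs until the supplier is invoked, and a thrown exception becomes an
// exceptionally completed future instead of escaping to the caller.
class DeferredExecution {

    static <R> Supplier<CompletableFuture<R>> defer(Callable<R> run) {
        return () -> {
            CompletableFuture<R> result = new CompletableFuture<>();
            try {
                result.complete(run.call());      // like Observable.just(run())
            } catch (Throwable ex) {
                result.completeExceptionally(ex); // like Observable.error(ex)
            }
            return result;
        };
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Supplier<CompletableFuture<String>> deferred = DeferredExecution.defer(() -> {
            calls.incrementAndGet();
            return "ok";
        });
        System.out.println("calls before subscribe: " + calls.get()); // 0
        System.out.println("result: " + deferred.get().join());        // ok
        System.out.println("calls after subscribe: " + calls.get());  // 1
    }
}
```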

    4) Timeout detection: lift(new HystrixObservableTimeoutOperator<R>(_cmd)) implements timeout checking with a delayed scheduled task plus CAS.

        private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
    
            final AbstractCommand<R> originalCommand;
    
            public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
                this.originalCommand = originalCommand;
            }
    
            @Override
            public Subscriber<? super R> call(final Subscriber<? super R> child) {
                final CompositeSubscription s = new CompositeSubscription();
                child.add(s);
    
                // HystrixContextRunnable lets the executing thread access the HystrixRequestContext
                final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy,
                        new Runnable() {
                            // triggers onError
                            @Override
                            public void run() {
                                child.onError(new HystrixTimeoutException());
                            }
                        });
    
                // the TimerListener is later submitted to a ScheduledThreadPoolExecutor and run on a schedule
                TimerListener listener = new TimerListener() {
    
                    @Override
                    public void tick() {
                        // use CAS to decide whether the command may be marked as timed out
                        if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                            // publish the TIMEOUT event
                            originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                            // stop the request
                            s.unsubscribe();
                            // run timeoutRunnable, which emits a HystrixTimeoutException
                            timeoutRunnable.run();
                        }
                    }
    
                    // the timeout interval
                    @Override
                    public int getIntervalTimeInMilliseconds() {
                        return originalCommand.properties.executionTimeoutInMilliseconds().get();
                    }
                };
    
                // register the listener with the timer
                final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
    
                // store the timer reference on the command
                originalCommand.timeoutTimer.set(tl);
    
                // create a new subscriber that sits between the source and the child
                Subscriber<R> parent = new Subscriber<R>() {
    
                    @Override
                    public void onCompleted() {
                        if (isNotTimedOut()) {
                            // stop the timer and pass on the completion
                            tl.clear();
                            child.onCompleted();
                        }
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        if (isNotTimedOut()) {
                            // clear the timer and pass on the error
                            tl.clear();
                            child.onError(e);
                        }
                    }
    
                    @Override
                    public void onNext(R v) {
                        // only emit values if the command has not timed out
                        if (isNotTimedOut()) {
                            child.onNext(v);
                        }
                    }
    
                    private boolean isNotTimedOut() {
                        // already COMPLETED, or can still be switched from NOT_EXECUTED to COMPLETED
                        return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                                originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
                    }
                };
    
                // so that s.unsubscribe() in tick() also unsubscribes parent
                s.add(parent);
    
                return parent;
            }
        }
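The core of this operator is a race. The timer thread and the result path both try to move `isCommandTimedOut` out of `NOT_EXECUTED`, and CAS guarantees that exactly one of them wins. The sketch below reproduces only that state machine with an `AtomicReference` and a `ScheduledExecutorService`. `TimeoutRace` and `simulate` are hypothetical names, and the RxJava subscriber plumbing is left out.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the command's isCommandTimedOut field and the two
// competing CAS transitions in HystrixObservableTimeoutOperator.
class TimeoutRace {

    enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    final AtomicReference<TimedOutStatus> status =
            new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);

    // Timer side (like tick()): succeeds only if the command has not finished yet.
    boolean tick() {
        return status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT);
    }

    // Result side (like isNotTimedOut()): succeeds only if the timer has not fired yet.
    boolean isNotTimedOut() {
        return status.get() == TimedOutStatus.COMPLETED
                || status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
    }

    // Simulates a command whose work takes workMillis, with a timeout of timeoutMillis.
    static TimedOutStatus simulate(long workMillis, long timeoutMillis) {
        TimeoutRace race = new TimeoutRace();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> timeoutTask =
                timer.schedule(() -> { race.tick(); }, timeoutMillis, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(workMillis); // the "run()" of the command
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (race.isNotTimedOut()) {
            timeoutTask.cancel(false); // like tl.clear(): the result won, so drop the timer
        }
        timer.shutdownNow();
        return race.status.get();
    }

    public static void main(String[] args) {
        System.out.println(simulate(10, 2_000));
        System.out.println(simulate(300, 50));
    }
}
```

Because a single `compareAndSet` decides the winner, a result that arrives after the timeout is silently dropped, and a tick that fires after completion does nothing.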

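    HystrixTimer.addTimerListener submits the scheduled timeout-check task. Note that Hystrix starts a single global thread pool for timeout detection, with a default core size of 8 and a maximum pool size of Integer.MAX_VALUE.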

        // atomic reference holding the scheduled executor
        AtomicReference<ScheduledExecutor> executor = new AtomicReference<ScheduledExecutor>();
        // add a listener
        public Reference<TimerListener> addTimerListener(final TimerListener listener) {
            // initialize the thread pool (ScheduledExecutor.initialize) if needed
            startThreadIfNeeded();
            
            Runnable r = new Runnable() {
    
                @Override
                public void run() {
                    try {
                        // call the listener's tick method
                        listener.tick();
                    } catch (Exception e) {
                        logger.error("Failed while ticking TimerListener", e);
                    }
                }
            };
            // initial delay and period are both execution.isolation.thread.timeoutInMilliseconds (default timeout: 1 second)
            ScheduledFuture<?> f = executor.get().getThreadPool()
                        .scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(),
                                    listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
            return new TimerReference(listener, f);
        }
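`addTimerListener` calls `scheduleAtFixedRate` with the timeout as both the initial delay and the period. The listener therefore fires first when the timeout elapses and then keeps firing once per timeout interval until it is cleared, which is why `tick()` guards itself with CAS. The JDK-only sketch below has the same scheduling shape. `TickListener`, `MiniTimer` and `countTicks` are hypothetical names standing in for TimerListener and HystrixTimer.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Hystrix's TimerListener interface.
interface TickListener {
    void tick();
    int getIntervalTimeInMilliseconds();
}

// Hypothetical mini timer with the same scheduling shape as HystrixTimer.addTimerListener.
class MiniTimer {
    private final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);

    ScheduledFuture<?> addTimerListener(TickListener listener) {
        Runnable r = () -> {
            try {
                listener.tick();
            } catch (Exception e) {
                // swallow so that one failure does not cancel all later runs
                System.err.println("Failed while ticking TimerListener: " + e);
            }
        };
        int interval = listener.getIntervalTimeInMilliseconds();
        // initial delay and period are both the listener's interval
        return pool.scheduleAtFixedRate(r, interval, interval, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        pool.shutdownNow();
    }

    // Counts how often a listener with the given interval ticks within runMillis.
    static int countTicks(int intervalMillis, long runMillis) {
        MiniTimer timer = new MiniTimer();
        AtomicInteger ticks = new AtomicInteger();
        ScheduledFuture<?> f = timer.addTimerListener(new TickListener() {
            @Override public void tick() { ticks.incrementAndGet(); }
            @Override public int getIntervalTimeInMilliseconds() { return intervalMillis; }
        });
        try {
            Thread.sleep(runMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        f.cancel(false); // stop further ticks, roughly what clearing the timer reference does
        timer.shutdown();
        return ticks.get();
    }
}
```

Catching the exception inside the Runnable matters because `ScheduledExecutorService` suppresses all subsequent runs of a periodic task once one run throws.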

    Initializing the thread pool

        protected void startThreadIfNeeded() {
            // loop until an initialized executor exists
            while (executor.get() == null || ! executor.get().isInitialized()) {
                if (executor.compareAndSet(null, new ScheduledExecutor())) {
                    // initialize
                    executor.get().initialize();
                }
            }
        }
        
        static class ScheduledExecutor {
            volatile ScheduledThreadPoolExecutor executor;
            private volatile boolean initialized;
            // initialize the thread pool
            public void initialize() {
    
                HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
                // hystrix.timer.threadpool.default.coreSize, default size 8
                int coreSize = propertiesStrategy.getTimerThreadPoolProperties().getCorePoolSize().get();
                // thread factory; thread names are HystrixTimer-*
                ThreadFactory threadFactory = null;
                if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
                    threadFactory = new ThreadFactory() {
                        final AtomicInteger counter = new AtomicInteger();
    
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread thread = new Thread(r, "HystrixTimer-" + counter.incrementAndGet());
                            thread.setDaemon(true);
                            return thread;
                        }
    
                    };
                } else {
                    threadFactory = PlatformSpecific.getAppEngineThreadFactory();
                }
                // in JDK 8 this constructor calls super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
                // i.e. maxPoolSize is effectively unbounded and non-core threads have a keep-alive time of 0
                executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);
                initialized = true;
            }
    
            public ScheduledThreadPoolExecutor getThreadPool() {
                return executor;
            }
    
            public boolean isInitialized() {
                return initialized;
            }
        }
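`startThreadIfNeeded` combines two mechanisms. A `compareAndSet` on an `AtomicReference` ensures only one caller creates and initializes the ScheduledExecutor. A volatile `initialized` flag makes the other callers spin until the winner has finished. The sketch below shows the same pattern. `LazyTimerPool`, `Holder` and the `DemoTimer-` thread-name prefix are hypothetical, and the core size of 2 is arbitrary.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the startThreadIfNeeded()/ScheduledExecutor pattern:
// a CAS decides which thread initializes, and a volatile flag publishes the result.
class LazyTimerPool {

    static final class Holder {
        volatile ScheduledThreadPoolExecutor executor;
        volatile boolean initialized;

        void initialize(int coreSize) {
            AtomicInteger counter = new AtomicInteger();
            ThreadFactory factory = r -> {
                Thread t = new Thread(r, "DemoTimer-" + counter.incrementAndGet());
                t.setDaemon(true); // daemon threads do not keep the JVM alive
                return t;
            };
            executor = new ScheduledThreadPoolExecutor(coreSize, factory);
            initialized = true; // written last, after executor is set
        }
    }

    private final AtomicReference<Holder> ref = new AtomicReference<>();
    final AtomicInteger initializations = new AtomicInteger();

    ScheduledThreadPoolExecutor get() {
        // spin until some thread has finished initializing
        while (ref.get() == null || !ref.get().initialized) {
            Holder candidate = new Holder();
            if (ref.compareAndSet(null, candidate)) {
                // only the thread that wins the CAS initializes the pool
                initializations.incrementAndGet();
                candidate.initialize(2);
            }
        }
        return ref.get().executor;
    }
}
```

Writing `initialized` after `executor` means any thread that reads `initialized == true` through the volatile read is also guaranteed to see the executor reference.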

    V. Fallback

      The fallback logic is triggered in the following cases:

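    1. The circuit breaker is open: handleShortCircuitViaFallback()

    2. The semaphore could not be acquired: handleSemaphoreRejectionViaFallback()

    3. The thread pool queue is full: handleThreadPoolRejectionViaFallback()

    4. The request timed out: handleTimeoutViaFallback()

    5. The request threw an exception: handleFailureViaFallback()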

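    All of these cases end up in the same method, getFallbackOrThrowException. Each case does some extra handling of its own before getting there; we can ignore that and go straight to the core logic.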

    private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType,
                final FailureType failureType, final String message, final Exception originalException) {
        
        ... 
        
        // fallback.enabled = true
        if (properties.fallbackEnabled().get()) {
            // set the request context
            final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
                @Override
                public void call(Notification<? super R> rNotification) {
                    setRequestContextIfNeeded(requestContext);
                }
            };
            // onNext
            final Action1<R> markFallbackEmit = ...;
            // OnCompleted
            final Action0 markFallbackCompleted = ...;
            // handle exceptions thrown by the fallback method: wrap them and rethrow
            final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
                @Override
                public Observable<R> call(Throwable t) {
                    Exception e = originalException;
                    Exception fe = getExceptionFromThrowable(t);
                    if (fe instanceof UnsupportedOperationException) {
                        // UnsupportedOperationException is thrown when the subclass provides no fallback
                        return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(),
                                    getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe));
                    } else {
                        // any other exception
                        return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(),
                                    getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe));
                    }
                }
            };
            // fallback execution is guarded by its own semaphore (fallback.isolation.semaphore.maxConcurrentRequests),
            // default size 10
            final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            // release the fallback semaphore (at most once)
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        fallbackSemaphore.release();
                    }
                }
            };
            // Observable for the fallback logic
            Observable<R> fallbackExecutionChain;
            // try to acquire the fallback semaphore
            if (fallbackSemaphore.tryAcquire()) {
                try {
                    fallbackExecutionChain = getFallbackObservable();
                } catch (Throwable ex) {
                    fallbackExecutionChain = Observable.error(ex);
                }
                return fallbackExecutionChain
                        .doOnEach(setRequestContext)
                        .lift(new FallbackHookApplication(_cmd))
                        .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                        .doOnNext(markFallbackEmit)
                        .doOnCompleted(markFallbackCompleted)
                        .onErrorResumeNext(handleFallbackError)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } else {
                // the fallback semaphore could not be acquired: emit Observable.error(new HystrixRuntimeException(...));
                return handleFallbackRejectionByEmittingError();
            }
        } else {
            // fallback is disabled: emit Observable.error(new HystrixRuntimeException(...));
            return handleFallbackDisabledByEmittingError(originalException, failureType, message);
        }
    }
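The fallback path acquires the fallback semaphore before calling getFallbackObservable. `singleSemaphoreRelease` is attached to both `doOnTerminate` and `doOnUnsubscribe`, so an AtomicBoolean ensures the permit is returned only once. The synchronous sketch below shows the same idea, assuming `java.util.concurrent.Semaphore` in place of Hystrix's TryableSemaphore. `FallbackGate` and `runFallback` are hypothetical names, and the exception type used for rejection is illustrative.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical sketch: limit concurrent fallbacks with java.util.concurrent.Semaphore
// (standing in for Hystrix's TryableSemaphore) and release the permit exactly once.
class FallbackGate {
    private final Semaphore fallbackSemaphore;

    FallbackGate(int maxConcurrentFallbacks) {
        this.fallbackSemaphore = new Semaphore(maxConcurrentFallbacks);
    }

    <R> R runFallback(Supplier<R> fallback) {
        if (!fallbackSemaphore.tryAcquire()) {
            // plays the role of handleFallbackRejectionByEmittingError()
            throw new IllegalStateException("fallback execution rejected");
        }
        AtomicBoolean released = new AtomicBoolean(false);
        Runnable singleSemaphoreRelease = () -> {
            if (released.compareAndSet(false, true)) {
                fallbackSemaphore.release();
            }
        };
        try {
            return fallback.get();
        } finally {
            singleSemaphoreRelease.run(); // first trigger (like doOnTerminate) releases the permit
            singleSemaphoreRelease.run(); // second trigger (like doOnUnsubscribe) is a no-op
        }
    }

    int availablePermits() {
        return fallbackSemaphore.availablePermits();
    }
}
```

Without the AtomicBoolean, the second trigger would release an extra permit and slowly raise the effective fallback concurrency limit.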

    HystrixCommand.getFallbackObservable (the implementation of the abstract method declared in AbstractCommand) wraps the fallback logic

    @Override
    final protected Observable<R> getFallbackObservable() {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {
                    return Observable.just(getFallback());
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        });
    }

    HystrixCommand.getFallback throws UnsupportedOperationException by default; we have to override it with our own fallback logic.

    protected R getFallback() {
        throw new UnsupportedOperationException("No fallback available.");
    }
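Because the default `getFallback` throws UnsupportedOperationException, a command without an override ends up in the "no fallback available" branch of handleFallbackError. With an override, a failure turns into a fallback value instead. The plain-Java sketch below models just that contract. `MiniCommand`, `GreetingCommand` and `execute` are hypothetical names and do not reproduce the HystrixCommand API or its Observable pipeline.

```java
import java.io.IOException;

// Hypothetical mini model of the run()/getFallback() contract (not the Hystrix API).
abstract class MiniCommand<R> {

    protected abstract R run() throws Exception;

    // Same default as HystrixCommand.getFallback(): no fallback unless overridden.
    protected R getFallback() {
        throw new UnsupportedOperationException("No fallback available.");
    }

    public R execute() {
        try {
            return run();
        } catch (Exception original) {
            try {
                return getFallback();
            } catch (UnsupportedOperationException fe) {
                // mirrors the "... and no fallback available." message
                throw new RuntimeException("command failed and no fallback available.", original);
            } catch (RuntimeException fe) {
                // mirrors the "... and fallback failed." message
                throw new RuntimeException("command failed and fallback failed.", original);
            }
        }
    }
}

// Hypothetical command that overrides getFallback().
class GreetingCommand extends MiniCommand<String> {
    private final boolean fail;

    GreetingCommand(boolean fail) {
        this.fail = fail;
    }

    @Override
    protected String run() throws Exception {
        if (fail) {
            throw new IOException("remote call failed");
        }
        return "hello";
    }

    @Override
    protected String getFallback() {
        return "fallback-hello";
    }
}
```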

    Some exceptions never go through the fallback logic:

    1. HystrixBadRequestException: we can throw this exception manually to make the method skip the fallback logic

    throw new HystrixBadRequestException("This exception does not trigger the fallback");

    How it works

                    // in executeCommandAndObserve: the fallback error handler handleFallback
                    Exception e = getExceptionFromThrowable(t);
                    executionResult = executionResult.setExecutionException(e);
                    if (e instanceof RejectedExecutionException) {
                        return handleThreadPoolRejectionViaFallback(e);
                    } else if (t instanceof HystrixTimeoutException) {
                        return handleTimeoutViaFallback();
                        // HystrixBadRequestException does not go through the fallback logic
                    } else if (t instanceof HystrixBadRequestException) {
                        return handleBadRequestByEmittingError(e);
                    } else {
                        // HystrixBadRequestException does not go through the fallback logic
                        if (e instanceof HystrixBadRequestException) {
                            eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                            return Observable.error(e);
                        }
    
                        return handleFailureViaFallback(e);
                    }
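The branch above picks a handler based on the exception type, and only the HystrixBadRequestException branches return `Observable.error` without calling a fallback. The dependency-free sketch below reproduces that routing. `BadRequestException` and `CommandTimeoutException` are hypothetical stand-ins for HystrixBadRequestException and HystrixTimeoutException, and the `Route` names are illustrative. The original's distinction between `t` and the unwrapped `e` is collapsed into a single Throwable here.

```java
import java.util.concurrent.RejectedExecutionException;

// Hypothetical stand-in for HystrixBadRequestException.
class BadRequestException extends RuntimeException {
    BadRequestException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for HystrixTimeoutException.
class CommandTimeoutException extends RuntimeException {
}

class FailureRouter {

    enum Route { THREAD_POOL_REJECTION_FALLBACK, TIMEOUT_FALLBACK, BAD_REQUEST_NO_FALLBACK, FAILURE_FALLBACK }

    // Same order of checks as the handleFallback branch above.
    static Route route(Throwable t) {
        if (t instanceof RejectedExecutionException) {
            return Route.THREAD_POOL_REJECTION_FALLBACK;
        } else if (t instanceof CommandTimeoutException) {
            return Route.TIMEOUT_FALLBACK;
        } else if (t instanceof BadRequestException) {
            return Route.BAD_REQUEST_NO_FALLBACK; // emitted as an error; getFallback() is never called
        } else {
            return Route.FAILURE_FALLBACK;
        }
    }
}
```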

    2. A custom exception: it must implement the ExceptionNotWrappedByHystrix interface and then be thrown

    public class MyHystrixException extends RuntimeException implements ExceptionNotWrappedByHystrix {
    
        public MyHystrixException(String message){
            super(message);
        }
    }

    How it works

            // in getFallbackOrThrowException
            if (shouldNotBeWrapped(originalException)){
                Exception e = wrapWithOnErrorHook(failureType, originalException);
                return Observable.error(e);
            }
            protected boolean shouldNotBeWrapped(Throwable underlying) {
                return underlying instanceof ExceptionNotWrappedByHystrix;
            }
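`shouldNotBeWrapped` is a plain `instanceof` check against a marker interface. An exception that implements it therefore reaches the caller unchanged instead of being wrapped in HystrixRuntimeException. The dependency-free sketch below shows this. `NotWrapped`, `PassThroughException`, `ExceptionWrapping` and `toCallerException` are hypothetical names, RuntimeException stands in for HystrixRuntimeException, and the `wrapWithOnErrorHook` call is omitted.

```java
// Hypothetical marker interface standing in for ExceptionNotWrappedByHystrix.
interface NotWrapped {
}

// Hypothetical business exception that opts out of wrapping.
class PassThroughException extends RuntimeException implements NotWrapped {
    PassThroughException(String message) {
        super(message);
    }
}

class ExceptionWrapping {

    static boolean shouldNotBeWrapped(Throwable underlying) {
        return underlying instanceof NotWrapped;
    }

    // Returns the exception the caller would see.
    static Exception toCallerException(Exception original) {
        if (shouldNotBeWrapped(original)) {
            return original; // passed through unchanged
        }
        // RuntimeException stands in for HystrixRuntimeException here
        return new RuntimeException("command failed", original);
    }
}
```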

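    3. isUnrecoverable checks whether the exception's cause is unrecoverable, e.g. a stack overflow, a virtual machine error, thread death, or a linkage error.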

        // called from getFallbackOrThrowException
        private boolean isUnrecoverable(Throwable t) {
            if (t != null && t.getCause() != null) {
                Throwable cause = t.getCause();
                if (cause instanceof StackOverflowError) {
                    return true;
                } else if (cause instanceof VirtualMachineError) {
                    return true;
                } else if (cause instanceof ThreadDeath) {
                    return true;
                } else if (cause instanceof LinkageError) {
                    return true;
                }
            }
            return false;
        }
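`isUnrecoverable` looks only at `t.getCause()`, never at `t` itself, so a bare error without a cause is not flagged. The sketch below makes that easy to verify. `UnrecoverableCheck` is a hypothetical name. It folds the StackOverflowError test into VirtualMachineError (its superclass) and leaves out the ThreadDeath check because that class is deprecated in recent JDKs.

```java
// Hypothetical standalone version of the isUnrecoverable check for experimentation.
// The ThreadDeath branch is omitted because that class is deprecated in recent JDKs.
class UnrecoverableCheck {

    static boolean isUnrecoverable(Throwable t) {
        if (t != null && t.getCause() != null) {
            Throwable cause = t.getCause();
            // StackOverflowError is itself a VirtualMachineError, so one check covers both
            return cause instanceof VirtualMachineError || cause instanceof LinkageError;
        }
        return false; // only the cause is inspected, never t itself
    }
}
```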

    Reference: https://juejin.cn/column/6960847703521624094

  • Original article: https://www.cnblogs.com/sglx/p/15784595.html