zoukankan      html  css  js  c++  java
  • hystrix(3) 熔断器

      讲完metrics我们就来了解一下熔断器的执行情况,熔断器的判断取决metrics数据。

      hystrix在执行命令前需要经过熔断器判断,如果服务被熔断,则执行fallback流程,熔断判断逻辑如下:

    1. 如果强制未开启,返回true(未熔断)。
    2. 如果强制开启,返回false(熔断)。
    3. 判断熔断标识
      1. 如果未熔断则返回true。
      2. 如果half_open,返回false(熔断)。
      3. 如果熔断,判断当前时间是否超过短路窗口期,
        1. 如果没有超过,返回false。
        2. 如果超过则返回true。 并设置熔断状态为half_open

      命令执行失败后逻辑如下:

        如果熔断标识为half_open,并重新计算短路窗口期(记录当前时间)。

        如果熔断标识为close,通过命令metric组件,获取命令指定窗口时间内执行总错误数和错误率。如果实际错误率或错误数高于配置错误率或错误数,则设置熔断标识为熔断。

      命令执行成功后逻辑如下:

        只有在熔断状态为half_open状态下,才能解除熔断。

        如果请求执行成功,解除熔断。

      熔断器还会监听metrics数据流,当错误比率或者请求量大于配置的值时,就会设置熔断标识为熔断。每个commandkey都会对应一个熔断器。

    熔断器判断

    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
            ...
            if (circuitBreaker.allowRequest()) {
                ...
               return executeCommandAndObserve(_cmd)
                                .doOnError(markExceptionThrown)
                                .doOnTerminate(singleSemaphoreRelease)
                                .doOnUnsubscribe(singleSemaphoreRelease);
                ...
            } else {
                return handleShortCircuitViaFallback();
            }
        }

    熔断器逻辑

    static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
            private final HystrixCommandProperties properties;
            private final HystrixCommandMetrics metrics;
            /* track whether this circuit is open/closed at any given point in time (default to false==closed) */
            private AtomicBoolean circuitOpen = new AtomicBoolean(false);
            /* when the circuit was marked open or was last allowed to try a 'singleTest' */
            private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
            protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
                this.properties = properties;
                this.metrics = metrics;
            }
            public void markSuccess() {
                if (circuitOpen.get()) {
                    if (circuitOpen.compareAndSet(true, false)) {
                        //win the thread race to reset metrics
                        //Unsubscribe from the current stream to reset the health counts stream.  This only affects the health counts view,
                        //and all other metric consumers are unaffected by the reset
                        metrics.resetStream();
                    }
                }
            }
    
            @Override
            public boolean allowRequest() {
                if (properties.circuitBreakerForceOpen().get()) {
                    // properties have asked us to force the circuit open so we will allow NO requests
                    return false;
                }
                if (properties.circuitBreakerForceClosed().get()) {
                    // we still want to allow isOpen() to perform it's calculations so we simulate normal behavior
                    isOpen();
                    // properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
                    return true;
                }
                return !isOpen() || allowSingleTest();
            }
    
            public boolean allowSingleTest() {
                long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
                // 1) if the circuit is open
                // 2) and it's been longer than 'sleepWindow' since we opened the circuit
                if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
                    // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
                    // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
                    if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
                        // if this returns true that means we set the time so we'll return true to allow the singleTest
                        // if it returned false it means another thread raced us and allowed the singleTest before we did
                        return true;
                    }
                }
                return false;
            }
    
            @Override
            public boolean isOpen() {
                if (circuitOpen.get()) {
                    // if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
                    return true;
                }
                // we're closed, so let's see if errors have made us so we should trip the circuit open
                HealthCounts health = metrics.getHealthCounts();
                // check if we are past the statisticalWindowVolumeThreshold
                if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                    // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
                    return false;
                }
                if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                    return false;
                } else {
                    // our failure rate is too high, trip the circuit
                    if (circuitOpen.compareAndSet(false, true)) {
                        // if the previousValue was false then we want to set the currentTime
                        circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                        return true;
                    } else {
                        // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
                        // caused another thread to set it to true already even though we were in the process of doing the same
                        // In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
                        return true;
                    }
                }
            }
    
        }
  • 相关阅读:
    定义一个Dog类,它和静态数据成员Dogs记录Dog的个体数目。静态成员函数GetDogs用来存取Dogs。设计并测试这个类--简单
    互联网无插件直播流媒体服务器方案EasyNVR下载新的软件执行程序,出现“invalid license”字样是什么意思?
    视频流媒体服务器RTSP拉流、RTMP推流方案EasyNVR如何实现视频转推其他直播间?
    视频流媒体服务器RTSP拉流、RTMP推流流媒体服务器授权方案之加密机运行后无法授权问题解决
    RTSP安防网络摄像头/海康大华硬盘录像机网页无插件直播之EasyNVR流媒体服务器系列产品直播延时问题解析
    海康大华网络摄像头RTSP_Onvif网页无插件直播流媒体服务器EasyNVR录像版设定录像文件存储位置的方法解析
    同一路摄像头视频流接入RTSP_Onvif网页无插件直播流媒体服务器EasyNVR与其他平台播放视频有差异的原因分析
    RTSP_Onvif安防摄像头直播流媒体服务器EasyNVR产品调用接口出现"Unauthorized"问题的解决方法
    安防摄像头RTSP/Onvif协议网页无插件直播视频流媒体服务器EasyNVR录像回看质量的影响因素有哪些?
    海康、大华等网络摄像头RTSP_Onvif网页无插件直播流媒体服务器EasyNVR在内网环境下,设备不在线问题处理
  • 原文地址:https://www.cnblogs.com/zhangwanhua/p/8250502.html
Copyright © 2011-2022 走看看