zoukankan      html  css  js  c++  java
  • Hystrix核心熔断器

    在深入研究熔断器之前,我们需要先看一下Hystrix的几个重要的默认配置,这几个配置在HystrixCommandProperties

    //时间窗(ms)
    static final Integer default_metricsRollingStatisticalWindow = 10000;
    //最少请求次数
    private static final Integer default_circuitBreakerRequestVolumeThreshold = 20;
    //熔断器打开后开始尝试半开的时间间隔
    private static final Integer default_circuitBreakerSleepWindowInMilliseconds = 5000;
    //错误比例
    private static final Integer default_circuitBreakerErrorThresholdPercentage = 50;
    
    

    这几个属性共同组成了熔断器的核心逻辑,即:

    1. 每10秒的窗口期内,当请求次数超过20次,且出错比例超过50%,则触发熔断器打开
    2. 当熔断器5秒后,会尝试放过去一部分流量进行试探
    熔断器初始化

    熔断器的初始化是在HystrixCircuitBreaker.FactorygetInstance方法

            private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
    
            public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
                // this should find it for all but the first time
                HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
                if (previouslyCached != null) {
                    return previouslyCached;
                }
    
                // if we get here this is the first time so we need to initialize
    
                // Create and add to the map ... use putIfAbsent to atomically handle the possible race-condition of
                // 2 threads hitting this point at the same time and let ConcurrentHashMap provide us our thread-safety
                // If 2 threads hit here only one will get added and the other will get a non-null response instead.
                HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
                if (cbForCommand == null) {
                    // this means the putIfAbsent step just created a new one so let's retrieve and return it
                    return circuitBreakersByCommand.get(key.name());
                } else {
                    // this means a race occurred and while attempting to 'put' another one got there before
                    // and we instead retrieved it and will now return it
                    return cbForCommand;
                }
            }
    

    由上方代码可知,每一个熔断器都是由HystrixCircuitBreakerImpl实现的,而所有的熔断器都维护在circuitBreakersByCommand这个ConcurrentHashMap

    熔断器实现
    构造方法
    class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;
    
        enum Status {
            CLOSED, OPEN, HALF_OPEN
        }
    
        private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
        private final AtomicLong circuitOpened = new AtomicLong(-1);
        private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);
    
        protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            this.properties = properties;
            this.metrics = metrics;
    
            //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
            Subscription s = subscribeToStream();
            activeSubscription.set(s);
        }
    }
    

    先介绍一下几个比较基础的属性:

    1. HystrixCommandProperties:当前熔断器的配置
    2. HystrixCommandMetrics: 请求统计组件
    3. Status:熔断器状态枚举,一共包含三种,关闭、打开和半开
    4. status:当前熔断器的状态
    5. circuitOpened:当前熔断器的打开时间
    6. activeSubscription:订阅请求统计的处理函数
    请求统计处理
    private Subscription subscribeToStream() {
                /*
                 * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
                 */
                return metrics.getHealthCountsStream()
                        .observe()
                        .subscribe(new Subscriber<HealthCounts>() {
                            @Override
                            public void onCompleted() {
    
                            }
    
                            @Override
                            public void onError(Throwable e) {
    
                            }
    
                            @Override
                            public void onNext(HealthCounts hc) {
                                // check if we are past the statisticalWindowVolumeThreshold
                                if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                                    // we are not past the minimum volume threshold for the stat window,
                                    // so no change to circuit status.
                                    // if it was CLOSED, it stays CLOSED
                                    // if it was half-open, we need to wait for a successful command execution
                                    // if it was open, we need to wait for sleep window to elapse
                                } else {
                                    if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                                        //we are not past the minimum error threshold for the stat window,
                                        // so no change to circuit status.
                                        // if it was CLOSED, it stays CLOSED
                                        // if it was half-open, we need to wait for a successful command execution
                                        // if it was open, we need to wait for sleep window to elapse
                                    } else {
                                        // our failure rate is too high, we need to set the state to OPEN
                                        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                            circuitOpened.set(System.currentTimeMillis());
                                        }
                                    }
                                }
                            }
                        });
            }
    

    直接看onNext方法里的处理方式:

    1. 时间窗内的请求数量是否达标,按默认配置就是10秒钟的请求数是否超过20次,如果不达标不能开启熔断器
    2. else中首先判断错误比例是否达到比例,按默认就是50%
    3. 满足打开条件,使用CAS修改状态为打开,并记录打开时间circuitOpened为当前时间

    当记录了当前应用的统计数据之后,在每次请求的时候就可以根据这些数据来判断是否应该打开熔断器了

    请求过滤

    不知你是否还记得在系列文章第一篇中曾经提到了一个方法applyHystrixSemantics,在这个方法中就包含了判断是否应该熔断的逻辑,如果熔断器打开的情况下会直接进入降级逻辑。这个判断的方法如下:

            public boolean attemptExecution() {
                if (properties.circuitBreakerForceOpen().get()) {
                    return false;
                }
                if (properties.circuitBreakerForceClosed().get()) {
                    return true;
                }
                if (circuitOpened.get() == -1) {
                    return true;
                } else {
                    if (isAfterSleepWindow()) {
                        if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                            //only the first request after sleep window should execute
                            return true;
                        } else {
                            return false;
                        }
                    } else {
                        return false;
                    }
                }
            }
    
    1. 第一个if,如果配置强制熔断则返回false表示开启熔断器进入降级逻辑
    2. 第二个,如果配置强制关闭则返回正常不进行后续的判断
    3. 第三个,打开时间为空则肯定没打开过
    4. 第四个,判断是否满足尝试时间,默认是5秒钟。时间计算方式如下:
    private boolean isAfterSleepWindow() {
        final long circuitOpenTime = circuitOpened.get();
        final long currentTime = System.currentTimeMillis();
        final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
        return currentTime > circuitOpenTime + sleepWindowTime;
    }
    
    1. 当满足尝试时则使用CAS方式修改熔断器为半开状态

    而当请求成功的时候则会调用如下方法清除统计数据,更改熔断器状态为关闭

     public void markSuccess() {
                if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
                    //This thread wins the race to close the circuit - it resets the stream to start it over from 0
                    metrics.resetStream();
                    Subscription previousSubscription = activeSubscription.get();
                    if (previousSubscription != null) {
                        previousSubscription.unsubscribe();
                    }
                    Subscription newSubscription = subscribeToStream();
                    activeSubscription.set(newSubscription);
                    circuitOpened.set(-1L);
                }
            }
    

    请求失败则再次打开熔断器,并更新打开时间

            public void markNonSuccess() {
                if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
                    //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
                    circuitOpened.set(System.currentTimeMillis());
                }
            }
    

    原文地址

  • 相关阅读:
    hadoop文件写入
    elastic(10) 基本查询
    hadoop 小知识点
    "hadoop namenode -format"命令的作用和影响的文件
    elastic(9)映射
    Linux 学习笔记之 --- epoll 事件模型详解
    Linux 学习笔记之 --- select 与 poll 事件模型详解
    Tornado 高并发源码分析之二---Tornado启动和请求处理流程
    Tornado 高并发源码分析之五--- IOLoop 对象
    Tornado 高并发源码分析之四--- HTTPServer 与 TCPServer 对象
  • 原文地址:https://www.cnblogs.com/zhixiang-org-cn/p/11810037.html
Copyright © 2011-2022 走看看