zoukankan      html  css  js  c++  java
  • 限流与熔断

    概述

    为了保证系统不被突发流量击垮,进行容量保障是十分有必要的。从架构的稳定性角度看,在有限资源的情况下,所能提供的单位时间服务能力也是有限的。假如超过承受能力,可能会带来整个服务的停顿,应用的Crash,进而可能将风险传递给服务调用方造成整个系统的服务能力丧失,进而引发雪崩。

    为了避免系统压力大时引发服务雪崩,就需要在系统中引入限流,降级和熔断等工具。

    目的

    最初在基础数据内部,我们使用了限流组件,然后调研了降级熔断框架Hystrix,发现这是一个很好的框架,能够提升依赖大量外部服务的系统的容错能力,但是不适合依赖比较简单的基础数据系统。

    机票交易系统依赖诸多子系统,比如保险,各种X产品等。故而,在此简单介绍一下,推广给交易和报价的同学使用。

    限流

    根据排队理论,具有延迟的服务随着请求量的不断提升,其平均响应时间也会迅速提升,为了保证服务的SLA,有必要控制单位时间的请求量。这就是限流为什么愈发重要的原因。

    分类

    qps限流

    限制每秒处理请求数不超过阈值。

    并发限流

    限制同时处理的请求数目。Java 中的 Semaphore 是做并发限制的好工具,特别适用于资源有效的场景。

    单机限流

    Guava 中的 RateLimiter。

    集群限流

    TC 提供的 common-blocking 组件提供此功能。

    算法

    漏桶算法

    漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。

    旗舰店运价库抓取调度

    在旗舰店运价库对航司报价的离线抓取中,由于航司接口有qps限制,需要限制访问速率。所以我们借助redis队列作为漏桶,实现了对航司接口访问速率的控制。

    令牌桶算法

    对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。
    令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

    在 Guava 的 RateLimiter 中,使用的就是令牌桶算法,允许部分突发流量传输。在其源码里,可以看到能够突发传输的流量等于 maxBurstSeconds * qps。

    /**
     * This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling.
     * The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in
     * terms of time, in this sense: if a RateLimiter is 2qps, and this time is specified as 10
     * seconds, we can save up to 2 * 10 = 20 permits.
     */
    static final class SmoothBursty extends SmoothRateLimiter {
      /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
      final double maxBurstSeconds;
     
      SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
        super(stopwatch);
        this.maxBurstSeconds = maxBurstSeconds;
      }

    Guava 里除了允许突发传输的 SmoothBursty, 还有于此相反的,可以缓慢启动的 SmoothWarmingUp。

    滑动窗口

    1. TC 提供的 common-blocking 限流组件,背后采用滑动窗口来计算流量是否超出限制。
    2. 熔断框架 Hystrix 也采用滑动窗口来统计服务调用信息。

    RxJava 实现滑动窗口计数

    RxJava 是一个响应式函数编程框架,以观察者模式为骨架,能够轻松实现滑动窗口计数功能。具体示例代码如下:

    package com.rxjava.test;
     
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import rx.Observable;
    import rx.functions.Func1;
    import rx.functions.Func2;
    import rx.subjects.PublishSubject;
    import rx.subjects.SerializedSubject;
     
    import java.util.concurrent.TimeUnit;
     
    /**
     * 模拟滑动窗口计数
     * Created by albon on 17/6/24.
     */
    public class RollingWindowTest {
        private static final Logger logger = LoggerFactory.getLogger(WindowTest.class);
     
        public static final Func2<Integer, Integer, Integer> INTEGER_SUM =
                (integer, integer2) -> integer + integer2;
     
        public static final Func1<Observable<Integer>, Observable<Integer>> WINDOW_SUM =
                window -> window.scan(0, INTEGER_SUM).skip(3);
     
        public static final Func1<Observable<Integer>, Observable<Integer>> INNER_BUCKET_SUM =
                integerObservable -> integerObservable.reduce(0, INTEGER_SUM);
     
        public static void main(String[] args) throws InterruptedException {
            PublishSubject<Integer> publishSubject = PublishSubject.create();
            SerializedSubject<Integer, Integer> serializedSubject = publishSubject.toSerialized();
     
            serializedSubject
                    .window(5, TimeUnit.SECONDS) // 5秒作为一个基本块
                    .flatMap(INNER_BUCKET_SUM)           // 基本块内数据求和
                    .window(31)              // 3个块作为一个窗口,滚动布数为1
                    .flatMap(WINDOW_SUM)                 // 窗口数据求和
                    .subscribe((Integer integer) ->
                            logger.info("[{}] call ...... {}",
                            Thread.currentThread().getName(), integer));
     
            // 缓慢发送数据,观察效果
            for (int i=0; i<100; ++i) {
                if (i < 30) {
                    serializedSubject.onNext(1);
                else {
                    serializedSubject.onNext(2);
                }
                Thread.sleep(1000);
            }
        }
    }

    降级和熔断

    降级

    业务降级,是指牺牲非核心的业务功能,保证核心功能的稳定运行。简单来说,要实现优雅的业务降级,需要将功能实现拆分到相对独立的不同代码单元,分优先级进行隔离。在后台通过开关控制,降级部分非主流程的业务功能,减轻系统依赖和性能损耗,从而提升集群的整体吞吐率。

    降级的重点是:业务之间有优先级之分。

    降级的典型应用是:电商活动期间。

    熔断

    老式电闸都安装了保险丝,一旦有人使用超大功率的设备,保险丝就会烧断以保护各个电器不被强电流给烧坏。同理我们的接口也需要安装上“保险丝”,以防止非预期的请求对系统压力过大而引起的系统瘫痪,当流量过大时,可以采取拒绝或者引流等机制。

    同样在分布式系统中,当被调用的远程服务无法使用时,如果没有过载保护,就会导致请求的资源阻塞在远程服务器上耗尽资源。很多时候,刚开始可能只是出现了局部小规模的故障,然而由于种种原因,故障影响范围越来越大,最终导致全局性的后果。这种过载保护,就是熔断器。

    熔断器的设计思路

    1. Closed:初始状态,熔断器关闭,正常提供服务
    2. Open: 失败次数,失败百分比达到一定的阈值之后,熔断器打开,停止访问服务
    3. Half-Open:熔断一定时间之后,小流量尝试调用服务,如果成功则恢复,熔断器变为Closed状态

    降级熔断相似点

    1. 目的一致,都是从可用性可靠性着想,为防止系统的整体缓慢甚至崩溃,采用的技术手段
    2. 最终表现类似,对于两者来说,最终让用户体验到的是某些功能暂时不可达或不可用
    3. 粒度一般都是服务级别
    4. 自治性要求很高,熔断模式一般都是服务基于策略的自动触发,降级虽说可人工干预,但在微服务架构下,完全靠人显然不可能,开关预置、配置中心都是必要手段

    降级熔断区别

    1. 触发原因不一样,服务熔断一般是某个服务(下游服务)故障引起,而服务降级一般是从整体负荷考虑
    2. 自愈能力要求不一样,服务熔断在发生后有自愈能力,而服务降级没有该职责

    相关框架组件

    common-blocking 限流组件

    http://wiki.corp.qunar.com/display/devwiki/common-blocking

    优点

    1. qconfig 控制,灵活方便
    2. 支持同步/异步模式,异步模式不增加接口响应时间
    3. 支持HTTP, Dubbo接口
    4. 集群级别的限流
    5. 支持自定义更细的限流粒度

    dubbo 熔断

    http://wiki.corp.qunar.com/pages/viewpage.action?pageId=105912517

    hystrix 降级熔断

    适用对象

    依赖大量外部服务的业务

    主要功能

    1. 服务故障时,触发熔断,快速失败,而非排队处理,占用系统资源
    2. 支持线程池隔离,避免个别依赖服务耗光线程而影响其他服务
    3. 提供兜底策略,以尽量减少失败
    4. 通过限制线程池数目,或者信号量最大并发数,可以达到限流的目的
    5. 支持对结果进行Cache


    图中流程的说明:

    1. 将远程服务调用逻辑封装进一个HystrixCommand。
    2. 对于每次服务调用可以使用同步或异步机制,对应执行execute()或queue()。
    3. 判断熔断器(circuit-breaker)是否打开或者半打开状态,如果打开跳到步骤8,进行回退策略,如果关闭进入步骤4。
    4. 判断线程池/队列/信号量(使用了舱壁隔离模式)是否跑满,如果跑满进入回退步骤8,否则继续后续步骤5。
    5. run方法中执行了实际的服务调用。
      1. 服务调用发生超时时,进入步骤8。
    6. 判断run方法中的代码是否执行成功。
      1. 执行成功返回结果。
      2. 执行中出现错误则进入步骤8。
    7. 所有的运行状态(成功,失败,拒绝,超时)上报给熔断器,用于统计从而影响熔断器状态。
    8. 进入getFallback()回退逻辑。
      1. 没有实现getFallback()回退逻辑的调用将直接抛出异常。
      2. 回退逻辑调用成功直接返回。
      3. 回退逻辑调用失败抛出异常。
    9. 返回执行成功结果。

    命令模式

    hystrix 使用命令模式对服务调用进行封装,要想使用 hystrix,只需要继承 HystrixCommand 即可。

    public class CommandHelloFailure extends HystrixCommand<String> {
     
        public CommandHelloFailure() {
           super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(HystrixCommandEnum.AV_QUERY.GROUP.KEY))
                    .andCommandKey(HystrixCommandKey.Factory.asKey(HystrixCommandEnum.AV_QUERY.KEY))
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("AvPool"))
                    .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                            .withCoreSize(2)//ThreadPoolExecutor maximumPoolSize=maxnumsize=10
                            .withMaxQueueSize(-1)//线程打满后,直接快速拒绝。
                    )
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                            .withExecutionTimeoutInMilliseconds(10000)//run方法执行时间,超过一秒就会被cancel.
                            .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                            .withCircuitBreakerRequestVolumeThreshold(30)
                            .withCircuitBreakerErrorThresholdPercentage(50)//错误率达到50%后,开启熔断.
                            .withCircuitBreakerSleepWindowInMilliseconds(5000)//熔断后休眠5000毫秒后发起重试.
                    ));        this.name = name;
        }
     
        @Override
        protected String run() {
            throw new RuntimeException("this command always fails");
        }
     
        @Override
        protected String getFallback() {
            return "Hello Failure " + name + "!";
        }
    }

    HystrixCommand有4个方法可供调用

    方法名

    返回值

    含义

    execute

    run方法的返回值

    同步阻塞执行

    queue

    Future

    异步非阻塞执行

    observe

    Observable<?>

    事件注册前执行

    toObservable

    Observable<?>

    事件注册后执行

    熔断判断逻辑关键源代码

    类名

    说明

    HystrixCircuitBreaker

    熔断判断类

    HealthCountsStream

    Command 执行结果统计类,主要提供了 HealthCounts 对象作为统计结果,用于 HystrixCircuitBreaker 中进行熔断判断

    BucketedRollingCounterStream 
    BucketedCounterStream

    滑动窗口计数类

    HystrixCommandCompletionStream

    Command 执行结果数据发布类

    无线同学封装的 qhystrix

    无线平台的同学,对hystrix进行了适当的封装,能够和公司的公共组件进行配合使用,十分方便。

    1. 引入了 QConfig 动态配置,方便线上灵活修改服务配置,更多配置相关资料可以查看官方文档 。

      scheduledMonitorOpen=true
      #av_pool线程池核心线程数目
      hystrix.threadpool.av_pool.coreSize=10
      #av_pool线程池队列大小
      hystrix.threadpool.av_pool.queueSizeRejectionThreshold=2
      #执行策略,以信号量/线程池的方式执行
      hystrix.command.av_query.execution.isolation.strategy=THREAD
      #超时是否启用
      hystrix.command.av_query.execution.timeout.enabled=true
      #当超时的时候是否中断(interrupt) HystrixCommand.run()执行
      hystrix.command.av_query.execution.isolation.thread.interruptOnTimeout=true
      #超时时间,支持线程池/信号量
      hystrix.command.av_query.execution.isolation.thread.timeoutInMilliseconds=10000
      #统计执行数据的时间窗口,单位毫秒【此配置修改后需要重启才能生效】
      hystrix.command.av_query.metrics.rollingStats.timeInMilliseconds=10000
      #触发熔断器开启的窗口时间需要达到的最小请求数
      hystrix.command.av_query.circuitBreaker.requestVolumeThreshold=10
      #熔断器保持开启状态时间。默认为5000毫秒,则熔断器中断请求5秒后会进入半打开状态,放部分流量过去重试
      hystrix.command.av_query.circuitBreaker.sleepWindowInMilliseconds=10000
      #窗口时间内的出错率,默认为50,则达到50%出错率时,熔断开启
      hystrix.command.av_query.circuitBreaker.errorThresholdPercentage=50
    2. 引入了 Watcher 监控,方便观察服务运行情况,以及添加报警。

      hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SUCCESS_Count=6
      hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SHORT_CIRCUITED_Time=0
      hystrix.CommandKey_av_query.cluster.RT_FALLBACK_SUCCESS_Count=64
      JVM_Thread_Count=147
      hystrix.CommandKey_av_query.cluster.RT_SUCCESS_Time=0
      hystrix.CommandKey_av_query.cluster.RT_FAILURE_Count=25
      JVM_PS_MarkSweep_Count=0
      Fare_Rule_Change_Task_Queue_Size_Value=0
      hystrix.CommandKey_av_query.cluster.RT_SHORT_CIRCUITED_Time=0
      hystrix.CommandKey_av_query.cluster.RT_SHORT_CIRCUITED_Count=39
      hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FAILURE_Count=25
      hystrix.CommandKey_av_query.cluster.RT_SUCCESS_Count=6
      hystrix.CommandKey_av_query.cluster.RT_FALLBACK_SUCCESS_Time=0
      hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FALLBACK_SUCCESS_Time=0
      Av_Change_Task_Queue_Size_Value=0
      hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FALLBACK_SUCCESS_Count=64
      hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FAILURE_Time=0
      hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SUCCESS_Time=0
      no_y_email_task_Time=0
      no_y_queue_empty_Count=0
      hystrix.CommandKey_av_query.cluster.RT_FAILURE_Time=0
      JVM_PS_Scavenge_Count=0
      no_y_email_task_Count=0
      hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SHORT_CIRCUITED_Count=39
      no_y_queue_empty_Time=0

    注解驱动开发

    注解是 Java 语言的一大优势,很多开发框架比如 Spring, Hibernate 都将这一优势使用到了极致。所以呢,官方 也提供了基于注解进行开发的 hystrix-javanica 包,能够减少大量繁琐代码的编写。

    <dependency>
        <groupId>com.netflix.hystrix</groupId>
        <artifactId>hystrix-javanica</artifactId>
        <version>1.5.11</version>
    </dependency>
    <aop:aspectj-autoproxy/>
    <bean id="hystrixAspect" class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect"></bean>

    在POM引入jar包,并且配置spring aop即可。示例代码如下:

    package com.qunar.flight.farecore.storage.service.av;
     
    import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
    import com.netflix.hystrix.strategy.HystrixPlugins;
    import com.qunar.mobile.hystrix.config.HystrixConfig;
    import com.qunar.mobile.hystrix.config.QconfigUtils;
    import com.qunar.mobile.hystrix.monitor.HystrixScheduledMonitor;
    import com.qunar.mobile.hystrix.monitor.QunarHystrixCommandExecutionHook;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Service;
    import qunar.tc.qconfig.client.Configuration;
     
    import javax.annotation.PostConstruct;
    import java.util.Map;
     
    /**
     * @author albon
     *         Date: 17-6-30
     *         Time: 上午10:44
     */
    @Service
    public class TestHystrixService {
        private static final Logger logger = LoggerFactory.getLogger(TestHystrixService.class);
     
        @PostConstruct
        public void init() {
            logger.info("init execution hook");
     
            HystrixPlugins.getInstance().registerCommandExecutionHook(new QunarHystrixCommandExecutionHook());
            QconfigUtils.getInstance().getHystrixMapConfig()
                    .addListener(new Configuration.ConfigListener<Map<String, String>>() {
                        @Override
                        public void onLoad(Map<String, String> conf) {
                            boolean isOpen = Boolean.parseBoolean(conf.get("scheduledMonitorOpen"));
                            HystrixScheduledMonitor.getInstance().setOpen(isOpen);
     
                            // qconfig热发修改配置
                            HystrixConfig.getInstance().setConfig(conf);
                        }
                    });
        }
     
        @HystrixCommand(commandKey = "av_query", groupKey = "av_group", threadPoolKey = "av_pool", fallbackMethod = "queryFallback")
        public String query() {
            return String.format("[%s] RUN SUCCESS", Thread.currentThread().getName());
        }
     
        public String queryFallback() {
            return String.format("[%s] FALLBACK", Thread.currentThread().getName());
        }
     
    }

    注意:因为注解方式不再继承quanr-hystrix里的AbstractHystrixCommand,故这里特意在 PostContruct 方法里,注册了 Hook 类,以及对 QConfig 配置进行监听。

    在使用时,曾经遇到一个小问题,异常信息如下:

    Caused by: org.aspectj.apache.bcel.classfile.ClassFormatException: Invalid byte tag in constant pool: 18
        at org.aspectj.apache.bcel.classfile.ClassParser.readConstantPool(ClassParser.java:192) ~[aspectjweaver-1.6.10.jar:1.6.10]
        at org.aspectj.apache.bcel.classfile.ClassParser.parse(ClassParser.java:131) ~[aspectjweaver-1.6.10.jar:1.6.10]
        at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadJavaClass(NonCachingClassLoaderRepository.java:262) ~[aspectjweaver-1.6.10.jar:1.6.10]
        at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadClass(NonCachingClassLoaderRepository.java:242) ~[aspectjweaver-1.6.10.jar:1.6.10]
        at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadClass(NonCachingClassLoaderRepository.java:249) ~[aspectjweaver-1.6.10.jar:1.6.10]
        at org.aspectj.weaver.reflect.Java15AnnotationFinder.getAnnotations(Java15AnnotationFinder.java:202) ~[aspectjweaver-1.6.10.jar:1.6.10]
        at org.aspectj.weaver.reflect.ReflectionBasedResolvedMemberImpl.unpackAnnotations(ReflectionBasedResolvedMemberImpl.java:211) ~[aspectjweaver-1.6.10.jar:1.6.10]
        at org.aspectj.weaver.reflect.ReflectionBasedResolvedMemberImpl.hasAnnotation(ReflectionBasedResolvedMemberImpl.java:163) ~[aspectjweaver-1.6.10.jar:1.6.10]
        at org.aspectj.weaver.patterns.ExactAnnotationTypePattern.matches(ExactAnnotationTypePattern.java:109) ~[aspectjweaver-1.6.10.jar:1.6.10]
        at org.aspectj.weaver.patterns.ExactAnnotationTypePattern.matches(ExactAnnotationTypePattern.java:96) ~[aspectjweaver-1.6.10.jar:1.6.10]

    通过升级aspectjweaver包到最新版解决

    <dependency>
        <groupId>org.aspectj</groupId>
        <artifactId>aspectjweaver</artifactId>
        <version>1.8.10</version>
    </dependency>

    更详细的资料可以查看官方文档

    资料

    qunar-hystrixhttp://gitlab.corp.qunar.com/mobile_public/qunar-hystrix/wikis/start
    官方文档https://github.com/Netflix/Hystrix/wiki
    javanica文档 https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-javanica

    使用中遇到的坑

    commons-configuration 包版本低引起的 NPE 异常

    报价同学在其系统中引入时,遇到了很奇怪的 NPE 异常

    java.lang.NullPointerException
            at com.netflix.config.ConcurrentMapConfiguration.clearConfigurationListeners(ConcurrentMapConfiguration.java:330)
            at org.apache.commons.configuration.event.EventSource.<init>(EventSource.java:76)
            at org.apache.commons.configuration.AbstractConfiguration.<init>(AbstractConfiguration.java:63)
            at com.netflix.config.ConcurrentMapConfiguration.<init>(ConcurrentMapConfiguration.java:68)
            at com.netflix.config.ConcurrentCompositeConfiguration.<init>(ConcurrentCompositeConfiguration.java:172)
            at com.netflix.config.ConfigurationManager.getConfigInstance(ConfigurationManager.java:125)
            at com.netflix.config.DynamicPropertyFactory.getInstance(DynamicPropertyFactory.java:263)
            at com.netflix.config.DynamicProperty.getInstance(DynamicProperty.java:245)
            at com.netflix.config.PropertyWrapper.<init>(PropertyWrapper.java:58)
            at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius$ArchaiusDynamicProperty.<init>(HystrixDynamicPropertiesArchaius.java:62)
            at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius$StringDynamicProperty.<init>(HystrixDynamicPropertiesArchaius.java:73)
            at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius.getString(HystrixDynamicPropertiesArchaius.java:34)
            at com.netflix.hystrix.strategy.HystrixPlugins.getPluginImplementationViaProperties(HystrixPlugins.java:344)
            at com.netflix.hystrix.strategy.HystrixPlugins.getPluginImplementation(HystrixPlugins.java:334)
            at com.netflix.hystrix.strategy.HystrixPlugins.getPropertiesStrategy(HystrixPlugins.java:243)
            at com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory.getCommandProperties(HystrixPropertiesFactory.java:62)
            at com.netflix.hystrix.AbstractCommand.initCommandProperties(AbstractCommand.java:204)
            at com.netflix.hystrix.AbstractCommand.<init>(AbstractCommand.java:163)
            at com.netflix.hystrix.HystrixCommand.<init>(HystrixCommand.java:147)
            at com.netflix.hystrix.HystrixCommand.<init>(HystrixCommand.java:133)

    ConcurrentMapConfiguration.clearConfigurationListeners 方法内容如下:

    public class ConcurrentMapConfiguration extends AbstractConfiguration {
        protected ConcurrentHashMap<String,Object> map;
        private Collection<ConfigurationListener> listeners = new CopyOnWriteArrayList<ConfigurationListener>(); 
     
        @Override
        public void clearConfigurationListeners() {
            listeners.clear(); // NPE异常就在这一行
        }
    }

    初看代码,listeners 变量是有初始化语句的,为啥还会是 null 呢?我们继续往上看 ConcurrentMapConfiguration 的父类,clearConfigurationListeners 方法覆盖的是父类 EventSource 的同名方法。EventSource 代码如下:

    public class EventSource {
        /** A collection for the registered event listeners. */
        private Collection listeners;
     
        /** A counter for the detail events. */
        private int detailEvents;
     
        /**
         * Creates a new instance of <code>EventSource</code>.
         */
        public EventSource() {
            clearConfigurationListeners();
        }
     
        /**
         * Removes all registered configuration listeners.
         */
        public void clearConfigurationListeners() {
            listeners = new LinkedList();
        }

     clearConfigurationListeners 方法是在父类的构造函数中调用的,此时子类还没有做初始化操作,listeners 也还没有赋值,这就奇葩了。仔细观察后发现,EventSource 是 commons-configuration 包里的,而 ConcurrentMapConfiguration 是 archaius-core 包(Netflix 开源的基于java的配置管理类库)的,archaius 中引用的 commons-configuration 是较高的版本 1.8。高版本的 EventSource 代码有所不同,其构造函数里没有再调用 clearConfigurationListeners,而是调用 initListeners 方法。升级 pom 中的 commons-configuration 版本后,系统运行正常啦

    public class EventSource {
        /** A collection for the registered event listeners. */
        private Collection<ConfigurationListener> listeners;
     
        /**
         * Creates a new instance of {@code EventSource}.
         */
        public EventSource() {
            initListeners();
        }
     
        /**
         * Initializes the collections for storing registered event listeners.
         */
        private void initListeners() {
            listeners = new CopyOnWriteArrayList<ConfigurationListener>();
            errorListeners = new CopyOnWriteArrayList<ConfigurationErrorListener>();
        }

    dynamic-limiter 组件

    设计目的

    使用 Hystrix 编程必须使用其命令模式,继承 HystrixCommand 或 HystrixObservableCommand。对于程序中原先采用异步回调模式的代码,使用 Hystrix 必须改造成同步模式,涉及到系统代码的大量改造,并且隔离在单独的线程池中,导致线程切换增加,影响性能。

    为此,我们在深入了解 Hystrix 原理之后,决定自己动手写一个降级熔断工具 dynamic-limiter,提供以下两种功能:

    1.  简化给异步回调模式的代码增加降级熔断的功能的步骤。主要做法是,不再采用 Hystrix 僵硬的命令模式,分离服务调用结果的监控和熔断判断逻辑。
    2.  针对接收消息->获取资源→进入计算队列的编程模式,设计动态限流组件,逻辑如下图所示:

        3. 监控系统负载,动态设置限流阈值,原理如下:

  • 相关阅读:
    杨辉三角实现
    三种方式都能生成同样的列表
    Python 直接赋值、浅拷贝和深度拷贝解析
    Python 拷贝对象(深拷贝deepcopy与浅拷贝copy)
    教你玩转CSS 分组选择器和嵌套选择器
    教你玩转CSS padding(填充)
    教你玩转CSS 轮廓(outline)属性
    教你玩转CSS margin(外边距)
    教你玩转CSS border(边框)
    教你玩转CSS表格(table)
  • 原文地址:https://www.cnblogs.com/DengGao/p/rateLimit.html
Copyright © 2011-2022 走看看