概述
为了保证系统不被突发流量击垮,进行容量保障是十分有必要的。从架构的稳定性角度看,在有限资源的情况下,所能提供的单位时间服务能力也是有限的。假如超过承受能力,可能会带来整个服务的停顿,应用的Crash,进而可能将风险传递给服务调用方造成整个系统的服务能力丧失,进而引发雪崩。
为了避免系统压力大时引发服务雪崩,就需要在系统中引入限流,降级和熔断等工具。
目的
最初在基础数据内部,我们使用了限流组件,然后调研了降级熔断框架Hystrix,发现这是一个很好的框架,能够提升依赖大量外部服务的系统的容错能力,但是不适合依赖比较简单的基础数据系统。
机票交易系统依赖诸多子系统,比如保险,各种X产品等。故而,在此简单介绍一下,推广给交易和报价的同学使用。
限流
根据排队理论,具有延迟的服务随着请求量的不断提升,其平均响应时间也会迅速提升,为了保证服务的SLA,有必要控制单位时间的请求量。这就是限流为什么愈发重要的原因。
分类
qps限流
限制每秒处理请求数不超过阈值。
并发限流
限制同时处理的请求数目。Java 中的 Semaphore 是做并发限制的好工具,特别适用于资源有效的场景。
单机限流
Guava 中的 RateLimiter。
集群限流
TC 提供的 common-blocking 组件提供此功能。
算法
漏桶算法
漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。
旗舰店运价库抓取调度
在旗舰店运价库对航司报价的离线抓取中,由于航司接口有qps限制,需要限制访问速率。所以我们借助redis队列作为漏桶,实现了对航司接口访问速率的控制。
令牌桶算法
对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。
令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
在 Guava 的 RateLimiter 中,使用的就是令牌桶算法,允许部分突发流量传输。在其源码里,可以看到能够突发传输的流量等于 maxBurstSeconds * qps。
/** * This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling. * The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in * terms of time, in this sense: if a RateLimiter is 2qps, and this time is specified as 10 * seconds, we can save up to 2 * 10 = 20 permits. */ static final class SmoothBursty extends SmoothRateLimiter { /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */ final double maxBurstSeconds; SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) { super (stopwatch); this .maxBurstSeconds = maxBurstSeconds; } |
Guava 里除了允许突发传输的 SmoothBursty, 还有于此相反的,可以缓慢启动的 SmoothWarmingUp。
滑动窗口
- TC 提供的 common-blocking 限流组件,背后采用滑动窗口来计算流量是否超出限制。
- 熔断框架 Hystrix 也采用滑动窗口来统计服务调用信息。
RxJava 实现滑动窗口计数
RxJava 是一个响应式函数编程框架,以观察者模式为骨架,能够轻松实现滑动窗口计数功能。具体示例代码如下:
package com.rxjava.test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; import rx.functions.Func1; import rx.functions.Func2; import rx.subjects.PublishSubject; import rx.subjects.SerializedSubject; import java.util.concurrent.TimeUnit; /** * 模拟滑动窗口计数 * Created by albon on 17/6/24. */ public class RollingWindowTest { private static final Logger logger = LoggerFactory.getLogger(WindowTest. class ); public static final Func2<Integer, Integer, Integer> INTEGER_SUM = (integer, integer2) -> integer + integer2; public static final Func1<Observable<Integer>, Observable<Integer>> WINDOW_SUM = window -> window.scan( 0 , INTEGER_SUM).skip( 3 ); public static final Func1<Observable<Integer>, Observable<Integer>> INNER_BUCKET_SUM = integerObservable -> integerObservable.reduce( 0 , INTEGER_SUM); public static void main(String[] args) throws InterruptedException { PublishSubject<Integer> publishSubject = PublishSubject.create(); SerializedSubject<Integer, Integer> serializedSubject = publishSubject.toSerialized(); serializedSubject .window( 5 , TimeUnit.SECONDS) // 5秒作为一个基本块 .flatMap(INNER_BUCKET_SUM) // 基本块内数据求和 .window( 3 , 1 ) // 3个块作为一个窗口,滚动布数为1 .flatMap(WINDOW_SUM) // 窗口数据求和 .subscribe((Integer integer) -> logger.info( "[{}] call ...... {}" , Thread.currentThread().getName(), integer)); // 缓慢发送数据,观察效果 for ( int i= 0 ; i< 100 ; ++i) { if (i < 30 ) { serializedSubject.onNext( 1 ); } else { serializedSubject.onNext( 2 ); } Thread.sleep( 1000 ); } } } |
降级和熔断
降级
业务降级,是指牺牲非核心的业务功能,保证核心功能的稳定运行。简单来说,要实现优雅的业务降级,需要将功能实现拆分到相对独立的不同代码单元,分优先级进行隔离。在后台通过开关控制,降级部分非主流程的业务功能,减轻系统依赖和性能损耗,从而提升集群的整体吞吐率。
降级的重点是:业务之间有优先级之分。
降级的典型应用是:电商活动期间。
熔断
老式电闸都安装了保险丝,一旦有人使用超大功率的设备,保险丝就会烧断以保护各个电器不被强电流给烧坏。同理我们的接口也需要安装上“保险丝”,以防止非预期的请求对系统压力过大而引起的系统瘫痪,当流量过大时,可以采取拒绝或者引流等机制。
同样在分布式系统中,当被调用的远程服务无法使用时,如果没有过载保护,就会导致请求的资源阻塞在远程服务器上耗尽资源。很多时候,刚开始可能只是出现了局部小规模的故障,然而由于种种原因,故障影响范围越来越大,最终导致全局性的后果。这种过载保护,就是熔断器。
熔断器的设计思路
- Closed:初始状态,熔断器关闭,正常提供服务
- Open: 失败次数,失败百分比达到一定的阈值之后,熔断器打开,停止访问服务
- Half-Open:熔断一定时间之后,小流量尝试调用服务,如果成功则恢复,熔断器变为Closed状态
降级熔断相似点
- 目的一致,都是从可用性可靠性着想,为防止系统的整体缓慢甚至崩溃,采用的技术手段
- 最终表现类似,对于两者来说,最终让用户体验到的是某些功能暂时不可达或不可用
- 粒度一般都是服务级别
- 自治性要求很高,熔断模式一般都是服务基于策略的自动触发,降级虽说可人工干预,但在微服务架构下,完全靠人显然不可能,开关预置、配置中心都是必要手段
降级熔断区别
- 触发原因不一样,服务熔断一般是某个服务(下游服务)故障引起,而服务降级一般是从整体负荷考虑
- 自愈能力要求不一样,服务熔断在发生后有自愈能力,而服务降级没有该职责
相关框架组件
common-blocking 限流组件
http://wiki.corp.qunar.com/display/devwiki/common-blocking
优点
- qconfig 控制,灵活方便
- 支持同步/异步模式,异步模式不增加接口响应时间
- 支持HTTP, Dubbo接口
- 集群级别的限流
- 支持自定义更细的限流粒度
dubbo 熔断
http://wiki.corp.qunar.com/pages/viewpage.action?pageId=105912517
hystrix 降级熔断
适用对象
依赖大量外部服务的业务
主要功能
- 服务故障时,触发熔断,快速失败,而非排队处理,占用系统资源
- 支持线程池隔离,避免个别依赖服务耗光线程而影响其他服务
- 提供兜底策略,以尽量减少失败
- 通过限制线程池数目,或者信号量最大并发数,可以达到限流的目的
- 支持对结果进行Cache
- 将远程服务调用逻辑封装进一个HystrixCommand。
- 对于每次服务调用可以使用同步或异步机制,对应执行execute()或queue()。
- 判断熔断器(circuit-breaker)是否打开或者半打开状态,如果打开跳到步骤8,进行回退策略,如果关闭进入步骤4。
- 判断线程池/队列/信号量(使用了舱壁隔离模式)是否跑满,如果跑满进入回退步骤8,否则继续后续步骤5。
- run方法中执行了实际的服务调用。
- 服务调用发生超时时,进入步骤8。
- 判断run方法中的代码是否执行成功。
- 执行成功返回结果。
- 执行中出现错误则进入步骤8。
- 所有的运行状态(成功,失败,拒绝,超时)上报给熔断器,用于统计从而影响熔断器状态。
- 进入getFallback()回退逻辑。
- 没有实现getFallback()回退逻辑的调用将直接抛出异常。
- 回退逻辑调用成功直接返回。
- 回退逻辑调用失败抛出异常。
- 返回执行成功结果。
命令模式
hystrix 使用命令模式对服务调用进行封装,要想使用 hystrix,只需要继承 HystrixCommand 即可。
public class CommandHelloFailure extends HystrixCommand<String> { public CommandHelloFailure() { super (HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(HystrixCommandEnum.AV_QUERY.GROUP.KEY)) .andCommandKey(HystrixCommandKey.Factory.asKey(HystrixCommandEnum.AV_QUERY.KEY)) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey( "AvPool" )) .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter() .withCoreSize( 2 ) //ThreadPoolExecutor maximumPoolSize=maxnumsize=10 .withMaxQueueSize(- 1 ) //线程打满后,直接快速拒绝。 ) .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() .withExecutionTimeoutInMilliseconds( 10000 ) //run方法执行时间,超过一秒就会被cancel. .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD) .withCircuitBreakerRequestVolumeThreshold( 30 ) .withCircuitBreakerErrorThresholdPercentage( 50 ) //错误率达到50%后,开启熔断. .withCircuitBreakerSleepWindowInMilliseconds( 5000 ) //熔断后休眠5000毫秒后发起重试. )); this .name = name; } @Override protected String run() { throw new RuntimeException( "this command always fails" ); } @Override protected String getFallback() { return "Hello Failure " + name + "!" ; } } |
HystrixCommand有4个方法可供调用
方法名 |
返回值 |
含义 |
---|---|---|
execute |
run方法的返回值 |
同步阻塞执行 |
queue |
Future |
异步非阻塞执行 |
observe |
Observable<?> |
事件注册前执行 |
toObservable |
Observable<?> |
事件注册后执行 |
熔断判断逻辑关键源代码
类名 |
说明 |
---|---|
HystrixCircuitBreaker |
熔断判断类 |
HealthCountsStream |
Command 执行结果统计类,主要提供了 HealthCounts 对象作为统计结果,用于 HystrixCircuitBreaker 中进行熔断判断 |
BucketedRollingCounterStream |
滑动窗口计数类 |
HystrixCommandCompletionStream |
Command 执行结果数据发布类 |
无线同学封装的 qhystrix
无线平台的同学,对hystrix进行了适当的封装,能够和公司的公共组件进行配合使用,十分方便。
-
引入了 QConfig 动态配置,方便线上灵活修改服务配置,更多配置相关资料可以查看官方文档 。
scheduledMonitorOpen=
true
#av_pool线程池核心线程数目
hystrix.threadpool.av_pool.coreSize=
10
#av_pool线程池队列大小
hystrix.threadpool.av_pool.queueSizeRejectionThreshold=
2
#执行策略,以信号量/线程池的方式执行
hystrix.command.av_query.execution.isolation.strategy=THREAD
#超时是否启用
hystrix.command.av_query.execution.timeout.enabled=
true
#当超时的时候是否中断(interrupt) HystrixCommand.run()执行
hystrix.command.av_query.execution.isolation.thread.interruptOnTimeout=
true
#超时时间,支持线程池/信号量
hystrix.command.av_query.execution.isolation.thread.timeoutInMilliseconds=
10000
#统计执行数据的时间窗口,单位毫秒【此配置修改后需要重启才能生效】
hystrix.command.av_query.metrics.rollingStats.timeInMilliseconds=
10000
#触发熔断器开启的窗口时间需要达到的最小请求数
hystrix.command.av_query.circuitBreaker.requestVolumeThreshold=
10
#熔断器保持开启状态时间。默认为
5000
毫秒,则熔断器中断请求
5
秒后会进入半打开状态,放部分流量过去重试
hystrix.command.av_query.circuitBreaker.sleepWindowInMilliseconds=
10000
#窗口时间内的出错率,默认为
50
,则达到
50
%出错率时,熔断开启
hystrix.command.av_query.circuitBreaker.errorThresholdPercentage=
50
-
引入了 Watcher 监控,方便观察服务运行情况,以及添加报警。
hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SUCCESS_Count=
6
hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SHORT_CIRCUITED_Time=
0
hystrix.CommandKey_av_query.cluster.RT_FALLBACK_SUCCESS_Count=
64
JVM_Thread_Count=
147
hystrix.CommandKey_av_query.cluster.RT_SUCCESS_Time=
0
hystrix.CommandKey_av_query.cluster.RT_FAILURE_Count=
25
JVM_PS_MarkSweep_Count=
0
Fare_Rule_Change_Task_Queue_Size_Value=
0
hystrix.CommandKey_av_query.cluster.RT_SHORT_CIRCUITED_Time=
0
hystrix.CommandKey_av_query.cluster.RT_SHORT_CIRCUITED_Count=
39
hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FAILURE_Count=
25
hystrix.CommandKey_av_query.cluster.RT_SUCCESS_Count=
6
hystrix.CommandKey_av_query.cluster.RT_FALLBACK_SUCCESS_Time=
0
hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FALLBACK_SUCCESS_Time=
0
Av_Change_Task_Queue_Size_Value=
0
hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FALLBACK_SUCCESS_Count=
64
hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FAILURE_Time=
0
hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SUCCESS_Time=
0
no_y_email_task_Time=
0
no_y_queue_empty_Count=
0
hystrix.CommandKey_av_query.cluster.RT_FAILURE_Time=
0
JVM_PS_Scavenge_Count=
0
no_y_email_task_Count=
0
hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SHORT_CIRCUITED_Count=
39
no_y_queue_empty_Time=
0
注解驱动开发
注解是 Java 语言的一大优势,很多开发框架比如 Spring, Hibernate 都将这一优势使用到了极致。所以呢,官方 也提供了基于注解进行开发的 hystrix-javanica 包,能够减少大量繁琐代码的编写。
<dependency> <groupId>com.netflix.hystrix</groupId> <artifactId>hystrix-javanica</artifactId> <version> 1.5 . 11 </version> </dependency> |
<aop:aspectj-autoproxy/> <bean id= "hystrixAspect" class = "com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect" ></bean> |
在POM引入jar包,并且配置spring aop即可。示例代码如下:
package com.qunar.flight.farecore.storage.service.av; import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand; import com.netflix.hystrix.strategy.HystrixPlugins; import com.qunar.mobile.hystrix.config.HystrixConfig; import com.qunar.mobile.hystrix.config.QconfigUtils; import com.qunar.mobile.hystrix.monitor.HystrixScheduledMonitor; import com.qunar.mobile.hystrix.monitor.QunarHystrixCommandExecutionHook; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import qunar.tc.qconfig.client.Configuration; import javax.annotation.PostConstruct; import java.util.Map; /** * @author albon * Date: 17-6-30 * Time: 上午10:44 */ @Service public class TestHystrixService { private static final Logger logger = LoggerFactory.getLogger(TestHystrixService. class ); @PostConstruct public void init() { logger.info( "init execution hook" ); HystrixPlugins.getInstance().registerCommandExecutionHook( new QunarHystrixCommandExecutionHook()); QconfigUtils.getInstance().getHystrixMapConfig() .addListener( new Configuration.ConfigListener<Map<String, String>>() { @Override public void onLoad(Map<String, String> conf) { boolean isOpen = Boolean.parseBoolean(conf.get( "scheduledMonitorOpen" )); HystrixScheduledMonitor.getInstance().setOpen(isOpen); // qconfig热发修改配置 HystrixConfig.getInstance().setConfig(conf); } }); } @HystrixCommand (commandKey = "av_query" , groupKey = "av_group" , threadPoolKey = "av_pool" , fallbackMethod = "queryFallback" ) public String query() { return String.format( "[%s] RUN SUCCESS" , Thread.currentThread().getName()); } public String queryFallback() { return String.format( "[%s] FALLBACK" , Thread.currentThread().getName()); } } |
注意:因为注解方式不再继承quanr-hystrix里的AbstractHystrixCommand,故这里特意在 PostContruct 方法里,注册了 Hook 类,以及对 QConfig 配置进行监听。
在使用时,曾经遇到一个小问题,异常信息如下:
Caused by: org.aspectj.apache.bcel.classfile.ClassFormatException: Invalid byte tag in constant pool: 18 at org.aspectj.apache.bcel.classfile.ClassParser.readConstantPool(ClassParser.java: 192 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ] at org.aspectj.apache.bcel.classfile.ClassParser.parse(ClassParser.java: 131 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ] at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadJavaClass(NonCachingClassLoaderRepository.java: 262 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ] at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadClass(NonCachingClassLoaderRepository.java: 242 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ] at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadClass(NonCachingClassLoaderRepository.java: 249 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ] at org.aspectj.weaver.reflect.Java15AnnotationFinder.getAnnotations(Java15AnnotationFinder.java: 202 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ] at org.aspectj.weaver.reflect.ReflectionBasedResolvedMemberImpl.unpackAnnotations(ReflectionBasedResolvedMemberImpl.java: 211 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ] at org.aspectj.weaver.reflect.ReflectionBasedResolvedMemberImpl.hasAnnotation(ReflectionBasedResolvedMemberImpl.java: 163 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ] at org.aspectj.weaver.patterns.ExactAnnotationTypePattern.matches(ExactAnnotationTypePattern.java: 109 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ] at org.aspectj.weaver.patterns.ExactAnnotationTypePattern.matches(ExactAnnotationTypePattern.java: 96 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ] |
通过升级aspectjweaver包到最新版解决
<dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> <version> 1.8 . 10 </version> </dependency> |
更详细的资料可以查看官方文档。
资料
qunar-hystrixhttp://gitlab.corp.qunar.com/mobile_public/qunar-hystrix/wikis/start
官方文档https://github.com/Netflix/Hystrix/wiki
javanica文档 https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-javanica
使用中遇到的坑
commons-configuration 包版本低引起的 NPE 异常
报价同学在其系统中引入时,遇到了很奇怪的 NPE 异常
java.lang.NullPointerException at com.netflix.config.ConcurrentMapConfiguration.clearConfigurationListeners(ConcurrentMapConfiguration.java: 330 ) at org.apache.commons.configuration.event.EventSource.<init>(EventSource.java: 76 ) at org.apache.commons.configuration.AbstractConfiguration.<init>(AbstractConfiguration.java: 63 ) at com.netflix.config.ConcurrentMapConfiguration.<init>(ConcurrentMapConfiguration.java: 68 ) at com.netflix.config.ConcurrentCompositeConfiguration.<init>(ConcurrentCompositeConfiguration.java: 172 ) at com.netflix.config.ConfigurationManager.getConfigInstance(ConfigurationManager.java: 125 ) at com.netflix.config.DynamicPropertyFactory.getInstance(DynamicPropertyFactory.java: 263 ) at com.netflix.config.DynamicProperty.getInstance(DynamicProperty.java: 245 ) at com.netflix.config.PropertyWrapper.<init>(PropertyWrapper.java: 58 ) at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius$ArchaiusDynamicProperty.<init>(HystrixDynamicPropertiesArchaius.java: 62 ) at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius$StringDynamicProperty.<init>(HystrixDynamicPropertiesArchaius.java: 73 ) at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius.getString(HystrixDynamicPropertiesArchaius.java: 34 ) at com.netflix.hystrix.strategy.HystrixPlugins.getPluginImplementationViaProperties(HystrixPlugins.java: 344 ) at com.netflix.hystrix.strategy.HystrixPlugins.getPluginImplementation(HystrixPlugins.java: 334 ) at com.netflix.hystrix.strategy.HystrixPlugins.getPropertiesStrategy(HystrixPlugins.java: 243 ) at com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory.getCommandProperties(HystrixPropertiesFactory.java: 62 ) at com.netflix.hystrix.AbstractCommand.initCommandProperties(AbstractCommand.java: 204 ) at com.netflix.hystrix.AbstractCommand.<init>(AbstractCommand.java: 163 ) at com.netflix.hystrix.HystrixCommand.<init>(HystrixCommand.java: 147 ) at com.netflix.hystrix.HystrixCommand.<init>(HystrixCommand.java: 133 ) |
ConcurrentMapConfiguration.clearConfigurationListeners 方法内容如下:
public class ConcurrentMapConfiguration extends AbstractConfiguration { protected ConcurrentHashMap<String,Object> map; private Collection<ConfigurationListener> listeners = new CopyOnWriteArrayList<ConfigurationListener>(); @Override public void clearConfigurationListeners() { listeners.clear(); // NPE异常就在这一行 } } |
初看代码,listeners 变量是有初始化语句的,为啥还会是 null 呢?我们继续往上看 ConcurrentMapConfiguration 的父类,clearConfigurationListeners 方法覆盖的是父类 EventSource 的同名方法。EventSource 代码如下:
public class EventSource { /** A collection for the registered event listeners. */ private Collection listeners; /** A counter for the detail events. */ private int detailEvents; /** * Creates a new instance of <code>EventSource</code>. */ public EventSource() { clearConfigurationListeners(); } /** * Removes all registered configuration listeners. */ public void clearConfigurationListeners() { listeners = new LinkedList(); } |
clearConfigurationListeners 方法是在父类的构造函数中调用的,此时子类还没有做初始化操作,listeners 也还没有赋值,这就奇葩了。仔细观察后发现,EventSource 是 commons-configuration 包里的,而 ConcurrentMapConfiguration 是 archaius-core 包(Netflix 开源的基于java的配置管理类库)的,archaius 中引用的 commons-configuration 是较高的版本 1.8。高版本的 EventSource 代码有所不同,其构造函数里没有再调用 clearConfigurationListeners,而是调用 initListeners 方法。升级 pom 中的 commons-configuration 版本后,系统运行正常啦。
public class EventSource { /** A collection for the registered event listeners. */ private Collection<ConfigurationListener> listeners; /** * Creates a new instance of {@code EventSource}. */ public EventSource() { initListeners(); } /** * Initializes the collections for storing registered event listeners. */ private void initListeners() { listeners = new CopyOnWriteArrayList<ConfigurationListener>(); errorListeners = new CopyOnWriteArrayList<ConfigurationErrorListener>(); } |
dynamic-limiter 组件
设计目的
使用 Hystrix 编程必须使用其命令模式,继承 HystrixCommand 或 HystrixObservableCommand。对于程序中原先采用异步回调模式的代码,使用 Hystrix 必须改造成同步模式,涉及到系统代码的大量改造,并且隔离在单独的线程池中,导致线程切换增加,影响性能。
为此,我们在深入了解 Hystrix 原理之后,决定自己动手写一个降级熔断工具 dynamic-limiter,提供以下两种功能:
- 简化给异步回调模式的代码增加降级熔断的功能的步骤。主要做法是,不再采用 Hystrix 僵硬的命令模式,分离服务调用结果的监控和熔断判断逻辑。
- 针对接收消息->获取资源→进入计算队列的编程模式,设计动态限流组件,逻辑如下图所示:
3. 监控系统负载,动态设置限流阈值,原理如下: