上一节讲到HystrixCommand的执行流程。
Hystrix内部将一些模块实现成了插件,并且提供了用户提供自己的实现,通过配置来替换插件。Hystrix提供了5个插件,分别为并发相关插件(HystrixConcurrencyStrategy)、事件通知插件(HystrixEventNotifier)、度量信息插件(HystrixMetricsPublisher)Properties配置插件(HystrixPropertiesStrategy)、插件HystrixCommand回调函数类(HystrixCommandExecutionHook)。
想要使用自定义的插件,需要实现相应接口,然后通过HystrixPlugins注册插件、或者通过配置配置自定义插件、或者将插件放在类路径。HystrixPlugins获取插件顺序:
1.HystrixPlugins 会使用通过register注册的实现类。
2.HystrixPlugins会去properties(HystrixDynamicProperties)配置下需找相应类型的实现类。
3.通过ServiceLoader获取相应类型的实现类。
4.使用默认实现类。
本质原理是,Hystrix内部通过接口方式编程,具体实现通过HystrixPlugins获取。HystrixPlugins会根据设定的顺序获取插件。
public HystrixCommandExecutionHook getCommandExecutionHook() { //如果HystrixPlugins没有注册插件 if (commandExecutionHook.get() == null) { //通过配置文件和查找类路径实现接口 Object impl = getPluginImplementation(HystrixCommandExecutionHook.class); //如果没有实现自定义插件,使用默认插件 if (impl == null) { commandExecutionHook.compareAndSet(null, HystrixCommandExecutionHookDefault.getInstance()); } else { commandExecutionHook.compareAndSet(null, (HystrixCommandExecutionHook) impl); } } return commandExecutionHook.get(); } private <T> T getPluginImplementation(Class<T> pluginClass) { T p = getPluginImplementationViaProperties(pluginClass, dynamicProperties); if (p != null) return p; return findService(pluginClass, classLoader); } private static <T> T getPluginImplementationViaProperties(Class<T> pluginClass, HystrixDynamicProperties dynamicProperties) { String classSimpleName = pluginClass.getSimpleName(); String propertyName = "hystrix.plugin." + classSimpleName + ".implementation"; String implementingClass = dynamicProperties.getString(propertyName, null).get(); if (implementingClass != null) { ... Class<?> cls = Class.forName(implementingClass); cls = cls.asSubclass(pluginClass); return (T) cls.newInstance(); ... } else { return null; } } private static <T> T findService( Class<T> spi, ClassLoader classLoader) throws ServiceConfigurationError { ServiceLoader<T> sl = ServiceLoader.load(spi, classLoader); for (T s : sl) { if (s != null) return s; } return null; }
HystrixEventNotifier
在HystrixCommand和HystrixObservableCommand执行过程中会触发一些事件,实现HystrixEventNotifier可以监听这些事件进行一些告警和数据收集。
EMIT | 当发射一个返回数据时,发送该消息,HystrixCommand不会发送该消息 |
SUCCESS | 当发射一个返回数据时,发送该消息。 |
FAILURE | |
TIMEOUT | 发生time out异常时,发送该类型消息 |
BAD_REQUEST | 发生bad request异常时,发送该类型消息 |
SHORT_CIRCUITED | 熔断异常时,发送该类型消息 |
THREAD_POOL_REJECTED | 处理线程池拒绝异常时,发送该类型消息 |
SEMAPHORE_REJECTED | 处理信号量拒绝异常时,发送该类型消息 |
FALLBACK_EMIT | 当fallback发射一个返回数据时,发送该消息,HystrixCommand不会发送该消息 |
FALLBACK_SUCCESS | 当fallback执行结束时,发送该消息。 |
FALLBACK_FAILURE | 当fallback执行异常并且异常不为UnsupportedOperationException时,发送该消息。 |
FALLBACK_REJECTION | 当fallback执行超过指定并发量被拒绝时,发送该消息。 |
FALLBACK_MISSING | 当fallback执行异常并且异常为UnsupportedOperationException时,发送该消息。 |
EXCEPTION_THROWN | 当执行命令或fallback出现异常时,发送该消息 |
RESPONSE_FROM_CACHE | 从缓存中获取结果时,发送该类型消息 |
CANCELLED | |
COLLAPSED | 当创建一个新的批量执行命令时,发送该类型消息 |
HystrixCommandExecutionHook
在HystrixCommand和HystrixObservableCommand执行过程中会调用HystrixCommandExecutionHook的相应方法,通过实现HystrixCommandExecutionHook可以在HystrixCommand或HystrixObservableCommand执行期间的响应阶段进行回调。
方法 | 调用点 |
---|---|
onStart |
HystrixInvokable执行前被调用 |
onEmit |
whenever the HystrixInvokable emits a value |
onError |
if the HystrixInvokable fails with an exception |
onSuccess |
iHystrixInvokable执行成功 |
onThreadStart |
at the start of thread execution if the HystrixInvokable is a HystrixCommand executed using the THREAD ExecutionIsolationStrategy |
onThreadComplete |
at the completion of thread execution if the HystrixInvokable is a HystrixCommand executed using the THREAD ExecutionIsolationStrategy |
onExecutionStart |
when the user-defined execution method in the HystrixInvokable begins |
onExecutionEmit |
whenever the user-defined execution method in the HystrixInvokable emits a value |
onExecutionError |
when the user-defined execution method in the HystrixInvokable fails with an exception |
onExecutionSuccess |
when the user-defined execution method in the HystrixInvokable completes successfully |
onFallbackStart |
if the HystrixInvokable attempts to call the fallback method |
onFallbackEmit |
whenever the fallback method in the HystrixInvokable emits a value |
onFallbackError |
if the fallback method in the HystrixInvokable fails with an exception or does not exist when a call attempt is made |
onFallbackSuccess |
if the fallback method in the HystrixInvokable completes successfully |
onCacheHit |
if the response to the HystrixInvokable is found in the HystrixRequestCache |
onUnsubscribe |
Metrics发布
通过实现HystrixMetricsPublisher可以获取所有实例的metrics信息,这样我们就可以获取和存储这些metrics信息,以便后续处理。默认的实现不会发布这些metrics信息。
在创建AbstractCommand,HystrixThreadPoolDefault,HystrixObservableCollapser时分别会通过HystrixPlugins获取HystrixMetricsPublisherCommand,HystrixMetricsPublisherThreadPool,HystrixMetricsPublisherCollapser并初始化。
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults, HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore, HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) { ... HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties); ... }
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) { this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults); ... HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties); }
HystrixObservableCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder, HystrixCollapserMetrics metrics) { ... HystrixMetricsPublisherFactory.createOrRetrievePublisherForCollapser(collapserKey, this.metrics, properties); ... }
配置策略
配置策略的作用是提供Hystrix配置信息。
在创建AbstractCommand,HystrixThreadPoolDefault,HystrixObservableCollapser时分别会通过HystrixPropertiesStrategy获取HystrixCommandProperties,HystrixThreadPoolProperties,HystrixCollapserProperties并初始化。
private static HystrixCommandProperties initCommandProperties(HystrixCommandKey commandKey, HystrixPropertiesStrategy propertiesStrategy, HystrixCommandProperties.Setter commandPropertiesDefaults) { if (propertiesStrategy == null) { return HystrixPropertiesFactory.getCommandProperties(commandKey, commandPropertiesDefaults); } else { // used for unit testing return propertiesStrategy.getCommandProperties(commandKey, commandPropertiesDefaults); } }
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) { this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults); ... }
HystrixObservableCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder, HystrixCollapserMetrics metrics) { ... HystrixCollapserProperties properties = HystrixPropertiesFactory.getCollapserProperties(collapserKey, propertiesBuilder); ... }
并发策略
并发策略可以让用户自定义实现线程池、请求作用域、Callable。