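The previous section covered the five features Hystrix provides. This section starts with how Hystrix produces real-time execution metrics. Metrics come first because many of the other features are built on the metrics data.
First, here are the kinds of metrics produced while a service is called through Hystrix:
1. Cumulative metrics for an event.
2. Rolling metrics for an event within a time window.
3. The maximum value of a metric within a time window.
4. The distribution of a metric within a time window.
Next, let's look at how this data is produced, computed and passed along inside Hystrix.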
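While it executes a service call, Hystrix produces events of various types. The execution module first writes these events into the metrics receive streams. The metrics statistics streams subscribe to the receive streams and compute statistics from the events they receive.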
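The flow can be pictured with a minimal plain-Java sketch. It is not Hystrix code. `EventStream` and `CountingListener` are hypothetical names, events are plain strings, and delivery is synchronous, whereas Hystrix builds its streams on RxJava. The execution side writes into a receive stream, and a statistics listener subscribed to that stream keeps per-event counts.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;

// Hypothetical receive stream: producers call write(), statistics listeners subscribe().
class EventStream<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    void write(T event) {
        // Deliver the event synchronously to every current subscriber.
        for (Consumer<T> s : subscribers) {
            s.accept(event);
        }
    }
}

// Hypothetical statistics listener: counts how often each event name was seen.
class CountingListener implements Consumer<String> {
    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    @Override
    public void accept(String eventName) {
        counts.computeIfAbsent(eventName, k -> new LongAdder()).increment();
    }

    long count(String eventName) {
        LongAdder adder = counts.get(eventName);
        return adder == null ? 0 : adder.sum();
    }
}
```

For example, after writing `"SUCCESS"`, `"FAILURE"` and `"SUCCESS"` into a stream that a `CountingListener` subscribes to, `count("SUCCESS")` is 2.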
Metrics receive streams
Hystrix has the following receive streams, each carrying one message type:
Receive stream | Message | Description |
HystrixCommandStartStream | HystrixCommandExecutionStarted | Command execution started |
HystrixCommandCompletionStream | HystrixCommandCompletion | Command execution completed |
HystrixThreadPoolStartStream | HystrixCommandExecutionStarted | Thread-pool execution started |
HystrixThreadPoolCompletionStream | HystrixCommandCompletion | Thread-pool execution completed |
HystrixCollapserEventStream | HystrixCollapserEvent | Collapser (request-collapsing) events |
The receive streams are singletons per key. Each HystrixCommandKey maps to exactly one HystrixCommandStartStream and one HystrixCommandCompletionStream. Each HystrixThreadPoolKey maps to one HystrixThreadPoolStartStream and one HystrixThreadPoolCompletionStream. Each HystrixCollapserKey maps to one HystrixCollapserEventStream.
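The one-instance-per-key behaviour can be sketched as a static registry. This illustration rests on assumptions. `KeyedStartStream` is a hypothetical class, plain strings stand in for HystrixCommandKey, and `ConcurrentHashMap.computeIfAbsent` creates each instance atomically. Hystrix's own `getInstance` may be written differently.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical per-key stream: at most one instance exists for each key.
class KeyedStartStream {
    private static final ConcurrentMap<String, KeyedStartStream> INSTANCES = new ConcurrentHashMap<>();

    private final String key;

    private KeyedStartStream(String key) {
        this.key = key;
    }

    // Returns the instance for the key, creating it atomically on first use.
    static KeyedStartStream getInstance(String key) {
        return INSTANCES.computeIfAbsent(key, KeyedStartStream::new);
    }

    String key() {
        return key;
    }
}
```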
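Internally, the messaging is implemented with RxJava. For example, this is the HystrixCommandStartStream constructor:
HystrixCommandStartStream(final HystrixCommandKey commandKey) {
    this.commandKey = commandKey;
    // SerializedSubject makes it safe to write to the subject from multiple threads
    this.writeOnlySubject = new SerializedSubject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted>(PublishSubject.<HystrixCommandExecutionStarted>create());
    // share() multicasts the subject to all subscribers through a single upstream subscription
    this.readOnlyStream = writeOnlySubject.share();
}
In addition, HystrixThreadEventStream acts as a central entry point. It receives the messages produced during execution and forwards them to the individual receive streams.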
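Here is a rough sketch of such a central dispatcher. `Completion` and `CompletionDispatcher` are hypothetical stand-ins for HystrixCommandCompletion and HystrixThreadEventStream, and lists stand in for the per-key streams. The routing rule is a simplified assumption that should be checked against the Hystrix source: every completion goes to the command stream, and only thread-isolated ones also go to the thread-pool stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical completion record, standing in for HystrixCommandCompletion.
class Completion {
    final String commandKey;
    final String threadPoolKey;
    final boolean executedInThread;

    Completion(String commandKey, String threadPoolKey, boolean executedInThread) {
        this.commandKey = commandKey;
        this.threadPoolKey = threadPoolKey;
        this.executedInThread = executedInThread;
    }
}

// Hypothetical dispatcher: one call fans a completion out to the per-key streams.
class CompletionDispatcher {
    final Map<String, List<Completion>> commandStreams = new HashMap<>();
    final Map<String, List<Completion>> threadPoolStreams = new HashMap<>();

    void executionDone(Completion c) {
        // Every completion goes to its command-level stream.
        commandStreams.computeIfAbsent(c.commandKey, k -> new ArrayList<>()).add(c);
        // Assumption: only thread-isolated executions also reach the thread-pool stream.
        if (c.executedInThread) {
            threadPoolStreams.computeIfAbsent(c.threadPoolKey, k -> new ArrayList<>()).add(c);
        }
    }
}
```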
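Metrics receive stream messages
Having covered the receive streams, let's look at what they actually carry.
Message | Contents |
HystrixCommandExecutionStarted | The command's execution (isolation) strategy and its current concurrency. |
HystrixCommandCompletion | The execution result object ExecutionResult and the request context object HystrixRequestContext. |
HystrixCollapserEvent | The collapser key and the collapser event type. |
ExecutionResult fields:
private final EventCounts eventCounts;              // counts of the events that occurred
private final Exception failedExecutionException;  // exception from a failed execution
private final Exception executionException;        // exception produced during execution
private final long startTimestamp;                 // time the command started executing
private final int executionLatency;                // time spent in run()
private final int userThreadLatency;               // time from submission of the request to the end of execution
private final boolean executionOccurred;           // true if the command was executed, false otherwise
private final boolean isExecutedInThread;          // true if executed on a thread-pool thread, false otherwise
private final HystrixCollapserKey collapserKey;    // collapser key, if the command was collapsed
EventCounts fields:
private final BitSet events;               // which event types occurred
private final int numEmissions;            // number of values emitted
private final int numFallbackEmissions;    // number of fallback values emitted
private final int numCollapsed;            // number of collapsed requests
HystrixCollapserEvent fields:
private final HystrixCollapserKey collapserKey;        // collapser key
private final HystrixEventType.Collapser eventType;    // collapser event type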
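In EventCounts, the BitSet records which event types occurred, and the integer fields count the events that can repeat. Below is a minimal mutable sketch of that idea. The `TallyEvent` enum is hypothetical and covers only a few event types. The real EventCounts is immutable and covers every HystrixEventType.

```java
import java.util.BitSet;

// Hypothetical subset of event types; each ordinal is a bit position in the BitSet.
enum TallyEvent { EMIT, SUCCESS, FAILURE, TIMEOUT, FALLBACK_EMIT, FALLBACK_SUCCESS, COLLAPSED }

// Hypothetical EventCounts-style record (mutable here for brevity).
class EventTally {
    private final BitSet events = new BitSet(TallyEvent.values().length);
    private int numEmissions;
    private int numFallbackEmissions;
    private int numCollapsed;

    EventTally add(TallyEvent e) {
        // The BitSet records presence; repeatable events are also counted separately.
        events.set(e.ordinal());
        switch (e) {
            case EMIT: numEmissions++; break;
            case FALLBACK_EMIT: numFallbackEmissions++; break;
            case COLLAPSED: numCollapsed++; break;
            default: break;
        }
        return this;
    }

    boolean contains(TallyEvent e) { return events.get(e.ordinal()); }
    int numEmissions() { return numEmissions; }
    int numFallbackEmissions() { return numFallbackEmissions; }
    int numCollapsed() { return numCollapsed; }
}
```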
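Command event types
A HystrixCommand returns a single value. A SUCCESS event is recorded when the value is returned, and a FAILURE event when execution fails. A HystrixObservableCommand can return multiple values. An EMIT event is recorded for each value, a SUCCESS event when the command completes, and a FAILURE event when execution fails.
Name | Description | Triggers fallback |
EMIT | A value was emitted (HystrixObservableCommand only) | NO |
SUCCESS | Execution completed successfully | NO |
FAILURE | Execution threw an exception | YES |
TIMEOUT | Execution timed out | YES |
BAD_REQUEST | Execution threw HystrixBadRequestException | NO |
SHORT_CIRCUITED | The circuit breaker was open, so execution was short-circuited | YES |
THREAD_POOL_REJECTED | Rejected by the thread pool | YES |
SEMAPHORE_REJECTED | Rejected by the semaphore | YES |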
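The rule in the paragraph above can be written as a small function. This is a simplified model, not Hystrix code. `ExecEvent` is a hypothetical subset of the event types that carries the fallback column from the table, and `eventsFor` covers only the success and failure paths.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical subset of the execution event types, with the "triggers fallback" column.
enum ExecEvent {
    EMIT(false), SUCCESS(false), FAILURE(true), TIMEOUT(true), BAD_REQUEST(false);

    final boolean triggersFallback;

    ExecEvent(boolean triggersFallback) {
        this.triggersFallback = triggersFallback;
    }
}

class EventSequence {
    // Simplified model of the events recorded for one run that produced `values` values.
    // observable = true models HystrixObservableCommand; false models HystrixCommand.
    static List<ExecEvent> eventsFor(boolean observable, int values, boolean failed) {
        List<ExecEvent> events = new ArrayList<>();
        if (observable) {
            // Only the observable variant records one EMIT per emitted value.
            for (int i = 0; i < values; i++) {
                events.add(ExecEvent.EMIT);
            }
        }
        // Both variants end with a terminal SUCCESS or FAILURE.
        events.add(failed ? ExecEvent.FAILURE : ExecEvent.SUCCESS);
        return events;
    }
}
```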
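Fallback event types
Name | Description | Exception thrown to caller |
FALLBACK_EMIT | The fallback emitted a value (HystrixObservableCommand only) | NO |
FALLBACK_SUCCESS | The fallback completed successfully | NO |
FALLBACK_FAILURE | The fallback failed | YES |
FALLBACK_REJECTION | The fallback was rejected | YES |
FALLBACK_MISSING | No fallback is implemented | YES |
Other command event types
Name | Description |
EXCEPTION_THROWN | The command threw an exception to its caller |
RESPONSE_FROM_CACHE | The value was served from the request cache |
COLLAPSED | The command was executed as part of a collapsed batch |
Thread-pool event types
Name | Description |
EXECUTED | The thread pool executed a command |
REJECTED | The thread pool rejected a command |
Collapser event types
Name | Description |
BATCH_EXECUTED | A batch was executed |
ADDED_TO_BATCH | A request argument was added to a batch |
RESPONSE_FROM_CACHE | The value was served from the request cache |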
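Metrics statistics streams
Hystrix has the following statistics streams:
Category | Statistics stream | Source stream | Description |
Rolling counts | RollingCommandEventCounterStream | HystrixCommandCompletionStream | Count of each event type within the window |
Rolling counts | RollingCollapserEventCounterStream | HystrixCollapserEventStream | Count of each event type within the window |
Rolling counts | RollingThreadPoolEventCounterStream | HystrixThreadPoolCompletionStream | Count of each event type within the window |
Rolling counts | HealthCountsStream | HystrixCommandCompletionStream | Total requests, error count and error percentage within the window |
Cumulative counts | CumulativeCommandEventCounterStream | HystrixCommandCompletionStream | Count of each event type since startup |
Cumulative counts | CumulativeCollapserEventCounterStream | HystrixCollapserEventStream | Count of each event type since startup |
Cumulative counts | CumulativeThreadPoolEventCounterStream | HystrixThreadPoolCompletionStream | Count of each event type since startup |
Rolling distribution | RollingCommandLatencyDistributionStream | executionLatency of HystrixCommandCompletionStream messages | Distribution within the window, computed with a Histogram |
Rolling distribution | RollingCommandUserLatencyDistributionStream | totalLatency of HystrixCommandCompletionStream messages | Distribution within the window, computed with a Histogram |
Rolling distribution | RollingCollapserBatchSizeDistributionStream | ADDED_TO_BATCH events of HystrixCollapserEventStream | Distribution within the window, computed with a Histogram |
Rolling maximum | RollingCommandMaxConcurrencyStream | HystrixCommandStartStream | Maximum execution concurrency within the window |
Rolling maximum | RollingThreadPoolMaxConcurrencyStream | HystrixThreadPoolStartStream | Maximum execution concurrency within the window |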
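To make the HealthCountsStream row concrete, here is a sketch of how an error percentage can be derived from per-type counts. `Health` is a hypothetical class. It is loosely modeled on Hystrix's HealthCounts, so the choice of error events and the integer rounding are assumptions to verify against the source. The error events assumed here are FAILURE, TIMEOUT, THREAD_POOL_REJECTED and SEMAPHORE_REJECTED. The circuit breaker compares this percentage against its configured error threshold.

```java
// Hypothetical health snapshot, loosely modeled on Hystrix's HealthCounts.
final class Health {
    final long totalCount;
    final long errorCount;

    Health(long totalCount, long errorCount) {
        this.totalCount = totalCount;
        this.errorCount = errorCount;
    }

    // Assumption: failures, timeouts and both kinds of rejection count as errors,
    // and the total is the successes plus those errors.
    static Health from(long success, long failure, long timeout,
                       long threadPoolRejected, long semaphoreRejected) {
        long errors = failure + timeout + threadPoolRejected + semaphoreRejected;
        return new Health(success + errors, errors);
    }

    // Integer percentage (truncated); 0 when there were no requests in the window.
    int errorPercentage() {
        return totalCount > 0 ? (int) (errorCount * 100 / totalCount) : 0;
    }
}
```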
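A rolling counter stream subscribes to one receive stream. It first aggregates the count of each event type within a bucket of length metrics.rollingStats.timeInMilliseconds / metrics.rollingStats.numBuckets. It then sums the most recent metrics.rollingStats.numBuckets buckets, and that sum is the rolling count for the window.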
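Here is a bucketed rolling counter along those lines, as a plain-Java sketch. `RollingCounter` is hypothetical, and times are passed in explicitly to keep it deterministic. Hystrix builds this with RxJava window operators and publishes a new rolling value as each bucket completes. This sketch instead sums on demand and includes the bucket still in progress. The constructor check mirrors the requirement in the Hystrix configuration documentation that the window divide evenly into buckets.

```java
import java.util.Arrays;

// Hypothetical rolling counter: the window is split into numBuckets buckets of
// windowMs / numBuckets each; the rolling count sums the buckets still inside the window.
class RollingCounter {
    private final long bucketSizeMs;
    private final long[] counts;
    private final long[] bucketStart; // start time of the bucket held in each slot

    RollingCounter(long windowMs, int numBuckets) {
        if (numBuckets <= 0 || windowMs <= 0 || windowMs % numBuckets != 0) {
            throw new IllegalArgumentException("windowMs must be a positive multiple of numBuckets");
        }
        this.bucketSizeMs = windowMs / numBuckets;
        this.counts = new long[numBuckets];
        this.bucketStart = new long[numBuckets];
        Arrays.fill(bucketStart, Long.MIN_VALUE); // no bucket stored yet
    }

    // Records one event at time nowMs (milliseconds).
    synchronized void increment(long nowMs) {
        long start = nowMs - (nowMs % bucketSizeMs);
        int slot = (int) ((nowMs / bucketSizeMs) % counts.length);
        if (bucketStart[slot] != start) {
            // The slot still holds an expired bucket: reuse it for the current one.
            bucketStart[slot] = start;
            counts[slot] = 0;
        }
        counts[slot]++;
    }

    // Sums the buckets that start within the last numBuckets bucket periods.
    synchronized long sum(long nowMs) {
        long currentStart = nowMs - (nowMs % bucketSizeMs);
        long oldestAllowed = currentStart - (counts.length - 1) * bucketSizeMs;
        long total = 0;
        for (int i = 0; i < counts.length; i++) {
            if (bucketStart[i] >= oldestAllowed && bucketStart[i] <= currentStart) {
                total += counts[i];
            }
        }
        return total;
    }
}
```

With a 10,000 ms window and 10 buckets, events at 0, 500 and 1,500 ms give a sum of 3 at 1,999 ms. At 10,500 ms the bucket starting at 0 has left the window, so the sum is 1.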
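A cumulative counter stream subscribes to a receive stream (a start stream or a completion stream). It aggregates the count of each event type per bucket in the same way, with a bucket length of metrics.rollingStats.timeInMilliseconds / metrics.rollingStats.numBuckets. It then keeps adding each bucket to a running total that is never reset.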
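Here is a cumulative counter sketch in the same style. `CumulativeCounter` is hypothetical. It folds each completed bucket into a total that is never reset. As with a value published at bucket boundaries, the reported total only advances when a bucket closes.

```java
// Hypothetical cumulative counter: events are collected per bucket, and each
// completed bucket is folded into a running total that is never reset.
class CumulativeCounter {
    private final long bucketSizeMs;
    private long currentBucketStart = Long.MIN_VALUE;
    private long currentBucketCount;
    private long foldedTotal; // sum of all completed buckets

    CumulativeCounter(long windowMs, int numBuckets) {
        this.bucketSizeMs = windowMs / numBuckets;
    }

    synchronized void increment(long nowMs) {
        roll(nowMs);
        currentBucketCount++;
    }

    // Total over completed buckets only; the bucket in progress is not included.
    synchronized long latest(long nowMs) {
        roll(nowMs);
        return foldedTotal;
    }

    // When time has moved into a new bucket, fold the finished bucket into the total.
    // Assumes nowMs never goes backwards.
    private void roll(long nowMs) {
        long start = nowMs - (nowMs % bucketSizeMs);
        if (start != currentBucketStart) {
            foldedTotal += currentBucketCount;
            currentBucketCount = 0;
            currentBucketStart = start;
        }
    }
}
```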
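A rolling distribution stream subscribes to a receive stream and records the values seen in each bucket in a Histogram object. Unlike the counter streams, the percentile streams size their buckets from metrics.rollingPercentile.timeInMilliseconds / metrics.rollingPercentile.numBuckets. The stream then merges the Histograms of the most recent metrics.rollingPercentile.numBuckets buckets, and the result is the distribution over the window at that moment.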
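The sketch below keeps the bucket-then-merge structure but swaps the technique. Hystrix records values into HdrHistogram `Histogram` objects. This hypothetical `RollingDistribution` stores raw values in lists and computes a nearest-rank percentile over the merged buckets. The caller is assumed to call `rollBucket()` once per bucket period.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical rolling distribution: each bucket holds the raw values recorded during
// one bucket period; the last numBuckets completed buckets are merged for queries.
class RollingDistribution {
    private final int numBuckets;
    private final Deque<List<Integer>> completed = new ArrayDeque<>();
    private List<Integer> current = new ArrayList<>();

    RollingDistribution(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    void record(int value) {
        current.add(value);
    }

    // Closes the current bucket and drops the oldest once more than numBuckets are held.
    void rollBucket() {
        completed.addLast(current);
        current = new ArrayList<>();
        if (completed.size() > numBuckets) {
            completed.removeFirst();
        }
    }

    // Nearest-rank percentile over the merged completed buckets; 0 if there is no data.
    int percentile(double p) {
        List<Integer> merged = new ArrayList<>();
        for (List<Integer> bucket : completed) {
            merged.addAll(bucket);
        }
        if (merged.isEmpty()) {
            return 0;
        }
        Collections.sort(merged);
        int rank = (int) Math.ceil(p / 100.0 * merged.size());
        return merged.get(Math.max(rank, 1) - 1);
    }

    // Integer mean over the merged completed buckets; 0 if there is no data.
    int mean() {
        long sum = 0;
        int n = 0;
        for (List<Integer> bucket : completed) {
            for (int v : bucket) {
                sum += v;
                n++;
            }
        }
        return n == 0 ? 0 : (int) (sum / n);
    }
}
```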
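RollingConcurrencyStream subscribes to a start stream such as HystrixCommandStartStream. Using RxJava, it takes the maximum execution concurrency seen in each bucket and re-emits it. It then takes the maximum of those values across the buckets in the window and emits that as the rolling maximum.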
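Here is the same two-level maximum, first per bucket and then across the window, as a hypothetical plain-Java `RollingMax`. The value passed to `observe` stands for the concurrency carried by each start message. As before, the caller is responsible for rolling the buckets.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical rolling max: keep the peak concurrency of each bucket,
// then report the largest peak across the last numBuckets buckets.
class RollingMax {
    private final int numBuckets;
    private final Deque<Integer> bucketPeaks = new ArrayDeque<>();
    private int currentPeak;

    RollingMax(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    // Called with the concurrency observed when a command starts.
    void observe(int concurrency) {
        currentPeak = Math.max(currentPeak, concurrency);
    }

    // Called once per bucket period: close the bucket and evict the oldest if needed.
    void rollBucket() {
        bucketPeaks.addLast(currentPeak);
        currentPeak = 0;
        if (bucketPeaks.size() > numBuckets) {
            bucketPeaks.removeFirst();
        }
    }

    int latestRollingMax() {
        int max = 0;
        for (int peak : bucketPeaks) {
            max = Math.max(max, peak);
        }
        return max;
    }
}
```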
The statistics streams are also singletons. Each stream instance corresponds to one HystrixCommandKey, HystrixThreadPoolKey or HystrixCollapserKey.
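Metrics module
Hystrix exposes the data collected during execution through the metrics module. The module has three parts: command execution metrics, thread-pool metrics and collapser metrics. Each HystrixCommandKey, HystrixThreadPoolKey and HystrixCollapserKey has its own metrics object (HystrixCommandMetrics, HystrixThreadPoolMetrics and HystrixCollapserMetrics respectively). Internally, each metrics object gets its statistics by subscribing to the statistics streams.
Command execution metrics
HystrixCommandMetrics holds the metrics of the command execution module. When it is constructed it obtains, creating if necessary, six statistics streams: HealthCountsStream, RollingCommandEventCounterStream, CumulativeCommandEventCounterStream, RollingCommandLatencyDistributionStream, RollingCommandUserLatencyDistributionStream and RollingCommandMaxConcurrencyStream. It serves the corresponding metrics from these streams.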
private HealthCountsStream healthCountsStream;
private final RollingCommandEventCounterStream rollingCommandEventCounterStream;
private final CumulativeCommandEventCounterStream cumulativeCommandEventCounterStream;
private final RollingCommandLatencyDistributionStream rollingCommandLatencyDistributionStream;
private final RollingCommandUserLatencyDistributionStream rollingCommandUserLatencyDistributionStream;
private final RollingCommandMaxConcurrencyStream rollingCommandMaxConcurrencyStream;

/* package */HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {
    super(null);
    healthCountsStream = HealthCountsStream.getInstance(key, properties);
    rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties);
    cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties);
    rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties);
    rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties);
    rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties);
}

// Rolling count of the given event type within the window
public long getRollingCount(HystrixEventType eventType) {
    return rollingCommandEventCounterStream.getLatest(eventType);
}

// Cumulative count of the given event type
public long getCumulativeCount(HystrixEventType eventType) {
    return cumulativeCommandEventCounterStream.getLatest(eventType);
}

// Execution time at the given percentile
public int getExecutionTimePercentile(double percentile) {
    return rollingCommandLatencyDistributionStream.getLatestPercentile(percentile);
}

// Mean execution time
public int getExecutionTimeMean() {
    return rollingCommandLatencyDistributionStream.getLatestMean();
}

// Total time at the given percentile
public int getTotalTimePercentile(double percentile) {
    return rollingCommandUserLatencyDistributionStream.getLatestPercentile(percentile);
}

// Mean total time
public int getTotalTimeMean() {
    return rollingCommandUserLatencyDistributionStream.getLatestMean();
}

// Maximum concurrency within the window
public long getRollingMaxConcurrentExecutions() {
    return rollingCommandMaxConcurrencyStream.getLatestRollingMax();
}

// Current concurrency
public int getCurrentConcurrentExecutionCount() {
    return concurrentExecutionCount.get();
}

// Command health (total requests, errors, error percentage)
public HealthCounts getHealthCounts() {
    return healthCountsStream.getLatest();
}
Thread-pool metrics
HystrixThreadPoolMetrics holds the metrics of the thread-pool execution module. When it is constructed it obtains three streams: RollingThreadPoolEventCounterStream, CumulativeThreadPoolEventCounterStream and RollingThreadPoolMaxConcurrencyStream. It serves the corresponding statistics from these streams.
private final RollingThreadPoolEventCounterStream rollingCounterStream;
private final CumulativeThreadPoolEventCounterStream cumulativeCounterStream;
private final RollingThreadPoolMaxConcurrencyStream rollingThreadPoolMaxConcurrencyStream;

private HystrixThreadPoolMetrics(HystrixThreadPoolKey threadPoolKey, ThreadPoolExecutor threadPool, HystrixThreadPoolProperties properties) {
    super(null);
    this.threadPoolKey = threadPoolKey;
    this.threadPool = threadPool;
    this.properties = properties;
    rollingCounterStream = RollingThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
    cumulativeCounterStream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
    rollingThreadPoolMaxConcurrencyStream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, properties);
}

/** Number of tasks executed by the thread pool within the window */
public long getRollingCountThreadsExecuted() {
    return rollingCounterStream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED);
}

/** Cumulative number of tasks executed by the thread pool */
public long getCumulativeCountThreadsExecuted() {
    return cumulativeCounterStream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED);
}

/** Number of tasks rejected by the thread pool within the window */
public long getRollingCountThreadsRejected() {
    return rollingCounterStream.getLatestCount(HystrixEventType.ThreadPool.REJECTED);
}

/** Cumulative number of tasks rejected by the thread pool */
public long getCumulativeCountThreadsRejected() {
    return cumulativeCounterStream.getLatestCount(HystrixEventType.ThreadPool.REJECTED);
}

// Rolling count of the given event type within the window
public long getRollingCount(HystrixEventType.ThreadPool event) {
    return rollingCounterStream.getLatestCount(event);
}

// Cumulative count of the given event type
public long getCumulativeCount(HystrixEventType.ThreadPool event) {
    return cumulativeCounterStream.getLatestCount(event);
}

/** Maximum number of active threads within the window */
public long getRollingMaxActiveThreads() {
    return rollingThreadPoolMaxConcurrencyStream.getLatestRollingMax();
}
Some current values are also read directly from the underlying thread pool:
public Number getCurrentActiveCount() {
    return threadPool.getActiveCount();
}

public Number getCurrentCompletedTaskCount() {
    return threadPool.getCompletedTaskCount();
}

public Number getCurrentCorePoolSize() {
    return threadPool.getCorePoolSize();
}

public Number getCurrentLargestPoolSize() {
    return threadPool.getLargestPoolSize();
}

public Number getCurrentMaximumPoolSize() {
    return threadPool.getMaximumPoolSize();
}

public Number getCurrentPoolSize() {
    return threadPool.getPoolSize();
}

public Number getCurrentTaskCount() {
    return threadPool.getTaskCount();
}

public Number getCurrentQueueSize() {
    return threadPool.getQueue().size();
}
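These getters are thin wrappers over `java.util.concurrent.ThreadPoolExecutor`. The following sketch reads the same values from a standalone executor. `PoolSnapshot` is a hypothetical helper, and its map keys are names chosen here for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;

class PoolSnapshot {
    // Reads the same point-in-time ThreadPoolExecutor values as the getCurrent* methods.
    static Map<String, Number> of(ThreadPoolExecutor pool) {
        Map<String, Number> m = new LinkedHashMap<>();
        m.put("activeCount", pool.getActiveCount());
        m.put("completedTaskCount", pool.getCompletedTaskCount());
        m.put("corePoolSize", pool.getCorePoolSize());
        m.put("largestPoolSize", pool.getLargestPoolSize());
        m.put("maximumPoolSize", pool.getMaximumPoolSize());
        m.put("poolSize", pool.getPoolSize());
        m.put("taskCount", pool.getTaskCount());
        m.put("queueSize", pool.getQueue().size());
        return m;
    }
}
```

Because these values are read live from the executor rather than from a stream, they reflect the pool's state at the moment of the call, not a windowed statistic.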
Collapser metrics
HystrixCollapserMetrics holds the metrics of the collapser execution module. When it is constructed it obtains three streams: RollingCollapserEventCounterStream, CumulativeCollapserEventCounterStream and RollingCollapserBatchSizeDistributionStream. It serves the corresponding statistics from these streams.
private final RollingCollapserEventCounterStream rollingCollapserEventCounterStream;
private final CumulativeCollapserEventCounterStream cumulativeCollapserEventCounterStream;
private final RollingCollapserBatchSizeDistributionStream rollingCollapserBatchSizeDistributionStream;

/* package */HystrixCollapserMetrics(HystrixCollapserKey key, HystrixCollapserProperties properties) {
    super(null);
    rollingCollapserEventCounterStream = RollingCollapserEventCounterStream.getInstance(key, properties);
    cumulativeCollapserEventCounterStream = CumulativeCollapserEventCounterStream.getInstance(key, properties);
    rollingCollapserBatchSizeDistributionStream = RollingCollapserBatchSizeDistributionStream.getInstance(key, properties);
}

// Rolling count of the given collapser event type within the window
public long getRollingCount(HystrixEventType.Collapser collapserEventType) {
    return rollingCollapserEventCounterStream.getLatest(collapserEventType);
}

// Cumulative count of the given collapser event type
public long getCumulativeCount(HystrixEventType.Collapser collapserEventType) {
    return cumulativeCollapserEventCounterStream.getLatest(collapserEventType);
}

// Batch size at the given percentile
public int getBatchSizePercentile(double percentile) {
    return rollingCollapserBatchSizeDistributionStream.getLatestPercentile(percentile);
}

// Mean batch size
public int getBatchSizeMean() {
    return rollingCollapserBatchSizeDistributionStream.getLatestMean();
}
Other streams
HystrixConfigurationStream
This stream periodically emits the latest Hystrix property configuration. com.netflix.hystrix.contrib.sample.stream.HystrixConfigSseServlet uses this stream to obtain the configuration.
public HystrixConfigurationStream(final int intervalInMilliseconds) {
this.intervalInMilliseconds = intervalInMilliseconds;
this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
.map(getAllConfig)
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share()
.onBackpressureDrop();
}
private static final Func1<Long, HystrixConfiguration> getAllConfig =
new Func1<Long, HystrixConfiguration>() {
@Override
public HystrixConfiguration call(Long timestamp) {
return HystrixConfiguration.from(
getAllCommandConfig.call(timestamp),
getAllThreadPoolConfig.call(timestamp),
getAllCollapserConfig.call(timestamp)
);
}
};
private static final Func1<Long, Map<HystrixCommandKey, HystrixCommandConfiguration>> getAllCommandConfig =
new Func1<Long, Map<HystrixCommandKey, HystrixCommandConfiguration>>() {
@Override
public Map<HystrixCommandKey, HystrixCommandConfiguration> call(Long timestamp) {
Map<HystrixCommandKey, HystrixCommandConfiguration> commandConfigPerKey = new HashMap<HystrixCommandKey, HystrixCommandConfiguration>();
for (HystrixCommandMetrics commandMetrics: HystrixCommandMetrics.getInstances()) {
HystrixCommandKey commandKey = commandMetrics.getCommandKey();
HystrixThreadPoolKey threadPoolKey = commandMetrics.getThreadPoolKey();
HystrixCommandGroupKey groupKey = commandMetrics.getCommandGroup();
commandConfigPerKey.put(commandKey, sampleCommandConfiguration(commandKey, threadPoolKey, groupKey, commandMetrics.getProperties()));
}
return commandConfigPerKey;
}
};
private static final Func1<Long, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>> getAllThreadPoolConfig =
new Func1<Long, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>>() {
@Override
public Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration> call(Long timestamp) {
Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration> threadPoolConfigPerKey = new HashMap<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>();
for (HystrixThreadPoolMetrics threadPoolMetrics: HystrixThreadPoolMetrics.getInstances()) {
HystrixThreadPoolKey threadPoolKey = threadPoolMetrics.getThreadPoolKey();
threadPoolConfigPerKey.put(threadPoolKey, sampleThreadPoolConfiguration(threadPoolKey, threadPoolMetrics.getProperties()));
}
return threadPoolConfigPerKey;
}
};
private static final Func1<Long, Map<HystrixCollapserKey, HystrixCollapserConfiguration>> getAllCollapserConfig =
new Func1<Long, Map<HystrixCollapserKey, HystrixCollapserConfiguration>>() {
@Override
public Map<HystrixCollapserKey, HystrixCollapserConfiguration> call(Long timestamp) {
Map<HystrixCollapserKey, HystrixCollapserConfiguration> collapserConfigPerKey = new HashMap<HystrixCollapserKey, HystrixCollapserConfiguration>();
for (HystrixCollapserMetrics collapserMetrics: HystrixCollapserMetrics.getInstances()) {
HystrixCollapserKey collapserKey = collapserMetrics.getCollapserKey();
collapserConfigPerKey.put(collapserKey, sampleCollapserConfiguration(collapserKey, collapserMetrics.getProperties()));
}
return collapserConfigPerKey;
}
};
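Here is the same periodic-snapshot idea without RxJava. A hypothetical `ConfigSampler` uses a `ScheduledExecutorService` in place of `Observable.interval`. On every tick it copies a live configuration map and hands the immutable copy to a listener. Unlike the Hystrix stream it supports neither multiple subscribers nor backpressure dropping. It only illustrates the sampling loop.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical config sampler: every intervalMs it copies the current settings
// and hands the immutable snapshot to a listener.
class ConfigSampler implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    ConfigSampler(Map<String, String> liveConfig, long intervalMs, Consumer<Map<String, String>> listener) {
        scheduler.scheduleAtFixedRate(
                // Copy first, so the listener never observes later changes to liveConfig.
                () -> listener.accept(Collections.unmodifiableMap(new TreeMap<>(liveConfig))),
                intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```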
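Publishing metrics
Sometimes we need to publish Hystrix metrics to another system. Hystrix provides three interfaces for this: HystrixMetricsPublisherCollapser, HystrixMetricsPublisherCommand and HystrixMetricsPublisherThreadPool. You implement them and put the logic that publishes the metrics in their initialize method. You then extend HystrixMetricsPublisher so that it creates instances of these implementations. Hystrix itself only defines the interfaces and the initialize method. At runtime, HystrixMetricsPublisherFactory obtains the HystrixMetricsPublisher implementation through HystrixPlugins and uses it to create the HystrixMetricsPublisherCollapser, HystrixMetricsPublisherCommand and HystrixMetricsPublisherThreadPool implementations. It calls initialize on each one when it is first created.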
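The factory's create-once-then-initialize behaviour can be sketched as follows. `CommandPublisher` and `PublisherFactory` are hypothetical, and strings stand in for HystrixCommandKey. The `Function` plays the role of the registered HystrixMetricsPublisher plugin. `putIfAbsent` decides which instance wins a race, and only that instance is initialized.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical per-command publisher, standing in for HystrixMetricsPublisherCommand.
interface CommandPublisher {
    void initialize();
}

// Hypothetical factory: asks the plugin for one publisher per key and
// calls initialize() only on the instance that actually gets stored.
class PublisherFactory {
    private final Map<String, CommandPublisher> publishers = new ConcurrentHashMap<>();
    private final Function<String, CommandPublisher> plugin;

    PublisherFactory(Function<String, CommandPublisher> plugin) {
        this.plugin = plugin;
    }

    CommandPublisher forCommand(String commandKey) {
        CommandPublisher existing = publishers.get(commandKey);
        if (existing != null) {
            return existing;
        }
        CommandPublisher created = plugin.apply(commandKey);
        CommandPublisher raced = publishers.putIfAbsent(commandKey, created);
        if (raced != null) {
            return raced; // another thread stored its instance first
        }
        created.initialize(); // first creation for this key
        return created;
    }
}
```

Calling `initialize()` outside the map operation keeps plugin code out of the map's internal locking. The trade-off is that another thread may briefly see a publisher whose `initialize()` has not yet finished.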
For example:
The Coda Hale Metrics publisher exports Hystrix metrics into a given metrics monitoring system.
Add the dependency:
<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-codahale-metrics-publisher</artifactId>
    <version>1.5.9</version>
</dependency>
Create a HystrixMetricsPublisher and register it with HystrixPlugins:
@Bean
HystrixMetricsPublisher hystrixMetricsPublisher() {
HystrixCodaHaleMetricsPublisher publisher = new HystrixCodaHaleMetricsPublisher(metricRegistry);
HystrixPlugins.getInstance().registerMetricsPublisher(publisher);
return publisher;
}
The Coda Hale implementation looks like this:
public class HystrixCodaHaleMetricsPublisher extends HystrixMetricsPublisher {
    private final String metricsRootNode;
    private final MetricRegistry metricRegistry;

    public HystrixCodaHaleMetricsPublisher(MetricRegistry metricRegistry) {
        this(null, metricRegistry);
    }

    public HystrixCodaHaleMetricsPublisher(String metricsRootNode, MetricRegistry metricRegistry) {
        this.metricsRootNode = metricsRootNode;
        this.metricRegistry = metricRegistry;
    }

    @Override
    public HystrixMetricsPublisherCommand getMetricsPublisherForCommand(HystrixCommandKey commandKey, HystrixCommandGroupKey commandGroupKey, HystrixCommandMetrics metrics, HystrixCircuitBreaker circuitBreaker, HystrixCommandProperties properties) {
        return new HystrixCodaHaleMetricsPublisherCommand(metricsRootNode, commandKey, commandGroupKey, metrics, circuitBreaker, properties, metricRegistry);
    }

    @Override
    public HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolMetrics metrics, HystrixThreadPoolProperties properties) {
        return new HystrixCodaHaleMetricsPublisherThreadPool(metricsRootNode, threadPoolKey, metrics, properties, metricRegistry);
    }

    @Override
    public HystrixMetricsPublisherCollapser getMetricsPublisherForCollapser(HystrixCollapserKey collapserKey, HystrixCollapserMetrics metrics, HystrixCollapserProperties properties) {
        return new HystrixCodaHaleMetricsPublisherCollapser(collapserKey, metrics, properties, metricRegistry);
    }
}
HystrixCodaHaleMetricsPublisher creates HystrixCodaHaleMetricsPublisherCommand, HystrixCodaHaleMetricsPublisherThreadPool and HystrixCodaHaleMetricsPublisherCollapser. The core logic of each of these three classes is in its initialize method, which registers the corresponding metrics in the metricRegistry.
public void initialize() {
    metricRegistry.register(createMetricName("isCircuitBreakerOpen"), new Gauge<Boolean>() {
        @Override
        public Boolean getValue() {
            return circuitBreaker.isOpen();
        }
    });
    .....
}
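A Coda Hale `Gauge` is pull-based. The registry stores a callback and evaluates it only when a reporter reads the metric, which is why the anonymous class above can simply return `circuitBreaker.isOpen()`. Here is a minimal plain-Java sketch of that idea, with a hypothetical `GaugeRegistry` that uses `Supplier` in place of the Coda Hale types.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

// Hypothetical pull-based registry: a gauge stores a callback rather than a value,
// so each read reflects the state at the moment of reading.
class GaugeRegistry {
    private final Map<String, Supplier<?>> gauges = new TreeMap<>();

    synchronized void register(String name, Supplier<?> gauge) {
        if (gauges.putIfAbsent(name, gauge) != null) {
            throw new IllegalArgumentException("gauge already registered: " + name);
        }
    }

    // Evaluates the gauge now; returns null for an unknown name.
    synchronized Object read(String name) {
        Supplier<?> gauge = gauges.get(name);
        return gauge == null ? null : gauge.get();
    }
}
```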