** **上一节讲到了hystrix提供的五个功能,这一节我们首先来讲hystrix中提供实时执行metrics信息的实现。为什么先讲metrics,因为很多功能都是基于metrics的数据来实现的,它是很多功能实现的基础。
首先来看一下通过hystrix调用服务的过程中会产生那些类型的metrics信息:
1.某一事件的持续指标。
2.某一事件窗口时间内持续指标。
3.某一事件窗口时间内最大指标。
4.某一事件窗口时间内指标分布。
在来看一下这些数据在hystrix中是如何产生、计算和流转的。
hystrix在执行服务调用的过程中会产生各类事件,执行模块首先将这些事件发送的metrics接受流中,而metrics统计流会监听metrics接受流,计算出各类统计数据。
metrics接收流
hystrix有以下接收流和对应接收的消息
接收流 | 接收消息 | 说明 |
HystrixCommandStartStream | HystrixCommandExecutionStarted | 命令开始执行消息流 |
HystrixCommandCompletionStream | HystrixCommandCompletion | 命令完成执行消息流 |
HystrixThreadPoolStartStream | HystrixCommandExecutionStarted | 线程池开始执行消息流 |
HystrixThreadPoolCompletionStream | HystrixCommandCompletion | 线程池执行完成消息流 |
HystrixCollapserEventStream | HystrixCollapserEvent | 合并命令执行消息流 |
metrics接收流使用单例模式,HystrixCommandKey,HystrixThreadPoolKey,HystrixCollapserKey分别对应同一个(HystrixCommandStartStream、HystrixCommandCompletionStream),(HystrixThreadPoolStartStream,HystrixThreadPoolCompletionStream),(HystrixCollapserEventStream)。
内部使用rxjava来实现消息机制
HystrixCommandStartStream(final HystrixCommandKey commandKey) {
this.commandKey = commandKey;
this.writeOnlySubject = new SerializedSubject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted>(PublishSubject.<HystrixCommandExecutionStarted>create());
this.readOnlyStream = writeOnlySubject.share();
}
** **此外还提供了HystrixThreadEventStream统一执行接收消息然后发送到各个消息接收流类。
metrics接受流消息体
上面讲了hystrix metrics的接收流,接下来我们看看接收流具体接收的内容。
消息体 | 内容 |
HystrixCommandExecutionStarted | 内部包括了该命令的执行策略和并发数。 |
HystrixCommandCompletion | 内部包含执行结果对象ExecutionResult和请求上下文对象HystrixRequestContext |
ExecutionResult |
|
EventCounts |
|
HystrixCollapserEvent |
|
事件类型:
HystrixCommand只返回一个数据,当返回值时发生SUCCESS事件,执行失败时,发生FAILURE事件,HystrixObservableCommand可以返回多个值,当返回值时发生EMIT事件,当命令完成时,发生SUCCESS事件,执行失败时,发生FAILURE事件。
名称 | 描述 | 是否fallback |
EMIT | value返回,只在HystrixObservableCommand | NO |
SUCCESS | 执行成功 | NO |
FAILURE | 执行抛出异常 | YES |
TIMEOUT | 超时 | YES |
BAD_REQUEST | 抛出HystrixBadRequestException | NO |
SHORT_CIRCUITED | 熔断 | YES |
THREAD_POOL_REJECTED | 线程池拒绝 | YES |
SEMAPHORE_REJECTED | 信号量拒绝 | YES |
Fallback事件类型
名称 | 描述 | 是否抛出异常 |
FALLBACK_EMIT | fallback 返回值,只在HystrixObservableCommand | NO |
FALLBACK_SUCCESS | fallback 执行完成 | NO |
FALLBACK_FAILURE | fallback执行失败 | YES |
FALLBACK_REJECTION | fallback拒绝执行 | YES |
FALLBACK_MISSING | 没有fallback实现 | YES |
其他命令类型
名称 | 描述 |
EXCEPTION_THROWN | 执行命令值抛出异常 |
RESPONSE_FROM_CACHE | 从缓存中获取值 |
CALLAPSED | 命令聚合执行 |
线程池类型
名称 | 描述 |
EXECUTED | 线程池执行一个命令 |
REJECTED | 线程池拒绝执行命令 |
聚合事件类型
名称 | 描述 |
BATCH_EXECUTED | 执行一个batch批量执行 |
ADDED_TO_BATCH | 参数添加到batch中 |
RESPONSE_FROM_CACHE | 从缓存中获取值 |
metrics统计流
hystrix有以下统计流
类别 | 统计流 | 监听接收流 | 说明 |
窗口时间内持续统计 | RollingCommandEventCounterStream | HystrixCommandCompletionStream | 统计各种消息类型窗口期内次数 |
RollingCollapserEventCounterStream | HystrixCollapserEventStream | 统计各种消息类型窗口期内次数 | |
RollingThreadPoolEventCounterStream | HystrixThreadPoolCompletionStream | 统计各种消息类型窗口期内次数 | |
HealthCountsStream | HystrixThreadPoolCompletionStream | 统计总调用次数,失败次数,失败率 | |
持续统计流 | CumulativeCommandEventCounterStream | HystrixCommandCompletionStream | 持久统计各种消息类型次数 |
CumulativeCollapserEventCounterStream | HystrixCollapserEventStream | 持久统计各种消息类型次数 | |
CumulativeThreadPoolEventCounterStream | HystrixThreadPoolCompletionStream | 持久统计各种消息类型次数 | |
窗口时间内分布统计 | RollingCommandLatencyDistributionStream | HystrixCommandCompletionStream消息流的executelatency事件 | 通过Histogram计算窗口期内的分布 |
RollingCommandUserLatencyDistributionStream | HystrixCommandCompletionStream消息流的totalLatency事件 | 通过Histogram计算窗口期内的分布 | |
RollingCollapserBatchSizeDistributionStream | HystrixCollapserEventStream消息流的ADDED_TO_BATCH消息 | 通过Histogram计算窗口期内的分布 | |
窗口时间内最大值统计流 | RollingCommandMaxConcurrencyStream | HystrixCommandStartStream | 窗口期内的执行并发量取最大值 |
RollingThreadPoolMaxConcurrencyStream | HystrixThreadPoolStartStream | 窗口期内的执行并发量取最大值 |
窗口时间内持续统计流首先监听一个消息接受流,统计一段时间内各个类型消息的累计数据(时间为:metrics.rollingStats.timeInMilliseconds/metrics.rollingStats.numBuckets)。然后再对累计的数据进行累加(个数为:metrics.rollingStats.numBuckets),即为最终累计数据。
持续统计流首先监听一个消息流(开始消息流或者完成消息流),统计一段时间内各个类型消息的累计数据(时间为:metrics.rollingStats.timeInMilliseconds/metrics.rollingStats.numBuckets)。然后不断的累加累计数据。
窗口时间内分布统计流首先监听一个消息流,统计一段时间内各个类型消息存放在Histogram对象中(时间为:metrics.rollingStats.timeInMilliseconds/metrics.rollingStats.numBuckets),然后对(个数为:metrics.rollingStats.numBuckets)内的Histogram对象进行运算操作,即为窗口期内某一时间的分布。
RollingConcurrencyStream监听一个消息流,例如HystrixCommandStartStream,然后通过RX java对一段时间内的执行并发量取最大值,重新发射,对窗口期内的执行并发量取最大值,重新发射。
metrics统计流使用单例模式,每个统计流分别对应一个HystrixCommandKey,HystrixThreadPoolKey,HystrixCollapserKey。
metrics模块
hystrix中可以通过metrics模块来获取执行过程中的数据,主要有三部分数据:命令执行metrics,线程池metrics,合并命令执行metrics,每个HystrixCommandKey、HystrixThreadPoolKey、HystrixCollapserKey对应一个相应的metrics(HystrixCommandMetrics,HystrixThreadPoolMetrics,HystrixCollapserMetrics)。metrics模块内部是通过监听消息流来获取各个指标的统计数据。
命令执行metrics
HystrixCommandMetrics为命令执行模块的metrics,在其初始化时会创建6个统计数据流:HealthCountsStream、RollingCommandEventCounterStream、CumulativeCommandEventCounterStream、RollingCommandLatencyDistributionStream、RollingCommandUserLatencyDistributionStream、RollingCommandMaxConcurrencyStream,通过这些统计数据流来获取相应metrics信息。
private HealthCountsStream healthCountsStream;
private final RollingCommandEventCounterStream rollingCommandEventCounterStream;
private final CumulativeCommandEventCounterStream cumulativeCommandEventCounterStream;
private final RollingCommandLatencyDistributionStream rollingCommandLatencyDistributionStream;
private final RollingCommandUserLatencyDistributionStream rollingCommandUserLatencyDistributionStream;
private final RollingCommandMaxConcurrencyStream rollingCommandMaxConcurrencyStream;
/* package */HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {
super(null);
healthCountsStream = HealthCountsStream.getInstance(key, properties);
rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties);
cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties);
rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties);
rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties);
rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties);
}
//获取指定事件窗口期内数据指标
public long getRollingCount(HystrixEventType eventType) {
return rollingCommandEventCounterStream.getLatest(eventType);
}
//获取指定事件持续的数据指标
public long getCumulativeCount(HystrixEventType eventType) {
return cumulativeCommandEventCounterStream.getLatest(eventType);
}//获取某一百分比的执行时间public int getExecutionTimePercentile(double percentile) {
return rollingCommandLatencyDistributionStream.getLatestPercentile(percentile);
}//获取平均的执行时间
public int getExecutionTimeMean() {
return rollingCommandLatencyDistributionStream.getLatestMean();
}
//获取某一百分比的总时间
public int getTotalTimePercentile(double percentile) {
return rollingCommandUserLatencyDistributionStream.getLatestPercentile(percentile);
}//获取平均的总时间
public int getTotalTimeMean() {
return rollingCommandUserLatencyDistributionStream.getLatestMean();
}
//获取窗口期内最大并发量
public long getRollingMaxConcurrentExecutions() {
return rollingCommandMaxConcurrencyStream.getLatestRollingMax();
}//获取当前并发量
public int getCurrentConcurrentExecutionCount() {
return concurrentExecutionCount.get();
} //获取命令执行健康情况
public HealthCounts getHealthCounts() {
return healthCountsStream.getLatest();
}
线程池metrics
HystrixThreadPoolMetrics为线程池执行模块的metrics,在其初始化时会获取3个数据流:RollingThreadPoolEventCounterStream、CumulativeThreadPoolEventCounterStream、RollingThreadPoolMaxConcurrencyStream通过这些统计流获得相应的统计数据。
private final RollingThreadPoolEventCounterStream rollingCounterStream;
private final CumulativeThreadPoolEventCounterStream cumulativeCounterStream;
private final RollingThreadPoolMaxConcurrencyStream rollingThreadPoolMaxConcurrencyStream;
private HystrixThreadPoolMetrics(HystrixThreadPoolKey threadPoolKey, ThreadPoolExecutor threadPool, HystrixThreadPoolProperties properties) {
super(null);
this.threadPoolKey = threadPoolKey;
this.threadPool = threadPool;
this.properties = properties;
rollingCounterStream = RollingThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
cumulativeCounterStream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
rollingThreadPoolMaxConcurrencyStream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, properties);
}
/**
获取窗口期内线程池执行的个数*/
public long getRollingCountThreadsExecuted() {
return rollingCounterStream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED);
}
/**
获取持续的线程池执行个数*/
public long getCumulativeCountThreadsExecuted() {
return cumulativeCounterStream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED);
}
/**
获取窗口期内线程池拒绝的个数*/
public long getRollingCountThreadsRejected() {
return rollingCounterStream.getLatestCount(HystrixEventType.ThreadPool.REJECTED);
}
/**
获取持续内线程池拒绝的个数*/
public long getCumulativeCountThreadsRejected() {
return cumulativeCounterStream.getLatestCount(HystrixEventType.ThreadPool.REJECTED);
}
//获取指定事件窗口期内数据指标
public long getRollingCount(HystrixEventType.ThreadPool event) {
return rollingCounterStream.getLatestCount(event);
}
//获取指定事件持续的数据指标
public long getCumulativeCount(HystrixEventType.ThreadPool event) {
return cumulativeCounterStream.getLatestCount(event);
}/**
获取窗口期内最大并发量*/
public long getRollingMaxActiveThreads() {
return rollingThreadPoolMaxConcurrencyStream.getLatestRollingMax();
}
还有一些根据线程池获取线程池当前指标
public Number getCurrentActiveCount() {
return threadPool.getActiveCount();
}
public Number getCurrentCompletedTaskCount() {
return threadPool.getCompletedTaskCount();
}
public Number getCurrentCorePoolSize() {
return threadPool.getCorePoolSize();
}
public Number getCurrentLargestPoolSize() {
return threadPool.getLargestPoolSize();
}
public Number getCurrentMaximumPoolSize() {
return threadPool.getMaximumPoolSize();
}
public Number getCurrentPoolSize() {
return threadPool.getPoolSize();
}
public Number getCurrentTaskCount() {
return threadPool.getTaskCount();
}
public Number getCurrentQueueSize() {
return threadPool.getQueue().size();
}
合并命令执行metrics
** **HystrixCollapserMetrics为合并命令执行模块的metrics,在其初始化时会创建3个数据流:RollingCollapserEventCounterStream、CumulativeCollapserEventCounterStream、RollingCollapserBatchSizeDistributionStream,通过这些统计流获得相应的统计数据。
private final RollingCollapserEventCounterStream rollingCollapserEventCounterStream;
private final CumulativeCollapserEventCounterStream cumulativeCollapserEventCounterStream;
private final RollingCollapserBatchSizeDistributionStream rollingCollapserBatchSizeDistributionStream;
/* package */HystrixCollapserMetrics(HystrixCollapserKey key, HystrixCollapserProperties properties) {
super(null);
rollingCollapserEventCounterStream = RollingCollapserEventCounterStream.getInstance(key, properties);
cumulativeCollapserEventCounterStream = CumulativeCollapserEventCounterStream.getInstance(key, properties);
rollingCollapserBatchSizeDistributionStream = RollingCollapserBatchSizeDistributionStream.getInstance(key, properties);
}
//获取指定事件窗口期内数据指标
public long getRollingCount(HystrixEventType.Collapser collapserEventType) {
return rollingCollapserEventCounterStream.getLatest(collapserEventType);
}
//获取指定事件持续的数据指标
public long getCumulativeCount(HystrixEventType.Collapser collapserEventType) {
return cumulativeCollapserEventCounterStream.getLatest(collapserEventType);
}
//获取指定百分比的batchsize
public int getBatchSizePercentile(double percentile) {
return rollingCollapserBatchSizeDistributionStream.getLatestPercentile(percentile);
}
//获取平均的batchsize
public int getBatchSizeMean() {
return rollingCollapserBatchSizeDistributionStream.getLatestMean();
}
其他流
HystrixConfigurationStream
该数据流定时将hystrix的最新properties配置,发送到该消息流中。com.netflix.hystrix.contrib.sample.stream.HystrixConfigSseServlet就是用该流来获取配置信息。
public HystrixConfigurationStream(final int intervalInMilliseconds) {
this.intervalInMilliseconds = intervalInMilliseconds;
this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
.map(getAllConfig)
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true); } }) .doOnUnsubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(false); } }) .share() .onBackpressureDrop(); } private static final Func1<Long, HystrixConfiguration> getAllConfig = new Func1<Long, HystrixConfiguration>() { @Override public HystrixConfiguration call(Long timestamp) { return HystrixConfiguration.from( getAllCommandConfig.call(timestamp), getAllThreadPoolConfig.call(timestamp), getAllCollapserConfig.call(timestamp) ); } };
private static final Func1<Long, Map<HystrixCommandKey, HystrixCommandConfiguration>> getAllCommandConfig =
new Func1<Long, Map<HystrixCommandKey, HystrixCommandConfiguration>>() {
@Override
public Map<HystrixCommandKey, HystrixCommandConfiguration> call(Long timestamp) {
Map<HystrixCommandKey, HystrixCommandConfiguration> commandConfigPerKey = new HashMap<HystrixCommandKey, HystrixCommandConfiguration>();
for (HystrixCommandMetrics commandMetrics: HystrixCommandMetrics.getInstances()) {
HystrixCommandKey commandKey = commandMetrics.getCommandKey();
HystrixThreadPoolKey threadPoolKey = commandMetrics.getThreadPoolKey(); HystrixCommandGroupKey groupKey = commandMetrics.getCommandGroup(); commandConfigPerKey.put(commandKey, sampleCommandConfiguration(commandKey, threadPoolKey, groupKey, commandMetrics.getProperties())); } return commandConfigPerKey; } }; private static final Func1<Long, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>> getAllThreadPoolConfig = new Func1<Long, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>>() { @Override public Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration> call(Long timestamp) { Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration> threadPoolConfigPerKey = new HashMap<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>(); for (HystrixThreadPoolMetrics threadPoolMetrics: HystrixThreadPoolMetrics.getInstances()) { HystrixThreadPoolKey threadPoolKey = threadPoolMetrics.getThreadPoolKey(); threadPoolConfigPerKey.put(threadPoolKey, sampleThreadPoolConfiguration(threadPoolKey, threadPoolMetrics.getProperties())); } return threadPoolConfigPerKey; } }; private static final Func1<Long, Map<HystrixCollapserKey, HystrixCollapserConfiguration>> getAllCollapserConfig = new Func1<Long, Map<HystrixCollapserKey, HystrixCollapserConfiguration>>() { @Override public Map<HystrixCollapserKey, HystrixCollapserConfiguration> call(Long timestamp) { Map<HystrixCollapserKey, HystrixCollapserConfiguration> collapserConfigPerKey = new HashMap<HystrixCollapserKey, HystrixCollapserConfiguration>(); for (HystrixCollapserMetrics collapserMetrics: HystrixCollapserMetrics.getInstances()) { HystrixCollapserKey collapserKey = collapserMetrics.getCollapserKey(); collapserConfigPerKey.put(collapserKey, sampleCollapserConfiguration(collapserKey, collapserMetrics.getProperties())); } return collapserConfigPerKey; } };
metrics发布
有时,我们需要发布Hystrix中的metrics到其他地方,Hystrix提供了相应的接口(HystrixMetricsPublisherCollapser,HystrixMetricsPublisherCommand,HystrixMetricsPublisherThreadPool),实现这些接口,并在initial方法中实现发送hystrix的metrics的逻辑。实现HystrixMetricsPublisher,来创建这些实现类。其实hystrix对metrics的发布只是定义了接口和initial方法。Hystrix运行时,HystrixMetricsPublisherFactory通过HystrixPlugins获取HystrixMetricsPublisher的实现类。并且通过该实现类来创建(HystrixMetricsPublisherCollapser,HystrixMetricsPublisherCommand,HystrixMetricsPublisherThreadPool)的实现类,并在初次创建时调用其initial方法。
例如:
通过coda hale 实现了将hystrix 的metrics信息输出到指定metrics监控系统中。
引入jar包:
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-codahale-metrics-publisher</artifactId>
<version>1.5.9</version>
</dependency>
创建HystrixMetricsPublisher对象并注册到HystrixPlugins:
@Bean
HystrixMetricsPublisher hystrixMetricsPublisher() {
HystrixCodaHaleMetricsPublisher publisher = new HystrixCodaHaleMetricsPublisher(metricRegistry);
HystrixPlugins.getInstance().registerMetricsPublisher(publisher);
return publisher;
}
coda hale实现源码如下:
public class HystrixCodaHaleMetricsPublisher extends HystrixMetricsPublisher {
private final String metricsRootNode;
private final MetricRegistry metricRegistry;
public HystrixCodaHaleMetricsPublisher(MetricRegistry metricRegistry) {
this(null, metricRegistry);
}
public HystrixCodaHaleMetricsPublisher(String metricsRootNode, MetricRegistry metricRegistry) {
this.metricsRootNode = metricsRootNode;
this.metricRegistry = metricRegistry;
}
@Override
public HystrixMetricsPublisherCommand getMetricsPublisherForCommand(HystrixCommandKey commandKey, HystrixCommandGroupKey commandGroupKey, HystrixCommandMetrics metrics, HystrixCircuitBreaker circuitBreaker, HystrixCommandProperties properties) {
return new HystrixCodaHaleMetricsPublisherCommand(metricsRootNode, commandKey, commandGroupKey, metrics, circuitBreaker, properties, metricRegistry);
}
@Override
public HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolMetrics metrics, HystrixThreadPoolProperties properties) {
return new HystrixCodaHaleMetricsPublisherThreadPool(metricsRootNode, threadPoolKey, metrics, properties, metricRegistry);
}
@Override
public HystrixMetricsPublisherCollapser getMetricsPublisherForCollapser(HystrixCollapserKey collapserKey, HystrixCollapserMetrics metrics, HystrixCollapserProperties properties) {
return new HystrixCodaHaleMetricsPublisherCollapser(collapserKey, metrics, properties, metricRegistry);
}
}
HystrixCodaHaleMetricsPublisher负责创建HystrixCodaHaleMetricsPublisherCommand,HystrixCodaHaleMetricsPublisherThreadPool,HystrixCodaHaleMetricsPublisherCollapser。这三个对象实现基本逻辑是在initialize方法中向metricRegistry中设置相应信息。
public void initialize() { metricRegistry.register(createMetricName("isCircuitBreakerOpen"), new Gauge<Boolean>() {
@Override
public Boolean getValue() {
return circuitBreaker.isOpen();
}
.....
}