02. Hystrix in Practice: Hystrix Metrics

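The previous section covered the five features that Hystrix provides. This section starts with how Hystrix implements real-time execution metrics. Metrics come first because many of the other features are built on metrics data.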

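First, here are the kinds of metrics produced when a service is called through Hystrix:

1. The cumulative count of a given event.

2. The count of a given event within a rolling time window.

3. The maximum value of a given measurement within a rolling time window.

4. The distribution of a given measurement within a rolling time window.

Next, let's look at how this data is produced, computed, and passed along inside Hystrix.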


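While executing a service call, Hystrix produces various events. The execution module first sends these events to the metrics receiving streams. The metrics statistics streams subscribe to the receiving streams and compute the statistics from them.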

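Metrics receiving streams

Hystrix has the following receiving streams, each with the message type it receives:

| Receiving stream | Message received | Description |
| --- | --- | --- |
| HystrixCommandStartStream | HystrixCommandExecutionStarted | Command execution started |
| HystrixCommandCompletionStream | HystrixCommandCompletion | Command execution completed |
| HystrixThreadPoolStartStream | HystrixCommandExecutionStarted | Thread pool execution started |
| HystrixThreadPoolCompletionStream | HystrixCommandCompletion | Thread pool execution completed |
| HystrixCollapserEventStream | HystrixCollapserEvent | Collapser (request merging) events |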

The receiving streams are singletons per key: each HystrixCommandKey maps to one HystrixCommandStartStream and one HystrixCommandCompletionStream, each HystrixThreadPoolKey maps to one HystrixThreadPoolStartStream and one HystrixThreadPoolCompletionStream, and each HystrixCollapserKey maps to one HystrixCollapserEventStream.
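
The per-key singleton idea can be sketched with nothing but the JDK. This is a minimal illustration, not Hystrix's code: the class name `KeyedStream`, the `String` key, and the `computeIfAbsent` lookup are all assumptions made for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a receiving stream; not a Hystrix class.
final class KeyedStream {
    // One shared registry: key name -> the single stream instance for that key.
    private static final ConcurrentMap<String, KeyedStream> STREAMS = new ConcurrentHashMap<>();

    private final String key;

    private KeyedStream(String key) {
        this.key = key;
    }

    // Returns the stream for this key, creating it on first use.
    static KeyedStream getInstance(String key) {
        return STREAMS.computeIfAbsent(key, KeyedStream::new);
    }

    String key() {
        return key;
    }

    public static void main(String[] args) {
        KeyedStream a = KeyedStream.getInstance("GetUserCommand");
        KeyedStream b = KeyedStream.getInstance("GetUserCommand");
        KeyedStream c = KeyedStream.getInstance("GetOrderCommand");
        System.out.println("same key, same instance: " + (a == b));
        System.out.println("different key, same instance: " + (a == c));
    }
}
```

Because every caller that passes the same key gets the same object, all writers and all subscribers for one command end up on the same stream.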

Internally, the messaging is implemented with RxJava:

    HystrixCommandStartStream(final HystrixCommandKey commandKey) {
        this.commandKey = commandKey;
        this.writeOnlySubject = new SerializedSubject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted>(PublishSubject.<HystrixCommandExecutionStarted>create());
        this.readOnlyStream = writeOnlySubject.share();
    }
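
To make the "write-only subject, read-only stream" split easier to picture, here is a plain-JDK model of it. It is only a sketch under stated assumptions: `SimpleEventStream`, `write` and `observe` are hypothetical names, delivery is synchronous, and unlike `SerializedSubject` it does not serialize concurrent writers. Like a `PublishSubject`, it does not replay earlier events to subscribers that arrive later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical plain-JDK model of a publish/subscribe stream; not RxJava.
final class SimpleEventStream<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    // Write side: the execution module pushes events in here.
    void write(T event) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    // Read side: statistics streams register a callback here.
    void observe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    public static void main(String[] args) {
        SimpleEventStream<String> stream = new SimpleEventStream<>();
        List<String> seen = new ArrayList<>();

        stream.write("started-before-subscribe"); // nobody is listening yet, so it is dropped
        stream.observe(seen::add);
        stream.write("started-after-subscribe");

        System.out.println(seen);
    }
}
```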

In addition, HystrixThreadEventStream receives the events in one place and then forwards them to the individual receiving stream classes.

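Metrics receiving stream message bodies

The receiving streams were covered above. Next, let's look at what exactly they receive.

| Message body | Content |
| --- | --- |
| HystrixCommandExecutionStarted | The command's execution isolation strategy and its current concurrency. |
| HystrixCommandCompletion | The execution result object ExecutionResult and the request context object HystrixRequestContext. |

ExecutionResult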

```java
private final EventCounts eventCounts;             // event counts
private final Exception failedExecutionException;  // exception that caused the failure
private final Exception executionException;        // exception thrown during execution
private final long startTimestamp;                 // time the command started executing
private final int executionLatency;                // time spent executing run()
private final int userThreadLatency;               // time from request submission to completion
private final boolean executionOccurred;           // true: the command was executed; false: it was not
private final boolean isExecutedInThread;          // true: executed in a thread pool; false: not in a thread pool
```
EventCounts

```java
private final BitSet events;             // event types that occurred
private final int numEmissions;          // number of emissions
private final int numFallbackEmissions;  // number of fallback emissions
```
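
A BitSet is a compact way to remember which event types have occurred: one bit per enum constant. The sketch below shows the idea only. The class `EventFlags`, its shortened enum, and the use of the enum ordinal as the bit index are assumptions made for illustration, not a copy of EventCounts.

```java
import java.util.BitSet;

// Hypothetical illustration of tracking event types in a BitSet.
final class EventFlags {
    // Shortened list of event types, for the example only.
    enum EventType { EMIT, SUCCESS, FAILURE, TIMEOUT, FALLBACK_SUCCESS }

    private final BitSet events = new BitSet(EventType.values().length);

    // Mark an event type as having occurred; the enum ordinal is the bit index.
    void add(EventType type) {
        events.set(type.ordinal());
    }

    // True if the event type was recorded at least once.
    boolean contains(EventType type) {
        return events.get(type.ordinal());
    }

    public static void main(String[] args) {
        EventFlags flags = new EventFlags();
        flags.add(EventType.TIMEOUT);
        flags.add(EventType.FALLBACK_SUCCESS);
        System.out.println("timed out: " + flags.contains(EventType.TIMEOUT));
        System.out.println("succeeded: " + flags.contains(EventType.SUCCESS));
    }
}
```

A bit only records presence, which is why events that can occur several times per command (emissions and fallback emissions) need separate integer counters.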
HystrixCollapserEvent

```java
private final HystrixCollapserKey collapserKey; // collapser key
```

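Event types:

A HystrixCommand returns a single value: a SUCCESS event occurs when the value is returned, and a FAILURE event occurs when execution fails. A HystrixObservableCommand can return multiple values: an EMIT event occurs for each value returned, a SUCCESS event occurs when the command completes, and a FAILURE event occurs when execution fails.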

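| Name | Description | Triggers fallback |
| --- | --- | --- |
| EMIT | A value was returned (HystrixObservableCommand only) | NO |
| SUCCESS | Execution succeeded | NO |
| FAILURE | Execution threw an exception | YES |
| TIMEOUT | Execution timed out | YES |
| BAD_REQUEST | HystrixBadRequestException was thrown | NO |
| SHORT_CIRCUITED | The circuit breaker short-circuited the call | YES |
| THREAD_POOL_REJECTED | Rejected by the thread pool | YES |
| SEMAPHORE_REJECTED | Rejected by the semaphore | YES |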

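Fallback event types

| Name | Description | Throws exception |
| --- | --- | --- |
| FALLBACK_EMIT | The fallback returned a value (HystrixObservableCommand only) | NO |
| FALLBACK_SUCCESS | The fallback completed | NO |
| FALLBACK_FAILURE | The fallback failed | YES |
| FALLBACK_REJECTION | The fallback was rejected | YES |
| FALLBACK_MISSING | No fallback is implemented | YES |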

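Other command event types

| Name | Description |
| --- | --- |
| EXCEPTION_THROWN | The command threw an exception |
| RESPONSE_FROM_CACHE | The value was served from the cache |
| COLLAPSED | The command was executed as part of a collapsed batch |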

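Thread pool event types

| Name | Description |
| --- | --- |
| EXECUTED | The thread pool executed a command |
| REJECTED | The thread pool rejected a command |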

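Collapser event types

| Name | Description |
| --- | --- |
| BATCH_EXECUTED | A batch was executed |
| ADDED_TO_BATCH | An argument was added to a batch |
| RESPONSE_FROM_CACHE | The value was served from the cache |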

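Metrics statistics streams

Hystrix has the following statistics streams:

| Category | Statistics stream | Receiving stream subscribed to | Description |
| --- | --- | --- | --- |
| Rolling-window counts | RollingCommandEventCounterStream | HystrixCommandCompletionStream | Counts each event type within the window |
| Rolling-window counts | RollingCollapserEventCounterStream | HystrixCollapserEventStream | Counts each event type within the window |
| Rolling-window counts | RollingThreadPoolEventCounterStream | HystrixThreadPoolCompletionStream | Counts each event type within the window |
| Rolling-window counts | HealthCountsStream | HystrixCommandCompletionStream | Total calls, failed calls, and failure rate |
| Cumulative counts | CumulativeCommandEventCounterStream | HystrixCommandCompletionStream | Running count of each event type |
| Cumulative counts | CumulativeCollapserEventCounterStream | HystrixCollapserEventStream | Running count of each event type |
| Cumulative counts | CumulativeThreadPoolEventCounterStream | HystrixThreadPoolCompletionStream | Running count of each event type |
| Rolling-window distribution | RollingCommandLatencyDistributionStream | executionLatency values from HystrixCommandCompletionStream | Distribution within the window, computed with a Histogram |
| Rolling-window distribution | RollingCommandUserLatencyDistributionStream | totalLatency values from HystrixCommandCompletionStream | Distribution within the window, computed with a Histogram |
| Rolling-window distribution | RollingCollapserBatchSizeDistributionStream | ADDED_TO_BATCH messages from HystrixCollapserEventStream | Distribution within the window, computed with a Histogram |
| Rolling-window maximum | RollingCommandMaxConcurrencyStream | HystrixCommandStartStream | Maximum execution concurrency within the window |
| Rolling-window maximum | RollingThreadPoolMaxConcurrencyStream | HystrixThreadPoolStartStream | Maximum execution concurrency within the window |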

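A rolling-window counter stream first subscribes to a receiving stream and accumulates, per event type, the counts for one bucket interval (metrics.rollingStats.timeInMilliseconds / metrics.rollingStats.numBuckets). It then sums the most recent metrics.rollingStats.numBuckets buckets, and that sum is the count for the window.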
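
The bucket arithmetic is easier to follow with a small model. The sketch below is not how Hystrix implements it (Hystrix does this with RxJava operators on the stream). It is a hand-rolled ring of buckets, and the class name `RollingCounter`, the explicit `nowMs` timestamps, and the decision to include the in-progress bucket in the sum are all assumptions made for the example.

```java
import java.util.Arrays;

// Hypothetical rolling counter: window = numBuckets buckets of equal length.
final class RollingCounter {
    private final long bucketSizeMs;
    private final long[] counts;      // event count held in each slot
    private final long[] bucketIds;   // absolute bucket number each slot currently holds

    RollingCounter(long windowMs, int numBuckets) {
        this.bucketSizeMs = windowMs / numBuckets; // e.g. 10000 ms / 10 = 1000 ms per bucket
        this.counts = new long[numBuckets];
        this.bucketIds = new long[numBuckets];
        Arrays.fill(bucketIds, Long.MIN_VALUE);    // marks slots that were never used
    }

    // Record one event that happened at the given time.
    void record(long nowMs) {
        long bucket = nowMs / bucketSizeMs;
        int slot = (int) (bucket % counts.length);
        if (bucketIds[slot] != bucket) {           // slot still holds an expired bucket: reuse it
            bucketIds[slot] = bucket;
            counts[slot] = 0;
        }
        counts[slot]++;
    }

    // Sum of the buckets that still fall inside the window ending at nowMs.
    long rollingCount(long nowMs) {
        long newest = nowMs / bucketSizeMs;
        long oldest = newest - counts.length + 1;
        long sum = 0;
        for (int i = 0; i < counts.length; i++) {
            if (bucketIds[i] >= oldest && bucketIds[i] <= newest) {
                sum += counts[i];
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        RollingCounter counter = new RollingCounter(10_000, 10);
        counter.record(0);
        counter.record(500);
        counter.record(1_500);
        System.out.println("count at t=1500:  " + counter.rollingCount(1_500));
        System.out.println("count at t=10500: " + counter.rollingCount(10_500));
    }
}
```

With a 10-second window split into 10 buckets, the two events recorded in the first second drop out of the total as soon as the clock moves into the eleventh second, while the event from the second bucket is still counted.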

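A cumulative counter stream first subscribes to a message stream (a start stream or a completion stream) and accumulates, per event type, the counts for one bucket interval (metrics.rollingStats.timeInMilliseconds / metrics.rollingStats.numBuckets). It then keeps adding each bucket to a running total that is never reset.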

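A rolling-window distribution stream first subscribes to a message stream and records the values observed during one bucket interval in a Histogram object (interval: metrics.rollingPercentile.timeInMilliseconds / metrics.rollingPercentile.numBuckets). It then combines the Histogram objects of the most recent metrics.rollingPercentile.numBuckets buckets, which gives the distribution of that measurement over the window.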
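
As a rough picture of "merge the buckets, then read a percentile", here is a sketch that uses a sorted list in place of a real Histogram. Everything in it is an assumption made for illustration: the class `LatencyWindow`, the nearest-rank percentile rule, and the integer mean. A real histogram stores value counts rather than every sample, so its results can differ slightly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for per-bucket histograms: each bucket is a list of samples.
final class LatencyWindow {

    // Merge all buckets in the window into one sorted list of samples.
    private static List<Integer> merge(List<List<Integer>> buckets) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> bucket : buckets) {
            all.addAll(bucket);
        }
        Collections.sort(all);
        return all;
    }

    // Nearest-rank percentile over the merged samples; 0 when there are no samples.
    static int percentile(List<List<Integer>> buckets, double percentile) {
        List<Integer> all = merge(buckets);
        if (all.isEmpty()) {
            return 0;
        }
        int rank = (int) Math.ceil(percentile / 100.0 * all.size());
        return all.get(Math.max(rank, 1) - 1);
    }

    // Integer mean over the merged samples; 0 when there are no samples.
    static int mean(List<List<Integer>> buckets) {
        List<Integer> all = merge(buckets);
        if (all.isEmpty()) {
            return 0;
        }
        long sum = 0;
        for (int value : all) {
            sum += value;
        }
        return (int) (sum / all.size());
    }

    public static void main(String[] args) {
        List<List<Integer>> window = Arrays.asList(
                Arrays.asList(10, 20),   // latencies (ms) seen in the oldest bucket
                Arrays.asList(30),
                Arrays.asList(40, 50));  // latencies seen in the newest bucket
        System.out.println("median: " + percentile(window, 50));
        System.out.println("mean:   " + mean(window));
    }
}
```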

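RollingConcurrencyStream subscribes to a message stream, for example HystrixCommandStartStream. Using RxJava, it takes the maximum execution concurrency within each bucket interval and re-emits it, then takes the maximum across the buckets in the window and re-emits that.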
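
The two-level maximum (per bucket, then per window) can be written out directly. This is only a sketch of the arithmetic with hypothetical names (`RollingMaxConcurrency`, `bucketMax`, `rollingMax`). The input is assumed to be the concurrency values observed in each bucket, oldest bucket first.

```java
// Hypothetical illustration of "max per bucket, then max over the window".
final class RollingMaxConcurrency {

    // Maximum concurrency seen inside one bucket; 0 for an empty bucket.
    static int bucketMax(int[] samples) {
        int max = 0;
        for (int sample : samples) {
            max = Math.max(max, sample);
        }
        return max;
    }

    // Maximum over the most recent numBuckets buckets (buckets are ordered oldest first).
    static int rollingMax(int[][] buckets, int numBuckets) {
        int from = Math.max(0, buckets.length - numBuckets);
        int max = 0;
        for (int i = from; i < buckets.length; i++) {
            max = Math.max(max, bucketMax(buckets[i]));
        }
        return max;
    }

    public static void main(String[] args) {
        int[][] buckets = {
                {1, 3, 2},  // oldest bucket
                {},         // a bucket in which no command started
                {5, 4},
                {2}         // newest bucket
        };
        System.out.println("max over last 2 buckets: " + rollingMax(buckets, 2));
        System.out.println("max over last 1 bucket:  " + rollingMax(buckets, 1));
    }
}
```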

The statistics streams are also singletons per key: each statistics stream instance corresponds to one HystrixCommandKey, HystrixThreadPoolKey, or HystrixCollapserKey.

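Metrics module

In Hystrix, data about execution is obtained through the metrics module. There are three groups of data: command execution metrics, thread pool metrics, and collapser metrics. Each HystrixCommandKey, HystrixThreadPoolKey, and HystrixCollapserKey has its own metrics object (HystrixCommandMetrics, HystrixThreadPoolMetrics, and HystrixCollapserMetrics respectively). Internally, the metrics module gets its statistics by subscribing to the message streams.

Command execution metrics

HystrixCommandMetrics holds the metrics for command execution. When it is initialized it creates six statistics streams: HealthCountsStream, RollingCommandEventCounterStream, CumulativeCommandEventCounterStream, RollingCommandLatencyDistributionStream, RollingCommandUserLatencyDistributionStream, and RollingCommandMaxConcurrencyStream. The metrics are read from these streams.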


    private HealthCountsStream healthCountsStream;
    private final RollingCommandEventCounterStream rollingCommandEventCounterStream;
    private final CumulativeCommandEventCounterStream cumulativeCommandEventCounterStream;
    private final RollingCommandLatencyDistributionStream rollingCommandLatencyDistributionStream;
    private final RollingCommandUserLatencyDistributionStream rollingCommandUserLatencyDistributionStream;
    private final RollingCommandMaxConcurrencyStream rollingCommandMaxConcurrencyStream;

    /* package */HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {
        super(null);
        healthCountsStream = HealthCountsStream.getInstance(key, properties);
        rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties);
        cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties);
        rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties);
        rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties);
        rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties);
    }

    // Count of the given event type within the rolling window
    public long getRollingCount(HystrixEventType eventType) {
        return rollingCommandEventCounterStream.getLatest(eventType);
    }

    // Cumulative count of the given event type
    public long getCumulativeCount(HystrixEventType eventType) {
        return cumulativeCommandEventCounterStream.getLatest(eventType);
    }

    // Execution time at the given percentile
    public int getExecutionTimePercentile(double percentile) {
        return rollingCommandLatencyDistributionStream.getLatestPercentile(percentile);
    }

    // Mean execution time
    public int getExecutionTimeMean() {
        return rollingCommandLatencyDistributionStream.getLatestMean();
    }

    // Total time at the given percentile
    public int getTotalTimePercentile(double percentile) {
        return rollingCommandUserLatencyDistributionStream.getLatestPercentile(percentile);
    }

    // Mean total time
    public int getTotalTimeMean() {
        return rollingCommandUserLatencyDistributionStream.getLatestMean();
    }

    // Maximum concurrency within the rolling window
    public long getRollingMaxConcurrentExecutions() {
        return rollingCommandMaxConcurrencyStream.getLatestRollingMax();
    }

    // Current concurrency
    public int getCurrentConcurrentExecutionCount() {
        return concurrentExecutionCount.get();
    }

    // Health of command execution
    public HealthCounts getHealthCounts() {
        return healthCountsStream.getLatest();
    }
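
The health data returned by getHealthCounts boils down to three numbers: total calls, failed calls, and failure rate. The sketch below shows one way those could be derived from per-event counts. It is an assumption-laden illustration, not the HealthCounts class: the name `HealthSnapshot`, the choice of which event types count as errors, and the integer percentage are all made up for the example.

```java
// Hypothetical health summary derived from per-event counts.
final class HealthSnapshot {
    final long totalCount;
    final long errorCount;
    final int errorPercentage;

    HealthSnapshot(long success, long failure, long timeout,
                   long threadPoolRejected, long semaphoreRejected) {
        // Assumption: every non-success outcome listed here counts as an error.
        this.errorCount = failure + timeout + threadPoolRejected + semaphoreRejected;
        this.totalCount = success + errorCount;
        // Guard against division by zero when nothing has been executed yet.
        this.errorPercentage = totalCount == 0 ? 0 : (int) (errorCount * 100 / totalCount);
    }

    public static void main(String[] args) {
        HealthSnapshot snapshot = new HealthSnapshot(90, 5, 3, 1, 1);
        System.out.println("total=" + snapshot.totalCount
                + " errors=" + snapshot.errorCount
                + " errorPercentage=" + snapshot.errorPercentage);
    }
}
```

A number like this is what a circuit breaker would compare against its error threshold when deciding whether to open.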

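Thread pool metrics

HystrixThreadPoolMetrics holds the metrics for thread pool execution. When it is initialized it obtains three streams: RollingThreadPoolEventCounterStream, CumulativeThreadPoolEventCounterStream, and RollingThreadPoolMaxConcurrencyStream. The statistics are read from these streams.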

    private final RollingThreadPoolEventCounterStream rollingCounterStream;
    private final CumulativeThreadPoolEventCounterStream cumulativeCounterStream;
    private final RollingThreadPoolMaxConcurrencyStream rollingThreadPoolMaxConcurrencyStream;
    private HystrixThreadPoolMetrics(HystrixThreadPoolKey threadPoolKey, ThreadPoolExecutor threadPool, HystrixThreadPoolProperties properties) {
        super(null);
        this.threadPoolKey = threadPoolKey;
        this.threadPool = threadPool;
        this.properties = properties;
        rollingCounterStream = RollingThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
        cumulativeCounterStream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
        rollingThreadPoolMaxConcurrencyStream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, properties);
    }
    /**
     * Number of commands executed by the thread pool within the rolling window.
     */
    public long getRollingCountThreadsExecuted() {
        return rollingCounterStream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED);
    }

    /**
     * Cumulative number of commands executed by the thread pool.
     */
    public long getCumulativeCountThreadsExecuted() {
        return cumulativeCounterStream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED);
    }

    /**
     * Number of commands rejected by the thread pool within the rolling window.
     */
    public long getRollingCountThreadsRejected() {
        return rollingCounterStream.getLatestCount(HystrixEventType.ThreadPool.REJECTED);
    }

    /**
     * Cumulative number of commands rejected by the thread pool.
     */
    public long getCumulativeCountThreadsRejected() {
        return cumulativeCounterStream.getLatestCount(HystrixEventType.ThreadPool.REJECTED);
    }

    // Count of the given event type within the rolling window
    public long getRollingCount(HystrixEventType.ThreadPool event) {
        return rollingCounterStream.getLatestCount(event);
    }

    // Cumulative count of the given event type
    public long getCumulativeCount(HystrixEventType.ThreadPool event) {
        return cumulativeCounterStream.getLatestCount(event);
    }

    /**
     * Maximum concurrency within the rolling window.
     */
    public long getRollingMaxActiveThreads() {
        return rollingThreadPoolMaxConcurrencyStream.getLatestRollingMax();
    }

Some metrics are read directly from the thread pool's current state:

    public Number getCurrentActiveCount() {
        return threadPool.getActiveCount();
    }
    public Number getCurrentCompletedTaskCount() {
        return threadPool.getCompletedTaskCount();
    }
    public Number getCurrentCorePoolSize() {
        return threadPool.getCorePoolSize();
    }
    public Number getCurrentLargestPoolSize() {
        return threadPool.getLargestPoolSize();
    }
    public Number getCurrentMaximumPoolSize() {
        return threadPool.getMaximumPoolSize();
    }
    public Number getCurrentPoolSize() {
        return threadPool.getPoolSize();
    }
    public Number getCurrentTaskCount() {
        return threadPool.getTaskCount();
    }
    public Number getCurrentQueueSize() {
        return threadPool.getQueue().size();
    }
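
These getters are thin wrappers over `java.util.concurrent.ThreadPoolExecutor`, so they can be tried without Hystrix at all. The sketch below reads the same values from a plain executor. The class name `ThreadPoolGauges`, the `snapshot` helper and the pool sizes are assumptions chosen for the example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that reads point-in-time gauges from a ThreadPoolExecutor.
final class ThreadPoolGauges {

    // Format the current state of the pool as a single line.
    static String snapshot(ThreadPoolExecutor pool) {
        return "active=" + pool.getActiveCount()
                + " poolSize=" + pool.getPoolSize()
                + " corePoolSize=" + pool.getCorePoolSize()
                + " maximumPoolSize=" + pool.getMaximumPoolSize()
                + " queueSize=" + pool.getQueue().size()
                + " completed=" + pool.getCompletedTaskCount();
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            System.out.println(snapshot(pool));
        } finally {
            pool.shutdown(); // let the JVM exit cleanly
        }
    }
}
```

Unlike the rolling and cumulative counters, these values are not aggregated over time: each call reports whatever the executor's state is at that instant.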

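Collapser metrics

HystrixCollapserMetrics holds the metrics for collapsed (merged) command execution. When it is initialized it creates three streams: RollingCollapserEventCounterStream, CumulativeCollapserEventCounterStream, and RollingCollapserBatchSizeDistributionStream. The statistics are read from these streams.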

    private final RollingCollapserEventCounterStream rollingCollapserEventCounterStream;
    private final CumulativeCollapserEventCounterStream cumulativeCollapserEventCounterStream;
    private final RollingCollapserBatchSizeDistributionStream rollingCollapserBatchSizeDistributionStream;

    /* package */HystrixCollapserMetrics(HystrixCollapserKey key, HystrixCollapserProperties properties) {
        super(null);
        rollingCollapserEventCounterStream = RollingCollapserEventCounterStream.getInstance(key, properties);
        cumulativeCollapserEventCounterStream = CumulativeCollapserEventCounterStream.getInstance(key, properties);
        rollingCollapserBatchSizeDistributionStream = RollingCollapserBatchSizeDistributionStream.getInstance(key, properties);
    }
    // Count of the given event type within the rolling window
    public long getRollingCount(HystrixEventType.Collapser collapserEventType) {
        return rollingCollapserEventCounterStream.getLatest(collapserEventType);
    }

    // Cumulative count of the given event type
    public long getCumulativeCount(HystrixEventType.Collapser collapserEventType) {
        return cumulativeCollapserEventCounterStream.getLatest(collapserEventType);
    }

    // Batch size at the given percentile
    public int getBatchSizePercentile(double percentile) {
        return rollingCollapserBatchSizeDistributionStream.getLatestPercentile(percentile);
    }

    // Mean batch size
    public int getBatchSizeMean() {
        return rollingCollapserBatchSizeDistributionStream.getLatestMean();
    }

Other streams

HystrixConfigurationStream

This stream periodically emits the latest Hystrix properties configuration. com.netflix.hystrix.contrib.sample.stream.HystrixConfigSseServlet uses this stream to obtain configuration information.

    public HystrixConfigurationStream(final int intervalInMilliseconds) {
        this.intervalInMilliseconds = intervalInMilliseconds;
        this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
                .map(getAllConfig)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()
                .onBackpressureDrop();
    }

    private static final Func1<Long, HystrixConfiguration> getAllConfig =
            new Func1<Long, HystrixConfiguration>() {
                @Override
                public HystrixConfiguration call(Long timestamp) {
                    return HystrixConfiguration.from(
                            getAllCommandConfig.call(timestamp),
                            getAllThreadPoolConfig.call(timestamp),
                            getAllCollapserConfig.call(timestamp)
                    );
                }
            };

    private static final Func1<Long, Map<HystrixCommandKey, HystrixCommandConfiguration>> getAllCommandConfig =
            new Func1<Long, Map<HystrixCommandKey, HystrixCommandConfiguration>>() {
                @Override
                public Map<HystrixCommandKey, HystrixCommandConfiguration> call(Long timestamp) {
                    Map<HystrixCommandKey, HystrixCommandConfiguration> commandConfigPerKey = new HashMap<HystrixCommandKey, HystrixCommandConfiguration>();
                    for (HystrixCommandMetrics commandMetrics: HystrixCommandMetrics.getInstances()) {
                        HystrixCommandKey commandKey = commandMetrics.getCommandKey();
                        HystrixThreadPoolKey threadPoolKey = commandMetrics.getThreadPoolKey();
                        HystrixCommandGroupKey groupKey = commandMetrics.getCommandGroup();
                        commandConfigPerKey.put(commandKey, sampleCommandConfiguration(commandKey, threadPoolKey, groupKey, commandMetrics.getProperties()));
                    }
                    return commandConfigPerKey;
                }
            };

    private static final Func1<Long, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>> getAllThreadPoolConfig =
            new Func1<Long, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>>() {
                @Override
                public Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration> call(Long timestamp) {
                    Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration> threadPoolConfigPerKey = new HashMap<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>();
                    for (HystrixThreadPoolMetrics threadPoolMetrics: HystrixThreadPoolMetrics.getInstances()) {
                        HystrixThreadPoolKey threadPoolKey = threadPoolMetrics.getThreadPoolKey();
                        threadPoolConfigPerKey.put(threadPoolKey, sampleThreadPoolConfiguration(threadPoolKey, threadPoolMetrics.getProperties()));
                    }
                    return threadPoolConfigPerKey;
                }
            };

    private static final Func1<Long, Map<HystrixCollapserKey, HystrixCollapserConfiguration>> getAllCollapserConfig =
            new Func1<Long, Map<HystrixCollapserKey, HystrixCollapserConfiguration>>() {
                @Override
                public Map<HystrixCollapserKey, HystrixCollapserConfiguration> call(Long timestamp) {
                    Map<HystrixCollapserKey, HystrixCollapserConfiguration> collapserConfigPerKey = new HashMap<HystrixCollapserKey, HystrixCollapserConfiguration>();
                    for (HystrixCollapserMetrics collapserMetrics: HystrixCollapserMetrics.getInstances()) {
                        HystrixCollapserKey collapserKey = collapserMetrics.getCollapserKey();
                        collapserConfigPerKey.put(collapserKey, sampleCollapserConfiguration(collapserKey, collapserMetrics.getProperties()));
                    }
                    return collapserConfigPerKey;
                }
            };

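Publishing metrics

Sometimes we need to publish Hystrix metrics elsewhere. Hystrix provides interfaces for this (HystrixMetricsPublisherCollapser, HystrixMetricsPublisherCommand, HystrixMetricsPublisherThreadPool): implement them and put the logic that sends the metrics in the initialize method, then provide a HystrixMetricsPublisher implementation that creates those objects. Hystrix itself only defines the interfaces and the initialize method. At runtime, HystrixMetricsPublisherFactory obtains the HystrixMetricsPublisher implementation through HystrixPlugins, uses it to create the HystrixMetricsPublisherCollapser, HystrixMetricsPublisherCommand, and HystrixMetricsPublisherThreadPool implementations, and calls initialize the first time each one is created.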
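
The "create once per key, initialize on first creation" behaviour can be modelled in a few lines. The sketch below is a simplified illustration, not the HystrixMetricsPublisherFactory source: `PublisherFactorySketch`, `CommandPublisher`, the `String` key and the `Function`-based plugin are hypothetical names chosen for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical model of a factory that initializes each publisher exactly once.
final class PublisherFactorySketch {

    // Stand-in for a per-command publisher interface.
    interface CommandPublisher {
        void initialize();
    }

    private final ConcurrentMap<String, CommandPublisher> publishers = new ConcurrentHashMap<>();
    private final Function<String, CommandPublisher> plugin; // creates a publisher for a key

    PublisherFactorySketch(Function<String, CommandPublisher> plugin) {
        this.plugin = plugin;
    }

    CommandPublisher publisherFor(String commandKey) {
        CommandPublisher existing = publishers.get(commandKey);
        if (existing != null) {
            return existing;                     // already created and initialized
        }
        CommandPublisher created = plugin.apply(commandKey);
        CommandPublisher raced = publishers.putIfAbsent(commandKey, created);
        if (raced != null) {
            return raced;                        // another thread registered first
        }
        created.initialize();                    // only the registered instance is initialized
        return created;
    }

    public static void main(String[] args) {
        AtomicInteger initCalls = new AtomicInteger();
        PublisherFactorySketch factory =
                new PublisherFactorySketch(key -> () -> initCalls.incrementAndGet());

        factory.publisherFor("GetUserCommand");
        factory.publisherFor("GetUserCommand");  // reuses the existing publisher
        factory.publisherFor("GetOrderCommand");

        System.out.println("initialize calls: " + initCalls.get());
    }
}
```

Initializing only the instance that wins `putIfAbsent` is what keeps a publisher from registering its gauges twice when two threads ask for the same key at the same time.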

For example:

The Coda Hale Metrics publisher sends Hystrix metrics to a given metrics monitoring system.

Add the dependency:

     <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-codahale-metrics-publisher</artifactId>
            <version>1.5.9</version>
        </dependency>

Create the HystrixMetricsPublisher object and register it with HystrixPlugins:

    @Bean
    HystrixMetricsPublisher hystrixMetricsPublisher() {
        HystrixCodaHaleMetricsPublisher publisher = new HystrixCodaHaleMetricsPublisher(metricRegistry);
        HystrixPlugins.getInstance().registerMetricsPublisher(publisher);
        return publisher;
    }

The source of the Coda Hale implementation is as follows:

    public class HystrixCodaHaleMetricsPublisher extends HystrixMetricsPublisher {
        private final String metricsRootNode;
        private final MetricRegistry metricRegistry;

        public HystrixCodaHaleMetricsPublisher(MetricRegistry metricRegistry) {
            this(null, metricRegistry);
        }

        public HystrixCodaHaleMetricsPublisher(String metricsRootNode, MetricRegistry metricRegistry) {
            this.metricsRootNode = metricsRootNode;
            this.metricRegistry = metricRegistry;
        }

        @Override
        public HystrixMetricsPublisherCommand getMetricsPublisherForCommand(HystrixCommandKey commandKey, HystrixCommandGroupKey commandGroupKey, HystrixCommandMetrics metrics, HystrixCircuitBreaker circuitBreaker, HystrixCommandProperties properties) {
            return new HystrixCodaHaleMetricsPublisherCommand(metricsRootNode, commandKey, commandGroupKey, metrics, circuitBreaker, properties, metricRegistry);
        }

        @Override
        public HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolMetrics metrics, HystrixThreadPoolProperties properties) {
            return new HystrixCodaHaleMetricsPublisherThreadPool(metricsRootNode, threadPoolKey, metrics, properties, metricRegistry);
        }

        @Override
        public HystrixMetricsPublisherCollapser getMetricsPublisherForCollapser(HystrixCollapserKey collapserKey, HystrixCollapserMetrics metrics, HystrixCollapserProperties properties) {
            return new HystrixCodaHaleMetricsPublisherCollapser(collapserKey, metrics, properties, metricRegistry);
        }
    }

HystrixCodaHaleMetricsPublisher is responsible for creating HystrixCodaHaleMetricsPublisherCommand, HystrixCodaHaleMetricsPublisherThreadPool, and HystrixCodaHaleMetricsPublisherCollapser. The basic logic of all three is to register the relevant gauges with metricRegistry in their initialize method.

    public void initialize() {
        metricRegistry.register(createMetricName("isCircuitBreakerOpen"), new Gauge<Boolean>() {
            @Override
            public Boolean getValue() {
                return circuitBreaker.isOpen();
            }
        });
        // .....
    }