18、Kafka实战:消费者组消费进度监控都怎么实现

一.消费者Lag

消费者Lag(Consumer Lag):监控他们的消费进度了,或者监控他们消费的滞后程度

滞后程度:指消费者当前落后于生产者的程度

比如说:Kafka生产者向某主题成功生产了100万消息,你的消费者当前消费了80w条消息,那么你的消费者滞后了20万条消息,即Lag等于20w。

Lag单位消息数:我们是在主题级别上讨论Lag,实际上Kafka监控Lag的层级是在分区上。

手动汇总所有主题分区的Lag,累加起来,合并成最终的Lag值。

二.Lag是最最最重点的监控指标

反映一个消费者的运行情况

一个正常工作的消费者,他的Lag值应该很小甚至是0;

Lag值很小为0:表示消费者能及时消费生产者生产出来的消息,滞后程度很小。

Lag值很大:如果一个消费者Lag值很大,通常表示它无法跟上生产者的速度,从而拖慢下游消息的处理速度。

三.Lag的马太效应:

  • 由于消费者的速度无法匹配及生产者的速度,导致他消费数据不在操作系统的页缓存上,从而存入磁盘中。
  • 这样消费者不得补从磁盘中读取他们,进一步拉大与生产者的差距,进而出现马太效应,Lag越来越大。

四.怎么监控它:

  • 使用Kafka自带的命令行工具kafka-consumer-groups脚本。
  • 使用Kafka Java Consumer API编程
  • 使用Kafka自带的JMX监控指标

五.Kafka自带命令

使用Kafka自带的命令行工具bin/kafka-consumer-groups.sh(bat)

Kafka-consumer-group脚本时Kafka为提供的最直接的监控消费者消费进度的工具。

使用方法:

  • 使用kafka-consumer-groups脚本很简单
  • 该脚本位于kafka安装目录的bin子目录下
  • 我们可以通过下面的命令查看给定消费者的Lag值
$ bin/kafka-consumer-groups.sh --bootstrap-server <kafka broker连接信息> --describe --group
<group名称>
  • Kafka连接信息就是<主机名:端口> group
  • Kafka 连接信息就是 < 主机名:端口 > 对,而 group 名称就是你的消费者程序中设置的 group.id 值。

    $  bin/kafka-consumer-groups.sh --bootstarp -server localhost:9092 --describe --group testgroup

topic partition current-offset Log-end-offset Lag customer-id host
test 6 112760 714285 601524 customer-id-1 127.0.0.1

运行命令时,我指定了 Kafka 集群的连接信息,即 localhost:9092。

  • 设置要查询的消费者组名:testgroup
  • 会按照消费者组订阅主题的分区进行展示
  • 每个分区一行数据,主题,分区,每个分区当前最新生产的消息的位移值( LOG-END-OFFSET 列值),该消费者组当前最新消费消息的位移值( CURRENT-OFFSET 值),
  • LAG值(前两者的差值),消费者实例ID,消费者连接Broker的主机名以及消费者的CLIENT-ID信息。

六.Kafka Java Consumer API

Consumer端API监控给定消费者组的Lag值


 public static Map<TopicPartition,Long> lagof(String groupID,String bootstrapServers) throws TimeoutException {
        Properties props=new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);

        try(AdminClient client=AdminClient.create(props)){
//第 1 处是调用 AdminClient.listConsumerGroupOffsets 方法获取给定消费者组的最新消费消息的位
            ListConsumerGroupOffsetsResult result=client.listConsumerGroupOffsets(groupID);
            try{

                Map<TopicPartition, OffsetAndMetadata> consumedOffsets=result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//禁止自动提交
                props.put(ConsumerConfig.GROUP_ID_CONFIG,groupID);
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                try (final KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props)){
                    Map<TopicPartition,Long> endOffsets=consumer.endOffsets(consumedOffsets.keySet());
                    return endOffsets.entrySet().stream().collect(Collectors.toMap(entry->entry.getKey(),
                            entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset())
                            );
                }
            }catch (InterruptedException e){

            }catch (Exception e){
                // 处理ExecutionException
                // ...
                return Collections.emptyMap();
            }catch (TimeoutException e){
                throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
            }
        }
        return Collections.emptyMap();
    }

Kafka JMX 监控指标

Lead值:

  • 消费者最新消费消息的位移与分区当前第一条消息位移的差值

监控原因:

  • kafka的消息是有留存时间设置的,默认是1周,也就是说kafka默认删除1周前的数据。
  • 如果消费者程序足够慢,慢到快删除了

1、 消费者从头消费一遍数据;
2、 消费者从最新的消息位移处开始消费,之前没来得及的消息就会被跳过,造成丢消息;

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: