一.消费者Lag
消费者Lag(Consumer Lag):监控他们的消费进度了,或者监控他们消费的滞后程度
滞后程度:指消费者当前落后于生产者的程度
比如说:Kafka生产者向某主题成功生产了100万消息,你的消费者当前消费了80w条消息,那么你的消费者滞后了20万条消息,即Lag等于20w。
Lag单位消息数:我们是在主题级别上讨论Lag,实际上Kafka监控Lag的层级是在分区上。
手动汇总所有主题分区的Lag,累加起来,合并成最终的Lag值。
二.Lag是最最最重点的监控指标
反映一个消费者的运行情况
一个正常工作的消费者,他的Lag值应该很小甚至是0;
Lag值很小为0:表示消费者能及时消费生产者生产出来的消息,滞后程度很小。
Lag值很大:如果一个消费者Lag值很大,通常表示它无法跟上生产者的速度,从而拖慢下游消息的处理速度。
三.Lag的马太效应:
- 由于消费者的速度无法匹配及生产者的速度,导致他消费数据不在操作系统的页缓存上,从而存入磁盘中。
- 这样消费者不得补从磁盘中读取他们,进一步拉大与生产者的差距,进而出现马太效应,Lag越来越大。
四.怎么监控它:
- 使用Kafka自带的命令行工具kafka-consumer-groups脚本。
- 使用Kafka Java Consumer API编程
- 使用Kafka自带的JMX监控指标
五.Kafka自带命令
使用Kafka自带的命令行工具bin/kafka-consumer-groups.sh(bat)
Kafka-consumer-group脚本时Kafka为提供的最直接的监控消费者消费进度的工具。
使用方法:
- 使用kafka-consumer-groups脚本很简单
- 该脚本位于kafka安装目录的bin子目录下
- 我们可以通过下面的命令查看给定消费者的Lag值
$ bin/kafka-consumer-groups.sh --bootstrap-server <kafka broker连接信息> --describe --group
<group名称>
- Kafka连接信息就是
<主机名:端口>
group - Kafka 连接信息就是
< 主机名:端口 >
对,而 group 名称就是你的消费者程序中设置的 group.id 值。
$ bin/kafka-consumer-groups.sh --bootstarp -server localhost:9092 --describe --group testgroup |
||||||
---|---|---|---|---|---|---|
topic | partition | current-offset | Log-end-offset | Lag | customer-id | host |
test | 6 | 112760 | 714285 | 601524 | customer-id-1 | 127.0.0.1 |
运行命令时,我指定了 Kafka 集群的连接信息,即 localhost:9092。
- 设置要查询的消费者组名:testgroup
- 会按照消费者组订阅主题的分区进行展示
- 每个分区一行数据,主题,分区,每个分区当前最新生产的消息的位移值( LOG-END-OFFSET 列值),该消费者组当前最新消费消息的位移值( CURRENT-OFFSET 值),
- LAG值(前两者的差值),消费者实例ID,消费者连接Broker的主机名以及消费者的CLIENT-ID信息。
六.Kafka Java Consumer API
Consumer端API监控给定消费者组的Lag值
public static Map<TopicPartition,Long> lagof(String groupID,String bootstrapServers) throws TimeoutException {
Properties props=new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
try(AdminClient client=AdminClient.create(props)){
//第 1 处是调用 AdminClient.listConsumerGroupOffsets 方法获取给定消费者组的最新消费消息的位
ListConsumerGroupOffsetsResult result=client.listConsumerGroupOffsets(groupID);
try{
Map<TopicPartition, OffsetAndMetadata> consumedOffsets=result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//禁止自动提交
props.put(ConsumerConfig.GROUP_ID_CONFIG,groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (final KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props)){
Map<TopicPartition,Long> endOffsets=consumer.endOffsets(consumedOffsets.keySet());
return endOffsets.entrySet().stream().collect(Collectors.toMap(entry->entry.getKey(),
entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset())
);
}
}catch (InterruptedException e){
}catch (Exception e){
// 处理ExecutionException
// ...
return Collections.emptyMap();
}catch (TimeoutException e){
throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
}
}
return Collections.emptyMap();
}
Kafka JMX 监控指标
Lead值:
- 消费者最新消费消息的位移与分区当前第一条消息位移的差值
监控原因:
- kafka的消息是有留存时间设置的,默认是1周,也就是说kafka默认删除1周前的数据。
- 如果消费者程序足够慢,慢到快删除了
1、 消费者从头消费一遍数据;
2、 消费者从最新的消息位移处开始消费,之前没来得及的消息就会被跳过,造成丢消息;
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: