一.CommitFailedException
CommitFailedException:Consumer客户端在提交位移时出现了错误或异常,而且还是不可恢复的严重异常。
二.产生CommitFailedException的原因:
- 本次提交位移失败,原因是消费者开启Rebalance过程,将要提交的位移分配给另一个消费者实例。
- 产生原因:消费者实例连续两次调用poll方法的时间间隔超过了期望的max.poll.interval.ms参数
- 消费者实例花费太长时间进行消息处理,耽误了调用poll方法。
解决方案:
- 增加期望的时间间隔max.poll.interval.ms参数值
- 减少poll方法一次性返回的消息数量,即减少max.poll.records参数值
CommitFailException异常通常发生在手动提交位移时,即用户显式调用KafkaConsumer.commitSync()方法时。
场景一:
当消息处理的总时间超过预设的max.poll.interval.ms参数时,Kafka Consumer端会抛出CommitFailedException异常。
…
Properties props=new Properties();
…
props.put("max.poll.interval.ms",5000);
consumer.subscribe(Arrays.asList("test-topic"));
while(true){
ConsumerRecords<String,String> records=consumer.poll(Duration.ofSeconds(1));
//使用Thread.sleep模拟消息处理耗时
Thread.sleep(6000L);
consumer.commitSync();
}
解决方案:
- 缩短单条消息处理的时间,
- 增加Consumer端允许下游系统消费一批消息的最大市场
- 减少下游系统一次性消费的消息总数
- 下游系统使用多线程来加速消费
场景二:
- 消费者组和独立消费者在使用之前指定group.id
- 应用中同时出现了设置相同的group.id值消费组程序和独立消费者程序
- 当独立消费者程序手动提交位移的时,kafka就会立即抛出CommitFailException异常
产生原因:
kafka无法识别这个具有相同group.id的消费者实例,于是抛出异常
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: