15、Kafka实战:CommitFailedException异常

一.CommitFailedException

CommitFailedException:Consumer客户端在提交位移时出现了错误或异常,而且还是不可恢复的严重异常

二.产生CommitFailedException的原因:

  • 本次提交位移失败,原因是消费者开启Rebalance过程,将要提交的位移分配给另一个消费者实例。
  • 产生原因:消费者实例连续两次调用poll方法的时间间隔超过了期望的max.poll.interval.ms参数
  • 消费者实例花费太长时间进行消息处理,耽误了调用poll方法。

解决方案:

  • 增加期望的时间间隔max.poll.interval.ms参数值
  • 减少poll方法一次性返回的消息数量,即减少max.poll.records参数值

CommitFailException异常通常发生在手动提交位移时,即用户显式调用KafkaConsumer.commitSync()方法时。

场景一:

当消息处理的总时间超过预设的max.poll.interval.ms参数时,Kafka Consumer端会抛出CommitFailedException异常。

…
Properties props=new Properties();
…
props.put("max.poll.interval.ms",5000);
consumer.subscribe(Arrays.asList("test-topic"));
while(true){

    ConsumerRecords<String,String> records=consumer.poll(Duration.ofSeconds(1));
   //使用Thread.sleep模拟消息处理耗时
   Thread.sleep(6000L);
   consumer.commitSync();
}

解决方案:

  • 缩短单条消息处理的时间,
  • 增加Consumer端允许下游系统消费一批消息的最大市场
  • 减少下游系统一次性消费的消息总数
  • 下游系统使用多线程来加速消费

场景二:

  • 消费者组和独立消费者在使用之前指定group.id
  • 应用中同时出现了设置相同的group.id值消费组程序和独立消费者程序
  • 当独立消费者程序手动提交位移的时,kafka就会立即抛出CommitFailException异常

产生原因:

kafka无法识别这个具有相同group.id的消费者实例,于是抛出异常

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: