14、Kafka实战:Kafka中位移提交

一.Consumer端位移

  • 和消息在分区中的位移不是一回事
  • 是下一条消息的位移,而不是目前最新消费消息的位移

例子:

  • 假设一个分区中有10条消息,位移分别是0到9
  • 其中consumer应用已消费了5条消息,就说明该consumer消费了位移为0到4的5条消息,此时Consumer位移是5,指向了下一条消息的位移。

consumer需要向Kafka汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)

1、 Consumer能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的;
2、 即Consumer需要为分配给它的每个分区提交各自的位移数据;
3、 提交位移主要是为了表征consumer的消费进度,这样当Consumer发生故障重启之后,能够从kafka中读取之前提交的位移值,然后从相应的位移出继续消费,从而避免整个消费过程重来一遍;
4、 位移提交是kafka提供给你的一个工具或语义保障,你负责维持这个语义保障,即如果你提交了位移X,那么Kafka会认为所有位移值小于X的消息你都已经成功消费了;
5、 位移提交的语义保证是由你来负责的,kafka只会无脑的接收你的提交位移;

二.从用户的角度来说,位移提交分为自动提交和手动提交;

从Consumer端角度来说,位移提交分为同步提交和异步提交;

设置自动提交位移的代码和方法:

Properties props=new Properties();
props.put("bootstrap.servers","localhost:9092");
props.put("group.id,"test");
props.put("enable.auto.commit","true");
props.put("auto.commit.interval.ms","2000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserialiser");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer=new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo","bar"));
while(true){
  ConsumerRecords<String,String> records=consumer.poll(100);
  for(ConsumerRecord<String,String> record:records){
System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value())
 }

}
 

props.put("group.id", "test");

props.put("enable.auto.commit", "true");

是开启自动提交代码

手动提交代码

设置enable.auto.commit为false;

同时需要使用commitsync()


while(true){

    ConsumerRecord<String,String> records=consumer.poll(Duration.ofSeconds(1));
    process(records);//处理消息
    try{
          consumer。commitSync();
      
    }catch(CommitFalileException e){
          handle(e);//处理提交失败异常
 
  }

}

调用consumer.commitSync()方法的时机,实在处理完poll()方法返回所有消息之后。

如果过早提交位移,就会出现消费数据丢失的情况。

自动提交位移的问题

  • 一旦设置了enable.auto.commit为true,kafka会保证再开始调用poll方法时,提交上次poll返回的所有消息。
  • 从顺序上来说,poll方法的逻辑是先提交上一批消息的位移再处理下一批消息,他能保证消息不会出现丢失的情况但是自动提交位移的一个问题在于,他会产生重复消费问题。
  • 默认情况下,Consumer每5秒自动提交一次位移。
  • 现在我们假设提交位移之后的3s发生了Rebalance操作。
  • 在Rebalance之后,所有Consumer从上一次提交的位移处继续消费,但该位移已经是3s前的数据
  • 那么在Reblance发生之前的与上一次提交的间隙,就会都重新消费一次。
  • 可以通过减少auto.commit.interval.ms的值来提高提交频率,只能缩小重复消费的时间窗口,但不可能完全消除它,这是自动提交机制的缺陷。

手动提交位移

优点:

  • 更加灵活,完全能够把控位移提交时机和频率

缺陷:

  • 调用commitSync()时,consumer程序会处于阻塞状态,直到远端的Broker返回提交结果。
  • 影响整个应用程序的TPS

解决方案:

  • Kafka社区为手动提交位移提供了另一个API方法:Kafkacosnumer#commitAsync();
  • 异步,不影响TPS
while(true){

 ConsumerRecords<String,String> records=consumer.poll(Duration.ofSeconds(1));
  process(records);//处理消息
  consumer.commitAsync(
     (offsets,exection)->{

     if(exception!=null){
      handle(exception);
}

}
);

commitAsync是否能替代commitSync?

不能!

  • 出现问题时候他不会自动重试,
  • 因为是异步操作,如果提交失败后在重试时提交的位移值早已经是过期或者不是最新值了。

解决方案:

将commitSync和commitAsync组合使用

  • 利用commitsync的自动重试来规避瞬时错误,比如网络的抖动,Broker端的GC等
  • 这些错误是短暂的,自动重试通常都会成功,
  • 不希望程序处于阻塞状态,影响TPS

try{

   while(true){
  

     ConsumerRecords<String,String> records=consumer.poll(Duration.ofSeconds(1));
     process(records);//处理消息
     commitAysnc();//异步提交规避阻塞

   }
}catche(Exection e){
      handle(e);//处理异常
}finally{

       try{

         consumer.commitSync();//最后一次提交使用同步阻塞式提交
}finally{

  consumer.colse();

}

}

这段代码同时使用了commitSync()和commitAsync()。

  • 对于常规性,阶段性的手动提交,我们调用commitAsync()避免程序阻塞
  • 在Consumer要关闭前,我们要调用commitsync()方法执行同步阻塞式的位移提交,以确保comsuner关闭前能够保存正确的位移数据
  • 两者结合,实现了异步无阻塞式的位移管理,确保Consumer位移的正确性。

分步提交

  • commitSync(Map)
  • commitAsync(Map)
  • 参数是一个Map对象,键是TopicPartion,消费分区,值是OffsetAndMetadata,保存的是位移数据
priavte Map<TopicPartition,OffsetAndMetadata> offsets=new HashMap<>();
int count=0;
……
while(true){

 ConsumerRecords<String,String> records=consumer.poll(Duration.ofSeconds(1));
 for(ConsumerRecord<String,String> record:records){
 
  process(record);//处理消息
  offsets.put(new TopicPartition(record.topic(),record.partition()),
         new OffsetAndMetadata(record.offset() + 1)))
  
  if(count%100==0){

   consumer.commitAsync(offsets,null);回调处理逻辑是null;
  }
    count++;
}

}
  • 程序先创建了一个Map对象,用于保存consumer消费处理过程中要提交的分区位移
  • 开始逐条处理消息,并构造要提交的位移值
  • 我们设置计数器,当每100条就统一的提交一次位移
  • 与调用无参的commitAsync不同,这里调用带Map对象参数的CommitAsync进行细粒度的位移提交
  • 代码可以每处理100条消息就提交一次位移,不用受poll方法返回的消息总数限制。

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: