一. kafka 什么情况下保证消息不丢失:
kafka对"已提交"的消息(commit message)做有限度的持久化保证
第一个核心要素是----已提交的消息
- 什么是已提交的消息:kafka的若干个Broker成功的接收到了一条消息并且写入到日志文件后 ,他会告诉生产者程序这条消息已成功提交。
- 如何定义这个"若干个": 需要你的定义 一个或者全部
第二个核心要素-----有限度的持久化保证
kafka不可能保证任何情况下都做到不丢失消息。
总结:kafka能做到不丢失消息,不过这些消息必须是已提交,并且满足一定条件。 ####
二. “消息丢失”案例 ##
生产者程序丢失数据 ##
1、 kafkaProducer是异步发送消息;
2、 fireandforget发射后不管:属于导弹制道领域,后被借鉴到计算机领域中;
3、 执行完一个操作后不管它的结果是否成功;
产生原因:
- 网络抖动
- 消息太大
- 等异常不会被提示出来
解决方案: ####
- Producer永远使用带有回调通知的发送API
- 不要使用Producer.send(msg),而是producer.send(msg,callback)
消费者程序丢失数据
1、 Consumer端丢失数据主要体现在Consumer要消费的消息不见;
2、 Consumer有"位移"的概念:这个Consumer当前消费到Topic分区的位置;
类似于书签:
- 正确使用书签:读书, 更新书签页 --消息的重复处理,(一页书被重复读)
- 如果颠倒了:更新书签页,读书
- 就可以因为 未读完而进入下一页就是产生消息丢失。
多线程导致消费者消息丢失--异步执行失败,但是已经提交
- Consumer程序从kafka获取到消息后开启多个线程异步处理消息
- 而Consumer程序自动的向前更新位移。加入某个线程运行失败了,负责的消息没有成功处理,但是位移已经被更新了,因此这条消息就丢失了
解决方案:
- 不开启自动提交位移,改成手动提交位移
- 多线程处理条件极易出现消费多次的情况
三.kafka无消息丢失的配置:
1、 不要使用producer.send(msg),要使用producer.send(msg,callback),带回调的send方法;
2、 设置acks=all;
- acks是Prodcuer的一个参数,代表对"已提交"消息的定义。
- 如果设置成all,则表明所有副本Broker都要接收到消息,该消息才算是 "已提交"。这个是最高等级的"已提交"定义。
3、 设置retries为一个较大的值同样是Producer参数;
- 对应前面提到的Producer自动重试。
- 当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries>0时,Producer能自动重试消息发送,避免消息丢失。
4、 设置unclean.leader.election.enable=false;
- 不允许落户Leader太多的Broker竞选Leader。
5、 设置replication.factor>=3,这也是Broker端参数.;
- 被防止消息丢失的主要机制就是冗余。
6、 设置min.insync.replicas>1,Broker端参数;
- 控制消息至少要被写入到多少副本才算是"已提交"。设置成大于1可以提升消息持久性,在实际生产环境中千万别使用默认值1。
7、 确认消息消费完成后再提交;
- Consumer端有个参数enable.auto.commit,最好设置成false,采用手动提交位移
- 对Consumer多线程处理的场景而言是至关重要的。
8、 确保replication.factor>min.insync.replicas如果两者相等,;
- 那么只要有一个副本挂机,整个分区就无法正常工作了。
- 我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。
- 推荐设置成 replication.factor = min.insync.replicas + 1。
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: