分布式消息队列RocketMQ
四、 RocketMQ应用
4.7)消息发送重试机制
4、 7.1)说明;
生产者【Producer】对发送失败的消息进行重新发送的机制,称为消息发送重试机制,也称为消息重投机制。
对于消息重投,需要注意以下几点:
- 生产者在发送消息时,若采用同步或异步发送方式,发送失败会重试,但oneway消息发送方式发送失败是没有重试机制的
- 只有普通消息具有发送重试机制,顺序消息是没有的
- 消息重投机制可以保证消息尽可能发送成功、不丢失,但可能会造成消息重复
- 消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会成为大概率事件
- producer主动重发、consumer负载变化(发生Rebalance,不会导致消息重复,但可能出现重复消费)也会导致重复消息
- 消息重复在 RocketMQ中是无法避免的问题 但要避免消息的重复消费。
- 避免消息重复消费的解决方案是,为消息添加唯一标识(例如消息key),使消费者对消息进行消费判断来避免重复消费
消息发送重试有三种策略可以选择:同步发送失败策略、异步发送失败策略、消息刷盘失败策略
4、 7.2)同步发送失败策略;
对于普通消息,消息发送默认采用轮询【round-robin】策略来选择所发送到的队列。
如果发送失败,默认重试2次,但在重试时是不会选择上次发送失败的Broker,而是选择其它Broker;若只有一个Broker其也只能发送到该Broker,但其会尽量发送到该Broker上的其它Queue。
代码实现:
* * * *// 创建一个producer,参数为Producer Group名称
* * * *DefaultMQProducer producer = new DefaultMQProducer("pg");
* * * *// 指定nameServer地址
* * * *producer.setNamesrvAddr("rocketmqOS:9876");
* * * *// 设置同步发送失败时重试发送的次数,默认为2次
* * * *producer.setRetryTimesWhenSendFailed(3);
* * * *// 设置发送超时时限为5s,默认3s
* * * *producer.setSendMsgTimeout(5000);
同时,Broker还具有失败隔离功能,使Producer尽量选择未发生过发送失败的Broker作为目标 Broker;其可以保证其它消息尽量不发送到问题Broker,为了提升消息发送效率,降低消息发送耗时。
如果超过重试次数,则抛出异常,由Producer去保证消息不丢;当生产者出现 RemotingException、MQClientException和MQBrokerException时,Producer会自动重投消息。
思考:让我们自己实现失败隔离功能,如何来做?
方案一:Producer中维护某JUC的Map集合,其key是发生失败的时间戳,value为Broker实例;Producer中还维护着一个Set集合,其中存放着所有未发生发送异常的Broker实例;选择目标Broker是从该Set集合中选择的,再定义一个定时任务,定期从Map集合中将长期未发生发送异常的Broker清理出去,并添加到Set集合;
方案二:为Producer中的Broker实例添加一个标识,例如是一个AtomicBoolean属性。只要该Broker上发生过发送异常,就将其置为true;选择目标Broker就是选择该属性值为false的Broker,再定义一个定时任务,定期将Broker的该属性置为false;
方案三:为Producer中的Broker实例添加一个标识,例如是一个AtomicLong属性。只要该 Broker上发生过发送异常,就使其值自增;选择目标Broker就是选择该属性值最小的Broker,若该值相同,采用轮询方式选择。
4、 7.3)异步发送失败策略;
异步发送失败重试时,异步重试不会选择其他broker,仅在同一个broker上做重试,所以该策略无法保证消息不丢。
代码实现:
* * * *DefaultMQProducer producer = new DefaultMQProducer("pg");
* * * *producer.setNamesrvAddr("rocketmqOS:9876");
* * * *// 指定异步发送失败后不进行重试发送
* * * *producer.setRetryTimesWhenSendAsyncFailed(0);
4、 7.4)消息刷盘失败策略;
消息刷盘超时(Master或Slave)或slave不可用(slave在做数据同步时向master返回状态不是 SEND_OK)时,默认是不会将消息尝试发送到其他Broker的。
不过,对于重要消息可以通过在Broker的配置文件设置retryAnotherBrokerWhenNotStoreOK属性为true来开启。
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: