24、RocketMQ进阶:RocketMQ应用-消息发送重试机制

分布式消息队列RocketMQ

四、 RocketMQ应用

4.7)消息发送重试机制

4、 7.1)说明;

生产者【Producer】对发送失败的消息进行重新发送的机制,称为消息发送重试机制,也称为消息重投机制。

对于消息重投,需要注意以下几点:

  • 生产者在发送消息时,若采用同步或异步发送方式,发送失败会重试,但oneway消息发送方式发送失败是没有重试机制的
  • 只有普通消息具有发送重试机制,顺序消息是没有的
  • 消息重投机制可以保证消息尽可能发送成功、不丢失,但可能会造成消息重复
  • 消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会成为大概率事件
  • producer主动重发、consumer负载变化(发生Rebalance,不会导致消息重复,但可能出现重复消费)也会导致重复消息
  • 消息重复在 RocketMQ中是无法避免的问题 但要避免消息的重复消费。
  • 避免消息重复消费的解决方案是,为消息添加唯一标识(例如消息key),使消费者对消息进行消费判断来避免重复消费

消息发送重试有三种策略可以选择:同步发送失败策略、异步发送失败策略、消息刷盘失败策略

4、 7.2)同步发送失败策略;

对于普通消息,消息发送默认采用轮询【round-robin】策略来选择所发送到的队列。

如果发送失败,默认重试2次,但在重试时是不会选择上次发送失败的Broker,而是选择其它Broker;若只有一个Broker其也只能发送到该Broker,但其会尽量发送到该Broker上的其它Queue。

代码实现:

 * * * *// 创建一个producer,参数为Producer Group名称
 * * * *DefaultMQProducer producer = new DefaultMQProducer("pg");
 * * * *// 指定nameServer地址
 * * * *producer.setNamesrvAddr("rocketmqOS:9876");
 * * * *// 设置同步发送失败时重试发送的次数,默认为2次
 * * * *producer.setRetryTimesWhenSendFailed(3);
 * * * *// 设置发送超时时限为5s,默认3s
 * * * *producer.setSendMsgTimeout(5000);

同时,Broker还具有失败隔离功能,使Producer尽量选择未发生过发送失败的Broker作为目标 Broker;其可以保证其它消息尽量不发送到问题Broker,为了提升消息发送效率,降低消息发送耗时。

如果超过重试次数,则抛出异常,由Producer去保证消息不丢;当生产者出现 RemotingException、MQClientException和MQBrokerException时,Producer会自动重投消息。

思考:让我们自己实现失败隔离功能,如何来做?

方案一:Producer中维护某JUC的Map集合,其key是发生失败的时间戳,value为Broker实例;Producer中还维护着一个Set集合,其中存放着所有未发生发送异常的Broker实例;选择目标Broker是从该Set集合中选择的,再定义一个定时任务,定期从Map集合中将长期未发生发送异常的Broker清理出去,并添加到Set集合;

方案二:为Producer中的Broker实例添加一个标识,例如是一个AtomicBoolean属性。只要该Broker上发生过发送异常,就将其置为true;选择目标Broker就是选择该属性值为false的Broker,再定义一个定时任务,定期将Broker的该属性置为false;

方案三:为Producer中的Broker实例添加一个标识,例如是一个AtomicLong属性。只要该 Broker上发生过发送异常,就使其值自增;选择目标Broker就是选择该属性值最小的Broker,若该值相同,采用轮询方式选择。

4、 7.3)异步发送失败策略;

异步发送失败重试时,异步重试不会选择其他broker,仅在同一个broker上做重试,所以该策略无法保证消息不丢。

代码实现:

 * * * *DefaultMQProducer producer = new DefaultMQProducer("pg");
 * * * *producer.setNamesrvAddr("rocketmqOS:9876");
 * * * *// 指定异步发送失败后不进行重试发送
 * * * *producer.setRetryTimesWhenSendAsyncFailed(0);

4、 7.4)消息刷盘失败策略;

消息刷盘超时(Master或Slave)或slave不可用(slave在做数据同步时向master返回状态不是 SEND_OK)时,默认是不会将消息尝试发送到其他Broker的。

不过,对于重要消息可以通过在Broker的配置文件设置retryAnotherBrokerWhenNotStoreOK属性为true来开启。

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: