25、RocketMQ进阶:RocketMQ应用-消息消费重试机制

分布式消息队列RocketMQ

四、 RocketMQ应用

4.8)消息消费重试机制

4、 8.1)顺序消息的消费重试;

对于顺序消息,当Consumer消费消息失败后,为了保证消息的顺序性,其会自动不断地进行消息重试,直到消费成功。

消费重试默认间隔时间为1000毫秒,重试期间应用会出现消息消费被阻塞的情 况。

代码实现:

 * * * *// 定义一个push消费者
 * * * *DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
 * * * *// 顺序消息消费失败的消费重试时间间隔,默认为1000毫秒,其取值范围为[10, 30000]毫秒
 * * * *consumer.setSuspendCurrentQueueTimeMillis(100);

由于对顺序消息的重试是无休止的,不间断的,直至消费成功;所以对于顺序消息的消费, 务必要保证应用能够及时监控并处理消费失败的情况,避免消费被永久性阻塞。

注意,顺序消息没有发送失败重试机制,但具有消费失败重试机制

4、 8.2)无序消息的消费重试;

对于无序消息(普通消息、延时消息、事务消息),当Consumer消费消息失败时,可以通过设置返回状态达到消息重试的效果。

不过需要注意,无序消息的重试只对集群消费方式生效,广播消费方式不提供失败重试特性;即对于广播消费,消费失败后,失败消息不再重试,继续消费后续消息。

4、 8.3)消费重试次数与间隔;

4、 8.3.1)重试间隔时间;

对于无序消息集群消费下的重试消费,每条消息默认多重试16次,但每次重试的间隔时间是不同的,会逐渐变长。每次重试的间隔时间如下表:

重试次数 与上次重试的间隔时间 重试次数 与上次重试的间隔时间
1 10秒 9 7分钟
2 30秒 10 8分钟
3 1分钟 11 9分钟
4 2分钟 12 10分钟
5 3分钟 13 20分钟
6 4分钟 14 30分钟
7 5分钟 15 1小时
8 6分钟 16 2小时

若一条消息在一直消费失败的前提下,将会在正常消费后的第4小时46分后进行第16次重试; 若仍然失败,则将消息投递到死信队列

4、 8.3.2)修改消费重试次数;

代码实现:

 * * * */ 定义一个push消费者
 * * * *DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg"); 
 * * * *// 修改消费重试次数
 * * * *consumer.setMaxReconsumeTimes(20);

对于修改过的重试次数,将按照以下策略执行:

1、 若修改值小于16,则按照指定间隔进行重试;
2、 若修改值大于16,则超过16次的重试时间间隔均为2小时;

对于Consumer Group,若仅修改了一个Consumer的消费重试次数,则会应用到该Group中所有其它Consumer实例;若出现多个Consumer均做了修改的情况,则采用覆盖方式生效。即最后被修改的值会覆盖前面设置的值。

4、 8.4)重试队列;

对于需要重试消费的消息,并不是Consumer在等待了指定时长后再次去拉取原来的消息进行消费,而是将这些需要重试消费的消息放入到了一个特殊Topic的队列中,而后进行再次消费的;这个特殊的队列就是重试队列。

当出现需要进行重试消费的消息时,Broker会为每个消费组都设置一个Topic名称 ,为%RETRY%consumerGroup@consumerGroup的重试队列,如下图所示:

*

说明:

1、 这个重试队列是针对消息者组的,而不是针对每个Topic设置的(一个Topic的消息可以让多个消费者组进行消费,所以会为这些消费者组各创建一个重试队列);
2、 只有当出现需要进行重试消费的消息时,才会为该消费者组创建重试队列;
3、 Broker对于重试消息的处理是通过延时消息实现的先将消息保存到SCHEDULE_TOPIC_XXXX延迟队列中,延迟时间到后,会将消息投递到%RETRY%consumerGroup@consumerGroup重试队列中;

注意:

消费重试的时间间隔与延时消费的延时等级十分相似,除了没有延时等级的前两个时间外,其它的时间都是相同的

4、 8.5)消费重试配置方式;

集群消费方式下,消息消费失败后若希望消费重试,则需要在消息监听器接口的实现中明确进行如下三种方式之一的配置:

1、 方式1:返回ConsumeConcurrentlyStatus.RECONSUME_LATER(推荐);
2、 方式2:返回Null;
3、 方式3:抛出异常;

代码实现:

 * * * * * *@Override
 * * * * * *public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
 * * * * * * * *try {
 * * * * * * * * * *// 逐条消费消息
 * * * * * * * * * *for (MessageExt msg : msgs) {
 * * * * * * * * * * * *System.out.println(msg);
 * * * * * * * * *  }
 * * * * * * *  } catch (Throwable e) {
 * * * * * * * * * *// 以下三种方式均可引发消费重试
 * * * * * * * * * *return ConsumeConcurrentlyStatus.RECONSUME_LATER;
 * * * * * * * * * *// return null;
 * * * * * * * * * *// throw new RuntimeException("消费异常");
 * * * * * * *  }
 * * * * * * * *
 * * * * * * * *// 返回消费状态:消费成功
 * * * * * * * *return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 * * * * *  }

4、 8.6)消费不重试配置方式;

集群消费方式下,消息消费失败后若不希望消费重试,则在捕获到异常后同样也返回与消费成功后的相同的结果,即ConsumeConcurrentlyStatus.CONSUME_SUCCESS,则不进行消费重试。

代码实现:

 * * * * * *//消费不重试配置方式
 * * * * * *@Override
 * * * * * *public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
 * * * * * * * *try {
 * * * * * * * * * *// 逐条消费消息
 * * * * * * * * * *for (MessageExt msg : msgs) {
 * * * * * * * * * * * *System.out.println(msg);
 * * * * * * * * *  }
 * * * * * * *  } catch (Throwable e) {
 * * * * * * * * * *// 消费不重试配置方式
 * * * * * * * * * *return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 * * * * * * *  }
 * * * * * * * *// 返回消费状态:消费成功
 * * * * * * * *return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 * * * * *  }

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: