20、RocketMQ进阶:RocketMQ应用-延时消息

分布式消息队列RocketMQ

四、 RocketMQ应用

4.3)延时消息

4、 3.1)什么是延时消息;

当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。

采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交易中超时未支付关闭订单的场景,12306平台订票超时未支付取消订票的场景。

在电商平台中,订单创建时会发送一条延迟消息;这条消息将会在30分钟后投递给后台业务系统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成支付。如果未完成,则取消订单将商品再次放回到库存;如果完成支付则忽略。

在12306平台中,车票预订成功后就会发送一条延迟消息。这条消息将会在45分钟后投递给后台业务系统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成支付。如果未完成则取消预订,将车票再次放回到票池;如果完成支付则忽略。

4、 3.2)延时等级;

延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级定义在 RocketMQ服务端的MessageStoreConfig类中的如下变量中:

*

即,若指定的延时等级为3,则表示延迟时长为10s,即延迟等级是从1开始计数的。

当然,如果需要自定义的延时等级,可以通过在broker加载的配置中新增如下配置(例如下面增加了1 天这个等级1d),配置文件在RocketMQ安装目录下的conf目录中【单机的就添加到 broker.conf 中】

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 
1h 2h 1d

4、 3.3)延时消息实现原理;

*

具体实现流程如下:

4、 3.3.1)修改消息;

*

Producer将消息发送到Broker后,Broker会首先将消息写入到commitlog文件,然后需要将其分发到相应的consumequeue。

不过,在分发之前,系统会先判断消息中是否带有延时等级,若没有则直接正常分发;若有则需要经历一个复杂的过程:

1、 修改消息的Topic为SCHEDULE_TOPIC_XXXX;
2、 根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId目录与consumequeue文件(如果没有这些目录与文件的话);

延迟等级delayLevel与queueId的对应关系为queueId = delayLevel -1

需要注意,在创建queueId目录时并不是一次性地将所有延迟等级对应的目录全部创建完毕, 而是用到哪个延迟等级创建哪个目录

3、 修改消息索引单元内容;索引单元中的MessageTagHashCode部分原本存放的是消息的Tag的Hash值,现修改为消息的投递时间;

投递时间是指该消息被重新修改为原Topic后再次被写入到 commitlog中的时间;消息存储时间指的是消息被发送到Broker时的时间戳。

投递时间 = 消息存储时间 + 延时等级时间


*

4、 将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中;

SCHEDULE_TOPIC_XXXX目录中各个延时等级Queue中的消息是如何排序的?

是按照消息投递时间排序的;一个Broker中同一等级的所有延时消息会被写入到consumequeue 目录的SCHEDULE_TOPIC_XXXX目录下相同Queue中。

即一个Queue中消息投递时间的延迟等级时间是相同的,那么投递时间就取决于于消息存储时间了;即按照消息被发送到Broker的时间进行排序的。

4、 3.3.2)投递延时消息;

Broker内部有⼀个延迟消息服务类ScheuleMessageService,其会消费SCHEDULE_TOPIC_XXXX中的消息,即按照每条消息的投递时间,将延时消息投递到⽬标Topic中。

不过,在投递之前会从commitlog 中将原来写入的消息再次读出,并将其原来的延时等级设置为0,即原消息变为了一条不延迟的普通消息。然后再次将消息投递到目标Topic中。

ScheuleMessageService在Broker启动时,会创建并启动一个定时器TImer,用于执行相应的定时任务。

系统会根据延时等级的个数,定义相应数量的TimerTask,每个TimerTask负责一个延迟等级消息的消费与投递。

每个TimerTask都会检测相应Queue队列的第一条消息是否到期;若第 一条消息未到期则后面的所有消息更不会到期(消息是按照投递时间排序的);若第一条消息到期了,则将该消息投递到目标Topic,即消费该消息。

4、 3.3.3)将消息重新写入commitlog;

延迟消息服务类ScheuleMessageService将延迟消息再次发送给了commitlog,并再次形成新的消息索引条目,分发到相应Queue

这其实就是一次普通消息发送,只不过这次的消息Producer是延迟消息服务类 ScheuleMessageService。

4、 3.4)代码实现案例;

4、 3.4.1)定义延迟消息生产类;

// 定义延迟消息生产类
public class DelayProducer {
 *  public static void main(String[] args) throws Exception {
 * * *  DefaultMQProducer producer = new DefaultMQProducer("pg");
 * * *  producer.setNamesrvAddr("rocketmqOS:9876");
 * * *  // 开启生产者
 * * *  producer.start();
 * * *  // 生产并发送1条消息
 * * *  for (int i = 0; i < 1; i++) {
 * * * * *  byte[] body = ("Hi," + i).getBytes();
 * * * * *  Message msg = new Message("TopicB", "someTag", body);
 * * * * *  // 指定消息延迟等级为3级,即延迟10s
 * * * * *  msg.setDelayTimeLevel(3);
 * * * * *  SendResult sendResult = producer.send(msg);
 * * * * *  // 输出消息被发送的时间
 * * * * *  System.out.print(new SimpleDateFormat("mm:ss").format(new Date()));
 * * * * *  System.out.println(" ," + sendResult);
 * * *  }
 * * *  producer.shutdown();
 *  }
}

输出:

*

4、 3.4.2)定义延迟消息消费类;

// 定义延迟消息消费类
public class OtherConsumer {
 * *public static void main(String[] args) throws MQClientException {
 * * * *DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
 * * * *consumer.setNamesrvAddr("rocketmqOS:9876");
​
 * * * *consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 * * * *consumer.subscribe("TopicB", "*");
​
 * * * *consumer.registerMessageListener(new MessageListenerConcurrently() {
 * * * * * *@Override
 * * * * * *public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
 * * * * * * * *for (MessageExt msg : msgs) {
 * * * * * * * * * *// 输出消息被消费的时间
 * * * * * * * * * *System.out.print(new SimpleDateFormat("mm:ss").format(new Date()));
 * * * * * * * * * *System.out.println(" ," + msg);
 * * * * * * *  }
 * * * * * * * *return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 * * * * *  }
 * * *  });
 * * * *consumer.start();
 * * * *System.out.println("Consumer Started");
 *  }
}

输出:【发现收到消息正是在生成发送消息的10秒后,实现了延时消息】

*

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: