13、RocketMQ源码分析:延迟消息

目录

源码实现

CommitLog#asyncPutMessage

ScheduleMessageService

load()

start()

DeliverDelayedMessageTimerTask


当消息写入到Broker后,需要等待指定的时长后才可被消费处理的消息,称为延时消息,本文记录延迟消息在RocketMQ的实现

在生产者端发送延时消息的代码如下,只需要为消息设置属性DelayTimeLevel,参数为延时级别,即可实现消息延时

message.setDelayTimeLevel(3);

RocketMQ不支持自定义延时时间,系统定义了18个延时级别如下

org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel

*

参数3表示该条消息延时10s,参数4表示延时30s,依次类推

源码实现

1、 生产者为消息设置延时标记;
2、 Broker判断消息如果需要延时,则将该条消息暂存到名为SCHEDULE_TOPIC_XXXX的Topic;
3、 Broker端ScheduleMessageService进行延时调度,当消息延时时间到期后,重新将消息发送到原Topic的消费队列,供消费者消费;

RocketMQ规定了延时消息的Topic为SCHEDULE_TOPIC_XXXX,其消息队列也根据延时级别分为18个消息队列,文件存储如下,只有三个文件夹是因为暂时只用到了3个隔离级别,根据下图也能发现延时级别和queueid的关系(queueid = 延时级别 - 1)

*

CommitLog#asyncPutMessage

将消息写入CommitLog时,判断是否需要延时,然后替换Topic和queueid

*

再进行消息分发,将消息写入ConsumeQueue时,也有一点特殊处理:

CommitLog#checkMessageAndReturnSize(java.nio.ByteBuffer, boolean, boolean)

延时消息存入SCHEDULE_TOPIC_XXXX的消费队列时,tagsCode存储的是消息的到期时间

*

ScheduleMessageService

进行延时任务调度,到期后将消息重新写入CommitLog

load()

加载SCHEDULE_TOPIC_XXXX的所有队列的消费offset情况和延迟级别信息到内存Map

*

start()

启动各个队列的消息分发定时任务

*

DeliverDelayedMessageTimerTask

每个队列的消息分发任务

*

executeOnTimeup

1、 根据延迟级别获取队列对象;
2、 根据offset获取队列中未消费的所有消息,循环获取tagsCode(存储的是消息到期时间)判断是否需要进行消息分发;
3、 如果消息到期就恢复原消息的所有信息重新写入CommitLog,如果未到期则延时一段时间再次执行当前方法executeOnTimeup

*

*

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: