目录
源码实现
CommitLog#asyncPutMessage
ScheduleMessageService
load()
start()
DeliverDelayedMessageTimerTask
当消息写入到Broker后,需要等待指定的时长后才可被消费处理的消息,称为延时消息,本文记录延迟消息在RocketMQ的实现
在生产者端发送延时消息的代码如下,只需要为消息设置属性DelayTimeLevel,参数为延时级别,即可实现消息延时
message.setDelayTimeLevel(3);
RocketMQ不支持自定义延时时间,系统定义了18个延时级别如下
org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel
参数3表示该条消息延时10s,参数4表示延时30s,依次类推
源码实现
1、 生产者为消息设置延时标记;
2、 Broker判断消息如果需要延时,则将该条消息暂存到名为SCHEDULE_TOPIC_XXXX的Topic;
3、 Broker端ScheduleMessageService进行延时调度,当消息延时时间到期后,重新将消息发送到原Topic的消费队列,供消费者消费;
RocketMQ规定了延时消息的Topic为SCHEDULE_TOPIC_XXXX,其消息队列也根据延时级别分为18个消息队列,文件存储如下,只有三个文件夹是因为暂时只用到了3个隔离级别,根据下图也能发现延时级别和queueid的关系(queueid = 延时级别 - 1)
CommitLog#asyncPutMessage
将消息写入CommitLog时,判断是否需要延时,然后替换Topic和queueid
再进行消息分发,将消息写入ConsumeQueue时,也有一点特殊处理:
CommitLog#checkMessageAndReturnSize(java.nio.ByteBuffer, boolean, boolean)
延时消息存入SCHEDULE_TOPIC_XXXX的消费队列时,tagsCode存储的是消息的到期时间
ScheduleMessageService
进行延时任务调度,到期后将消息重新写入CommitLog
load()
加载SCHEDULE_TOPIC_XXXX的所有队列的消费offset情况和延迟级别信息到内存Map
start()
启动各个队列的消息分发定时任务
DeliverDelayedMessageTimerTask
每个队列的消息分发任务
executeOnTimeup
1、 根据延迟级别获取队列对象;
2、 根据offset获取队列中未消费的所有消息,循环获取tagsCode(存储的是消息到期时间)判断是否需要进行消息分发;
3、 如果消息到期就恢复原消息的所有信息重新写入CommitLog,如果未到期则延时一段时间再次执行当前方法executeOnTimeup;
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: