12、RocketMQ源码分析:Consumer-消息消费

目录

ConsumeMessageConcurrentlyService

ConsumeRequest

processConsumeResult

ConsumeMessageOrderlyService


在上一节,消息拉取后将消息放入ProcessQueue,并异步发起消费申请,如下代码

*

消费请求由 consumeMessageService 处理,先来看实现类是什么

在DefaultMQPushConsumerImpl#start,消费者启动时,会根据我们注册的消息监听来进行初始化,如下

*

*

分为了顺序消费和非顺序两种实现,挨个看源码

ConsumeMessageConcurrentlyService

并发消费

*参数List 每次消费多少条消息,由参数ConsumeMessageBatchMaxSize控制

*

ConsumeRequest

ConsumeMessageConcurrentlyService.ConsumeRequest#run

消费前调用钩子函数进行消费前处理,然后调用MessageListener进行业务处理

*最后根据消息消费返回值,进行不同的处理

*

processConsumeResult

返回CONSUME_SUCCESS表示成功消费,更新消费进度

返回RECONSUME_LATER表示消费失败,需要重新发送一份消息到Broker进行延时消费

*

ConsumeMessageOrderlyService

顺序消费

首先消息消费时对队列的负载均衡逻辑,确保了每个Topic的每个消费队列只被一个Consumer订阅和消费(集群模式),但是顺序消费同时必须保证同一时间只有一个线程来消费这个队列,所以ConsumeMessageOrderlyService与ConsumeMessageConcurrentlyService一个最大的区别就是在消费队列时,先加锁来保证当前JVM中同一时间只有一个线程来进行消息消费。

提交消费申请代码如下**,**提交消费请求时不像并发消费,没有将消息并发提交到线程池消费,而是以队列为单位提交任务执行

*

ConsumeMessageOrderlyService.ConsumeRequest#run

*

如上,MessageQueueLock为每个队列new了一个Object,消费时先获取消息队列对应的Object,对其加锁后再进行消息消费

*

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: