11、RocketMQ源码分析:Consumer-消息拉取

入口org.apache.rocketmq.client.impl.factory.MQClientInstance#start

RocketMQ的消息消费流程分为三部分,一消息队列负载均衡,二消息的拉取,三消息的消费,其中消息的拉取由 PullMessageService 来实现

*

PullMessageService

上节看到消息队列负载均衡之后,会new PullRequest(), 然后交给PullMessageService进行消息拉取,这里即是从阻塞队列中取出PullRequest,进行消息拉取操作

*

* PullRequest的属性如下

*

DefaultMQPushConsumerImpl#pullMessage

首先进行消息流控,如果本地已经拉取的消息超过阈值条数或者阈值大小,进行延迟拉取

*

构造PULL_MESSAGE命令,异步调用Broker接口进行消息拉取,在回调函数中进行消息的解析存入processQueue

*

回调函数PullCallback,进行消费端过滤、添加ProcessQueue、异步提交消费申请、发起下次拉取任务

*

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: