入口org.apache.rocketmq.client.impl.factory.MQClientInstance#start
RocketMQ的消息消费流程分为三部分,一消息队列负载均衡,二消息的拉取,三消息的消费,其中消息的拉取由 PullMessageService 来实现
PullMessageService
上节看到消息队列负载均衡之后,会new PullRequest(), 然后交给PullMessageService进行消息拉取,这里即是从阻塞队列中取出PullRequest,进行消息拉取操作
PullRequest的属性如下
DefaultMQPushConsumerImpl#pullMessage
首先进行消息流控,如果本地已经拉取的消息超过阈值条数或者阈值大小,进行延迟拉取
构造PULL_MESSAGE命令,异步调用Broker接口进行消息拉取,在回调函数中进行消息的解析存入processQueue
回调函数PullCallback,进行消费端过滤、添加ProcessQueue、异步提交消费申请、发起下次拉取任务
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: