09、RocketMQ源码分析:Consumer-消息队列负载均衡

目录

RebalanceService

RebalanceImpl

rebalanceByTopic

AllocateMessageQueueStrategy

updateProcessQueueTableInRebalance


入口org.apache.rocketmq.client.impl.factory.MQClientInstance#start

RocketMQ的消息消费流程分为三部分,一消息队列负载均衡,二消息的拉取,三消息的消费

*

RebalanceService

负载均衡定时任务的实现类

*

org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance

*

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#doRebalance

*

RebalanceImpl

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance

*

rebalanceByTopic

广播模式所有队列都需订阅, 不用负载均衡,下边只关注集群模式

1、 获取所有消息队列和消费者实例;

2、 使用负载均衡算法进行分配;

3、 根据分配后最新的队列信息,进行消息拉取或停止原队列消费;

    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
              。。。。
            case CLUSTERING: {
                // 从缓存表获取该Topic所有的消息队列
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);

                // 从Broker获取该消费者组所有消费者列表
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    // 负载均衡策略处理
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    // 根据最新的队列信息,更新本地
                    // 如果有队列被移除,停止消费原队列
                    // 如果有队列新增,新增拉取消息的任务
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

AllocateMessageQueueStrategy

分配队列的策略接口,实现类有如下,假设有8个消费队列,3个消费者ABC

AllocateMessageQueueAveragely 默认使用的,分配后A123, B456, C67

AllocateMessageQueueAveragelyByCircle 分配后A147 B258 C36

AllocateMessageQueueConsistentHash 环形一致性hash算法,下节

AllocateMessageQueueByConfig 自定义配置,为每个消费者自定义要订阅的队列

AllocateMessageQueueByMachineRoom 也是自定义配置,每个消费者只负载自定义配置的机房中Broker的队列信息,BrokerName也需要规范命名

AllocateMessageQueueAveragely#allocate 代码如下

*

updateProcessQueueTableInRebalance

如果有删除的队列,停止消费消息,移除消息队列

如果有新增的队列,新增消息拉取任务

*

*

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: