04、RocketMQ源码分析:NameServer

文章目录

  • 版本声明
  • NameServer介绍
  • NameServer源码分析
    • KVConfigManager
  • RouteInfoManager
  • BrokerHouseKeepingService
  • DefaultRequestProcessor
  • NamesrvStartup

版本声明

1、 基于rocketmq-all-4.3.1版本;
2、 如有发现分析不正确的地方欢迎指正,谢谢!;

NameServer介绍

1、 NameServer本身的高可用是通过部署多台NameServer服务NameServer互相独立,彼此之间不会通信(即多台NameServer的数据并不是强一致的),任意一台宕机并不会影响其他的NameServer
2、 作用;

  • 维护活跃的Broker地址列表,包括Master和Slave

  • 维护所有TopicTopic对应队列的地址列表

  • 维护所有BrokerFilter列表
    3、 BrokerNameServer关系;

  • 单个Broker与所有NameServer保持长连接

  • Broker每隔30秒向所有NameServer发送心跳,心跳包含了自身的topic信息

  • NameServer每隔10秒钟扫描所有存活的Broker连接,若2min内没有发送心跳信息,则断开连接

  • Broker在启动后向所有NameServer注册,Producer在发送消息之前先从NameServer获取Broker服务器的地址列表,然后根据负载均衡算法从列表中选择一台Broker进行消息发送
    4、 稳定性;

  • nameserver互相独立,无状态

  • nameserver不会有频繁的读写,稳定性相对高

NameServer源码分析

KVConfigManager

1、 内存级的KV存储,提供增删改查以及持久化数据的能力本质就是一个HashMap;

   
    public class KVConfigManager {
     
       
     
        private final NamesrvController namesrvController;
    
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        // 
        private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
            new HashMap<String, HashMap<String, String>>();
    
        public KVConfigManager(NamesrvController namesrvController) {
     
       
            this.namesrvController = namesrvController;
        }
    
        public void load() {
     
       
            String content = null;
            try {
     
       
                content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
            } catch (IOException e) {
     
       
                log.warn("Load KV config table exception", e);
            }
            if (content != null) {
     
       
                KVConfigSerializeWrapper kvConfigSerializeWrapper =
                    KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
                if (null != kvConfigSerializeWrapper) {
     
       
                    this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
                    log.info("load KV config table OK");
                }
            }
        }  
    }  

RouteInfoManager

1、 路由信息即Broker向NameServer注册后保存的信息,RouteInfoManager保存所有的TopicBroker信息;

    public class RouteInfoManager {
     
       
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
        private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        //topic列表对应的队列信息
        private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
        //Broker地址信息
        private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
        //broker集群信息
        private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
        //Broker当前存活的Broker(非实时)
        private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
        //Broker过滤信息
        private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    
        public RouteInfoManager() {
     
       
            this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
            this.brokerAddrTable = new HashMap<String, BrokerData>(128);
            this.clusterAddrTable = new HashMap<String, Set<String>>(32);
            this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
            this.filterServerTable = new HashMap<String, List<String>>(256);
        }
   ...省略... 
   } 

*
2、 成员变量解析;

  • topicQueueTable:Topic消息队列路由信息,包括topic所在的broker名称,读队列数量,写队列数量,同步标记等信息,rocketmq根据topicQueueTable的信息进行负载均衡消息发送。

  • brokerAddrTable:Broker节点信息,包括brokerName,所在集群名称,还有主备节点信息。

  • clusterAddrTable:Broker集群信息,存储了集群中所有的BrokerName。

  • brokerLiveTable:Broker状态信息,Nameserver每次收到Broker的心跳包就会更新该信息。
    3、 通过远程调试查看具体内容(双主双从,两个nameserver);

  • ip地址列表

    • rocketmq-slave2 172.19.0.7
    • rocketmq-slave1 172.19.0.6
    • rocketmq-master2 172.19.0.5
    • rocketmq-master1 172.19.0.4
    • rocketmq-nameserver2 172.19.0.3
    • rocketmq-nameserver1 172.19.0.2
  • topicQueueTable内容如下
    *

    ```java 
    topicQueueTable信息
    
    {
    
    
        "RMQ_SYS_TRANS_HALF_TOPIC": [
            {
    
    
                "brokerName": "rocketmq-master1",
                "perm": 6,
                "readQueueNums": 1,
                "topicSynFlag": 0,
                "writeQueueNums": 1
            },
            {
    
    
                "brokerName": "rocketmq-master2",
                "perm": 6,
                "readQueueNums": 1,
                "topicSynFlag": 0,
                "writeQueueNums": 1
            }
        ],
        "rocketmq-master1": [
            {
    
    
                "brokerName": "rocketmq-master1",
                "perm": 7,
                "readQueueNums": 1,
                "topicSynFlag": 0,
                "writeQueueNums": 1
            }
        ],
        "rocketmq-master2": [
            {
    
    
                "brokerName": "rocketmq-master2",
                "perm": 7,
                "readQueueNums": 1,
                "topicSynFlag": 0,
                "writeQueueNums": 1
            }
        ],
        "SELF_TEST_TOPIC": [
            {
    
    
                "brokerName": "rocketmq-master1",
                "perm": 6,
                "readQueueNums": 1,
                "topicSynFlag": 0,
                "writeQueueNums": 1
            },
            {
    
    
                "brokerName": "rocketmq-master2",
                "perm": 6,
                "readQueueNums": 1,
                "topicSynFlag": 0,
                "writeQueueNums": 1
            }
        ],
        "TBW102": [
            {
    
    
                "brokerName": "rocketmq-master1",
                "perm": 7,
                "readQueueNums": 4,
                "topicSynFlag": 0,
                "writeQueueNums": 4
            },
            {
    
    
                "brokerName": "rocketmq-master2",
                "perm": 7,
                "readQueueNums": 4,
                "topicSynFlag": 0,
                "writeQueueNums": 4
            }
        ],
        "testTopic": [
            {
    
    
                "brokerName": "rocketmq-master1",
                "perm": 6,
                "readQueueNums": 16,
                "topicSynFlag": 0,
                "writeQueueNums": 16
            },
            {
    
    
                "brokerName": "rocketmq-master2",
                "perm": 6,
                "readQueueNums": 16,
                "topicSynFlag": 0,
                "writeQueueNums": 16
            }
        ],
        "BenchmarkTest": [
            {
    
    
                "brokerName": "rocketmq-master1",
                "perm": 6,
                "readQueueNums": 1024,
                "topicSynFlag": 0,
                "writeQueueNums": 1024
            },
            {
    
    
                "brokerName": "rocketmq-master2",
                "perm": 6,
                "readQueueNums": 1024,
                "topicSynFlag": 0,
                "writeQueueNums": 1024
            }
        ],
        "DefaultCluster": [
            {
    
    
                "brokerName": "rocketmq-master1",
                "perm": 7,
                "readQueueNums": 16,
                "topicSynFlag": 0,
                "writeQueueNums": 16
            },
            {
    
    
                "brokerName": "rocketmq-master2",
                "perm": 7,
                "readQueueNums": 16,
                "topicSynFlag": 0,
                "writeQueueNums": 16
            }
        ],
        "OFFSET_MOVED_EVENT": [
            {
    
    
                "brokerName": "rocketmq-master1",
                "perm": 6,
                "readQueueNums": 1,
                "topicSynFlag": 0,
                "writeQueueNums": 1
            },
            {
    
    
                "brokerName": "rocketmq-master2",
                "perm": 6,
                "readQueueNums": 1,
                "topicSynFlag": 0,
                "writeQueueNums": 1
            }
        ]
    }
    
 *  `BrokerAddrTable`内容如下  
    ![&nbsp;][nbsp 2]
    
        ```java 
        brokerAddrTable信息
        
        {
               
                 
            "rocketmq-master1": {
               
                 
                "brokerAddrs": {
               
                 
                    0: "172.19.0.4:10911",
                    1: "172.19.0.6:10921"
                },
                "brokerName": "rocketmq-master1",
                "cluster": "DefaultCluster"
            },
            "rocketmq-master2": {
               
                 
                "brokerAddrs": {
               
                 
                    0: "172.19.0.5:10912",
                    1: "172.19.0.7:10922"
                },
                "brokerName": "rocketmq-master2",
                "cluster": "DefaultCluster"
            }
        }
        
  • clusterAddrTable内容如下
    *

    ```java 
    clusterAddrTable 信息
    
    {
    
    
        "DefaultCluster": [
            "rocketmq-master1",
            "rocketmq-master2"
        ]
    }
    
 *  `brokerLiveTable`内容如下  
    ![&nbsp;][nbsp 4]
    
        ```java 
        brokerLiveTable信息
        {
               
                 
            "172.19.0.7:10922": {
               
                 
                "channel": {
               
                 
                    "active": true,
                    "inputShutdown": false,
                    "open": true,
                    "outputShutdown": false,
                    "registered": true,
                    "writable": true
                },
                "dataVersion": {
               
                 
                    "counter": 3,
                    "timestamp": 1562476312530
                },
                "haServerAddr": "172.19.0.7:10923",
                "lastUpdateTimestamp": 1562476361146
            },
            "172.19.0.5:10912": {
               
                 
                "channel": {
               
                 
                    "active": true,
                    "inputShutdown": false,
                    "open": true,
                    "outputShutdown": false,
                    "registered": true,
                    "writable": true
                },
                "dataVersion": {
               
                 
                    "counter": 3,
                    "timestamp": 1562476312530
                },
                "haServerAddr": "172.19.0.5:10913",
                "lastUpdateTimestamp": 1562476360402
            },
            "172.19.0.4:10911": {
               
                 
                "channel": {
               
                 
                    "active": true,
                    "inputShutdown": false,
                    "open": true,
                    "outputShutdown": false,
                    "registered": true,
                    "writable": true
                },
                "dataVersion": {
               
                 
                    "counter": 3,
                    "timestamp": 1562476312525
                },
                "haServerAddr": "172.19.0.4:10912",
                "lastUpdateTimestamp": 1562476359516
            },
            "172.19.0.6:10921": {
               
                 
                "channel": {
               
                 
                    "active": true,
                    "inputShutdown": false,
                    "open": true,
                    "outputShutdown": false,
                    "registered": true,
                    "writable": true
                },
                "dataVersion": {
               
                 
                    "counter": 3,
                    "timestamp": 1562476312525
                },
                "haServerAddr": "172.19.0.6:10922",
                "lastUpdateTimestamp": 1562476360541
            }
        }
        

BrokerHouseKeepingService

1、 BrokerHouseKeepingService:实现了ChannelEventListener接口,用于处理Broker状态事件,当Broker失效、异常或者关闭,则将BrokerRouteInfoManager中移除;

    public class BrokerHousekeepingService implements ChannelEventListener {
     
       
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
        private final NamesrvController namesrvController;
    
        public BrokerHousekeepingService(NamesrvController namesrvController) {
     
       
            this.namesrvController = namesrvController;
        }
    
        @Override
        public void onChannelConnect(String remoteAddr, Channel channel) {
     
       
        }
    
        @Override
        public void onChannelClose(String remoteAddr, Channel channel) {
     
       
            //通道关闭从RouteInfoManager中移除Broker
            this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
        }
    
        @Override
        public void onChannelException(String remoteAddr, Channel channel) {
     
       
            //通道发生异常从RouteInfoManager中移除Broker
            this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
        }
    
        @Override
        public void onChannelIdle(String remoteAddr, Channel channel) {
     
       
            //通道失效从RouteInfoManager中移除Broker
            this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
        }
    }
    

DefaultRequestProcessor

1、 DefaultRequestProcessor默认请求处理器,根据RequestCode执行相应的处理,核心处理方法processRequest()

  
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
     
       

        if (ctx != null) {
     
       
            log.debug("receive request, {} {} {}",
                request.getCode(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                request);
        }
        switch (request.getCode()) {
     
       
            //向NameServer追加KV配置
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            //从NameServer获取KV配置
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            //从NameServer获取KV配置
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            //获取版本信息
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request);
            //注册一个Broker,数据都是持久化的,如果存在则覆盖配置
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
     
       
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
     
       
                    return this.registerBroker(ctx, request);
                }
            //卸载一个Broker,数据都是持久化的
            case RequestCode.UNREGISTER_BROKER:
                return this.unregisterBroker(ctx, request);
            //根据Topic获取Broker Name、topic配置信息
            case RequestCode.GET_ROUTEINTO_BY_TOPIC:
                return this.getRouteInfoByTopic(ctx, request);
            //获取注册到Name Server的所有Broker集群信息
            case RequestCode.GET_BROKER_CLUSTER_INFO:
                return this.getBrokerClusterInfo(ctx, request);
            //去掉BrokerName的写权限
            case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                return this.wipeWritePermOfBroker(ctx, request);
            //从Name Server获取完整Topic列表
            case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                return getAllTopicListFromNameserver(ctx, request);
            //从Namesrv删除Topic配置
            case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                return deleteTopicInNamesrv(ctx, request);
            //通过Namespace获取所有的KV List
            case RequestCode.GET_KVLIST_BY_NAMESPACE:
                return this.getKVListByNamespace(ctx, request);
            //获取指定集群下的所有 topic
            case RequestCode.GET_TOPICS_BY_CLUSTER:
                return this.getTopicsByCluster(ctx, request);
            //获取所有系统内置 Topic 列表
            case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
                return this.getSystemTopicListFromNs(ctx, request);
            //单元化相关 topic
            case RequestCode.GET_UNIT_TOPIC_LIST:
                return this.getUnitTopicList(ctx, request);
            //获取含有单元化订阅组的 Topic 列表
            case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
                return this.getHasUnitSubTopicList(ctx, request);
            //获取含有单元化订阅组的非单元化
            case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
                return this.getHasUnitSubUnUnitTopicList(ctx, request);
            //更新Name Server配置
            case RequestCode.UPDATE_NAMESRV_CONFIG:
                return this.updateConfig(ctx, request);
            case RequestCode.GET_NAMESRV_CONFIG:
                return this.getConfig(ctx, request);
            default:
                break;
        }
        return null;
    }

NamesrvStartup

1、 NamesrvStartupNameServer的启动入口,启动的核心是调用NamesrvControllerinitialize()方法;

     public boolean initialize() {
     
       
    
            //从文件中加载数据到内存中,默认从${user.home}/namesrv/kvConfig.json文件加载
            this.kvConfigManager.load();
            //创建服务Server,传入处理连接的ChannelEventListener
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    
            //默认任务处理器的线程池,每一个RequestCode可以单独设置一个线程池,如果不设置就使用默认的线程池
            this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    
            //注册默认处理器,根据requestCode执行相应的处理
            this.registerProcessor();
    
            //启动后延迟5秒开始执行,每隔10秒执行一次,对于两分钟没有活跃的broker,关闭连接
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
     
       
    
                @Override
                public void run() {
     
       
                    NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                }
            }, 5, 10, TimeUnit.SECONDS);
    
            //启动后延迟1min,每隔10分钟执行打印configTable
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
     
       
    
                @Override
                public void run() {
     
       
                    NamesrvController.this.kvConfigManager.printAllPeriodically();
                }
            }, 1, 10, TimeUnit.MINUTES);
    
            if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
     
       
                // Register a listener to reload SslContext
                try {
     
       
                    fileWatchService = new FileWatchService(
                        new String[] {
     
       
                            TlsSystemConfig.tlsServerCertPath,
                            TlsSystemConfig.tlsServerKeyPath,
                            TlsSystemConfig.tlsServerTrustCertPath
                        },
                        new FileWatchService.Listener() {
     
       
                            boolean certChanged, keyChanged = false;
                            @Override
                            public void onChanged(String path) {
     
       
                                if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
     
       
                                    log.info("The trust certificate changed, reload the ssl context");
                                    reloadServerSslContext();
                                }
                                if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
     
       
                                    certChanged = true;
                                }
                                if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
     
       
                                    keyChanged = true;
                                }
                                if (certChanged && keyChanged) {
     
       
                                    log.info("The certificate and private key changed, reload the ssl context");
                                    certChanged = keyChanged = false;
                                    reloadServerSslContext();
                                }
                            }
                            private void reloadServerSslContext() {
     
       
                                ((NettyRemotingServer) remotingServer).loadSslContext();
                            }
                        });
                } catch (Exception e) {
     
       
                    log.warn("FileWatchService created error, can't load the certificate dynamically");
                }
            }
    
            return true;
        }
    

*

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: