03、RocketMQ源码分析:NamesrvStartup

目录

NamesrvStartup

NamesrvController

initialize

start

DefaultRequestProcessor

RequestCode.REGISTER_BROKER


RocketMQ分为NameServer和Broker,其中NameServer作为注册中心,来注册和存储Broker和Topic等的元数据信息,源码也并不复杂,简单记录自己看源码的过程

从启动NameServer的cmd文件中,可以看到启动的入口类为NamesrvStartup

*

源码结构

*

NamesrvStartup

NameServer的核心,就是一个NamesrvController,即broker的注册和查询等所有操作都由controller完成

*

start方法即调用controller的初始化和启动方法

*

NamesrvController

属性如下

public class NamesrvController {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    // nameServer自己的配置,ROCKETMQ_HOME、kvConfig配置文件路径等
    private final NamesrvConfig namesrvConfig;

    // netty的配置,执行handler任务的线程数量、子group的线程数量等
    private final NettyServerConfig nettyServerConfig;

    // 定时线程,定时清理无效的broker实例等
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));

    // kv配置,namesrv的一些个性化配置,一些代码会根据自定义配置执行对应逻辑
    private final KVConfigManager kvConfigManager;

    // 注册表,topic的属性、broker的属性
    private final RouteInfoManager routeInfoManager;

    // netty调用封装,接收请求返回响应
    private RemotingServer remotingServer;

    // broker的监听器,监听上下线事件
    private BrokerHousekeepingService brokerHousekeepingService;

    // nameServer处理各种请求的处理线程
    private ExecutorService remotingExecutor;

    private Configuration configuration;// namesrvConfig和nettyServerConfig等配置的组装类
    private FileWatchService fileWatchService;// 如果使用TLS调用的话,这个对象用来监听sslContext文件,支持reload

    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }
}

initialize

加载配置、初始化处理线程池、启动定时清理下线broker的任务、初始化NettyRemotingServer

   public boolean initialize() {

        // 从文件中加载kv个性化配置
        this.kvConfigManager.load();

        // 初始化remotingServer,之前有分析过rocketMQ如何使用的netty进行通讯
        // 处理请求是使用的Pair<NettyRequestProcessor, ExecutorService> 来处理,key是处理逻辑,value是运行Processor的线程池
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        // nameserver的处理Processor执行线程池
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        // 注册nameServer的逻辑处理器
        this.registerProcessor();

        // 定时清理无效的broker
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        // 定时打印kv配置
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        // 这里如果使用TLS进行通信,监听证书路径,支持重新加载
        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload n
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }
                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }

        return true;
    }

start

即启动NettyRemotingServer,以及TLS文件监听线程

*

DefaultRequestProcessor

在NettyRemotingServer中接收到请求,由处理器链将请求解析成RemotingCommand后,交给最后的逻辑处理器来业务处理,在NameServer中,逻辑处理器就是NameSrvController在初始化方法中注册的处理器就是DefaultRequestProcessor

其中定义了很多请求类型,根据RemotingCommand的类型switch..case..来做不同操作,来看RequestCode.REGISTER_BROKER请求的处理

public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {

    protected final NamesrvController namesrvController;

    public DefaultRequestProcessor(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {

        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request);
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    return this.registerBroker(ctx, request);
                }
            case RequestCode.UNREGISTER_BROKER:
                return this.unregisterBroker(ctx, request);
            case RequestCode.GET_ROUTEINFO_BY_TOPIC:
                return this.getRouteInfoByTopic(ctx, request);
            case RequestCode.GET_BROKER_CLUSTER_INFO:
                return this.getBrokerClusterInfo(ctx, request);
            case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                return this.wipeWritePermOfBroker(ctx, request);
            case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                return getAllTopicListFromNameserver(ctx, request);
            case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                return deleteTopicInNamesrv(ctx, request);
            case RequestCode.GET_KVLIST_BY_NAMESPACE:
                return this.getKVListByNamespace(ctx, request);
            case RequestCode.GET_TOPICS_BY_CLUSTER:
                return this.getTopicsByCluster(ctx, request);
            case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
                return this.getSystemTopicListFromNs(ctx, request);
            case RequestCode.GET_UNIT_TOPIC_LIST:
                return this.getUnitTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
                return this.getHasUnitSubTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
                return this.getHasUnitSubUnUnitTopicList(ctx, request);
            case RequestCode.UPDATE_NAMESRV_CONFIG:
                return this.updateConfig(ctx, request);
            case RequestCode.GET_NAMESRV_CONFIG:
                return this.getConfig(ctx, request);
            default:
                break;
        }
        return null;
    }
}

RequestCode.REGISTER_BROKER

broker启动向Namesrv注册的命令

  public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        // 实例化一个RegisterBroker请求的响应
        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
        // 获取响应头,后边根据注册结果设置值
        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();

        // 根据RegisterBrokerRequestHeader定义的请求头,解析请求,获取brokerName、brokerAddr等信息
        final RegisterBrokerRequestHeader requestHeader =
                (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

        if (!checksum(ctx, request, requestHeader)) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("crc32 not match");
            return response;
        }

        // 解析请求体,broker中注册的topic和filterServer信息
        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();

        if (request.getBody() != null) {
            try {
                registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
            } catch (Exception e) {
                throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
            }
        } else {
            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
        }

        // 向注册表注册该broker信息
        RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
            requestHeader.getClusterName(),
            requestHeader.getBrokerAddr(),
            requestHeader.getBrokerName(),
            requestHeader.getBrokerId(),
            requestHeader.getHaServerAddr(),
            registerBrokerBody.getTopicConfigSerializeWrapper(),
            registerBrokerBody.getFilterServerList(),
            ctx.channel());

        // 返回响应
        responseHeader.setHaServerAddr(result.getHaServerAddr());
        responseHeader.setMasterAddr(result.getMasterAddr());

        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
        response.setBody(jsonValue);

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

注册表RouteInfoManager属性如下,注册方法即把信息封装然后扔到map中

public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    public RouteInfoManager() {
        // Key是Topic的名称,value是这Topic存储的Broker和queue列表
        this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
        // key是broker名,value是broker的地址
        this.brokerAddrTable = new HashMap<String, BrokerData>(128);
        // key:集群名,value: broker名
        this.clusterAddrTable = new HashMap<String, Set<String>>(32);
        // key是broker名,value是broker的实时状态,比如上回更新的时间戳,按照这个时间戳来判断是否超时需要清理
        this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
        // Key是broker的地址,value是它关联的多个FilterServer的地址。Filter Server 是过滤服务器,是RocketMQ的一种服务端过滤方式
        this.filterServerTable = new HashMap<String, List<String>>(256);
    }
}

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: