RocketMQ源码阅读 NameServer

NameServer 角色

NameSeverRocketMQ 起到重要的角色,承担着路由管理、服务注册、服务发现等核心功能。

  • 接收 Broker 的请求注册 Broker 路由信息
  • 接收 Client 请求根据某个 topic 获取所有到 broker 的路由信息

NameSrv 核心类

  • NamesrvStartup
public class NamesrvStartup {
    //...
    public static NamesrvController main0(String[] args) {

        try {
            NamesrvController controller = createNamesrvController(args);
            //启动 NamesrvController
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }
    //...
    /**
     * 启动 NamesrvController
     * @param controller
     * @return
     * @throws Exception
     */
    public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }
        // NamesrvController 初始化
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        // 启动后的钩子
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        controller.start();

        return controller;
    }
}
  • NamesrvController
public class NamesrvController {
    public boolean initialize() {

        //从配置文件以及一系列的配置中,捞取配置放入到 Config Manager
        this.kvConfigManager.load();

        //创建 Netty 通信 Server
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        //创建线程池
        this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        // 注册事件处理器
        this.registerProcessor();

        //扫描broker & 移除不存活的broker   间隔 10s 执行
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        //间隔 10 min 中, 打印配置
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        //todo ....SSL 的代码,忽略
        return true;
    }

    private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) {
            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                    this.remotingExecutor);
        } else {

            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
        }
    }
}
  • KvConfigManager
public class KVConfigManager {
    
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
   
    private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
            new HashMap<String, HashMap<String, String>>();
    //.....
    public void load() {
        String content = null;
        try {
            content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
        } catch (IOException e) {
            log.warn("Load KV config table exception", e);
        }
        if (content != null) {
            KVConfigSerializeWrapper kvConfigSerializeWrapper =
                    KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
            if (null != kvConfigSerializeWrapper) {
                this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
                log.info("load KV config table OK");
            }
        }
    }
    //..... 其他代码省略
    // 周期打印配置的
    public void printAllPeriodically() {
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                log.info("--------------------------------------------------------");

                {
                    log.info("configTable SIZE: {}", this.configTable.size());
                    Iterator<Entry<String, HashMap<String, String>>> it =
                            this.configTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, HashMap<String, String>> next = it.next();
                        Iterator<Entry<String, String>> itSub = next.getValue().entrySet().iterator();
                        while (itSub.hasNext()) {
                            Entry<String, String> nextSub = itSub.next();
                            log.info("configTable NS: {} Key: {} Value: {}", next.getKey(), nextSub.getKey(),
                                    nextSub.getValue());
                        }
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("printAllPeriodically InterruptedException", e);
        }
    }
}
  • RouteInfoManager
public class RouteInfoManager {
    //读写锁
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // Topic 和 队列信息
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    // BrokerName 和 以 BrokerName 为单位的 Broker集合
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    // 集群 以及 属于该集群的 brokerName 列表
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    // 存活的 broker 地址列表
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    // broker 对应的 filter server 列表
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    //...... 全部是关于上面这些缓存数据的修改更新等等代码
    
    //额外的扫描清理方法
    public void scanNotActiveBroker() {
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }
}
  • DefaultRequestProcessor
public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                          RemotingCommand request) throws RemotingCommandException {

        if (ctx != null) {
            log.debug("receive request, {} {} {}",
                    request.getCode(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    request);
        }

        switch (request.getCode()) {

            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);  // KvConfigManager 存放配置并持久化
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);  // KvConfigManager 查询配置
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request); // KvConfigManager 删除配置
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request); // 查询数据的版本信息
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request); // 注册 broker
                } else {
                    return this.registerBroker(ctx, request);
                }
            case RequestCode.UNREGISTER_BROKER:
                return this.unregisterBroker(ctx, request); // 解除注册 broker
            case RequestCode.GET_ROUTEINFO_BY_TOPIC:
                return this.getRouteInfoByTopic(ctx, request); // 根据topic 获取 routeinfo
            case RequestCode.GET_BROKER_CLUSTER_INFO:
                return this.getBrokerClusterInfo(ctx, request); //获取 broker 信息
            case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                return this.wipeWritePermOfBroker(ctx, request);    // 获取 offset 信息
            case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                return getAllTopicListFromNameserver(ctx, request); // 获取所有的 topic 列表
            case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                return deleteTopicInNamesrv(ctx, request);      // 删除
            case RequestCode.GET_KVLIST_BY_NAMESPACE:
                return this.getKVListByNamespace(ctx, request); // 获取 kvlist
            case RequestCode.GET_TOPICS_BY_CLUSTER:
                return this.getTopicsByCluster(ctx, request); // 根据 cluster 获取 topic 列表
            case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
                return this.getSystemTopicListFromNs(ctx, request);
            case RequestCode.GET_UNIT_TOPIC_LIST:
                return this.getUnitTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
                return this.getHasUnitSubTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
                return this.getHasUnitSubUnUnitTopicList(ctx, request);
            case RequestCode.UPDATE_NAMESRV_CONFIG:
                return this.updateConfig(ctx, request); // 更新 namesrv 配置
            case RequestCode.GET_NAMESRV_CONFIG:
                return this.getConfig(ctx, request); // 获取 namesrv 配置
            default:
                break;
        }
        return null;
    }
    //...... 其他代码省略.....
}

NameSrv 启动流程

rocketmq_namesrv

  • 入口从 NamesrvStartup#main0 启动进入, 创建了一个 核心类 NamesrvController 实例, NamesrvController#createNamesrvController 开启了配置参数、网络参数的配置
  • 第二步,从 start 方法开启 调用 NamesrvController#initialize 方法
    • KvConfigManager#load 从配置里面捞取配置合并配置到内存中
    • 构建了 NettyRemotingServer 实例
  • 第6步 注册指令处理器,用于从网络接收到的指令,异步在线程池中执行业务
  • 第7、8步 定时器进行扫描检查和移除非活的有问题的 broker
  • 第9、10步 定期打印配置
  • 最后是 start 方法
    • 开启了 netty romting server 开启 netty 服务监听
    • 开启 fileWatchService 监听
comments powered by Disqus