RocketMQ源码阅读 NameServer
NameServer 角色
NameSever
在 RocketMQ
起到重要的角色,承担着路由管理、服务注册、服务发现等核心功能。
- 接收
Broker
的请求注册Broker
路由信息 - 接收
Client
请求根据某个topic
获取所有到broker
的路由信息
NameSrv 核心类
NamesrvStartup
public class NamesrvStartup {
//...
public static NamesrvController main0(String[] args) {
try {
NamesrvController controller = createNamesrvController(args);
//启动 NamesrvController
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
//...
/**
* 启动 NamesrvController
* @param controller
* @return
* @throws Exception
*/
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// NamesrvController 初始化
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 启动后的钩子
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
controller.start();
return controller;
}
}
NamesrvController
public class NamesrvController {
public boolean initialize() {
//从配置文件以及一系列的配置中,捞取配置放入到 Config Manager
this.kvConfigManager.load();
//创建 Netty 通信 Server
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//创建线程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注册事件处理器
this.registerProcessor();
//扫描broker & 移除不存活的broker 间隔 10s 执行
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//间隔 10 min 中, 打印配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
//todo ....SSL 的代码,忽略
return true;
}
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
}
KvConfigManager
public class KVConfigManager {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
new HashMap<String, HashMap<String, String>>();
//.....
public void load() {
String content = null;
try {
content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
} catch (IOException e) {
log.warn("Load KV config table exception", e);
}
if (content != null) {
KVConfigSerializeWrapper kvConfigSerializeWrapper =
KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
if (null != kvConfigSerializeWrapper) {
this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
log.info("load KV config table OK");
}
}
}
//..... 其他代码省略
// 周期打印配置的
public void printAllPeriodically() {
try {
this.lock.readLock().lockInterruptibly();
try {
log.info("--------------------------------------------------------");
{
log.info("configTable SIZE: {}", this.configTable.size());
Iterator<Entry<String, HashMap<String, String>>> it =
this.configTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<String, String>> next = it.next();
Iterator<Entry<String, String>> itSub = next.getValue().entrySet().iterator();
while (itSub.hasNext()) {
Entry<String, String> nextSub = itSub.next();
log.info("configTable NS: {} Key: {} Value: {}", next.getKey(), nextSub.getKey(),
nextSub.getValue());
}
}
}
} finally {
this.lock.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("printAllPeriodically InterruptedException", e);
}
}
}
RouteInfoManager
public class RouteInfoManager {
//读写锁
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// Topic 和 队列信息
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// BrokerName 和 以 BrokerName 为单位的 Broker集合
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// 集群 以及 属于该集群的 brokerName 列表
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// 存活的 broker 地址列表
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// broker 对应的 filter server 列表
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
//...... 全部是关于上面这些缓存数据的修改更新等等代码
//额外的扫描清理方法
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
}
DefaultRequestProcessor
public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request); // KvConfigManager 存放配置并持久化
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request); // KvConfigManager 查询配置
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request); // KvConfigManager 删除配置
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request); // 查询数据的版本信息
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request); // 注册 broker
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request); // 解除注册 broker
case RequestCode.GET_ROUTEINFO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request); // 根据topic 获取 routeinfo
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request); //获取 broker 信息
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request); // 获取 offset 信息
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request); // 获取所有的 topic 列表
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request); // 删除
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request); // 获取 kvlist
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request); // 根据 cluster 获取 topic 列表
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request); // 更新 namesrv 配置
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request); // 获取 namesrv 配置
default:
break;
}
return null;
}
//...... 其他代码省略.....
}
NameSrv 启动流程
- 入口从
NamesrvStartup#main0
启动进入, 创建了一个 核心类NamesrvController
实例,NamesrvController#createNamesrvController
开启了配置参数、网络参数的配置 - 第二步,从
start
方法开启 调用NamesrvController#initialize
方法 KvConfigManager#load
从配置里面捞取配置合并配置到内存中
- 构建了
NettyRemotingServer
实例
- 构建了
- 第6步 注册指令处理器,用于从网络接收到的指令,异步在线程池中执行业务
- 第7、8步 定时器进行扫描检查和移除非活的有问题的 broker
- 第9、10步 定期打印配置
- 最后是
start
方法 - 开启了
netty romting server
开启netty
服务监听
- 开启了
- 开启
fileWatchService
监听
- 开启