这篇文章给大家分享的是有关RocketMQ源码中如何实现注册服务器的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
该类用于启动注册服务器。其main
方法委托了main0
方法,该方法的执行逻辑如下:
调用方法NamesrvStartup#createNamesrvController
创建一个NamesrvController
实例,声明为controller
。
调用方法NamesrvStartup#start
将这个controller启动。
那么下面就分别来看下两个方法的具体内容。
这个方法最重要的就是用构造方法创建了NamesrvController
对象。而在调用构造方法之前有较多的代码是用于解析命令行对象,以及可能的情况下读取文件中的配置信息、打印当前的整体配置信息。
这些额外配置不存在的时候,默认配置下,注册服务器是监听于9876端口。
该方法的作用是启动入参的NamesrvController
实例。具体来说,流程如下:
执行方法NamesrvController#initialize
进行初始化。
为运行时添加一个hook,在JVM关闭的时候,执行方法NamesrvController#shutdown
对注册服务器执行优雅关闭。
执行方法NamesrvController#start
启动注册服务器。
这个类用于控制注册服务器。
构造方法中主要是为了几个重要属性进行赋值操作。比如初始化kvConfigManager
和routeInfoManager
这两个重要的属性。
该方法用于初始化注册服务器,执行逻辑如下:
执行方法kvconfig.KVConfigManager#load
加载配置信息。默认情况下,加载 ${user.home}/namesrv/kvConfig.json 文件的内容到属性kvconfig.KVConfigManager#configTable
中。
新建一个NettyRemotingServer对象,为属性NamesrvController#remotingServer
赋值。这个新建的对象,使用了BrokerHousekeepingService
作为入参。该BrokerHousekeepingService
的作用就是在发生通道关闭、异常、空闲等情况时,将该通道从路由信息里删除。
创建一个线程池,赋值给属性NamesrvController#remotingExecutor
,用于注册服务器在Netty中的业务执行。
调用方法NamesrvController#registerProcessor
将业务处理器注册到RemotingServer
中。使用的线程池就是步骤3创建的线程池。
创建一个间隔时间为10秒的周期性任务,任务内容是调用方法RouteInfoManager#scanNotActiveBroker
扫描非激活模式的Broker
。
该方法没有更多内容,只是简单了启动了RemotingServer
。在这个方法之后,就可以开始监听Broker
上送的注册请求。
该类是注册服务器的配置存储类。会将配置信息存储在文件 ${user.home}/namesrv/kvConfig.json 。内部用来存储配置信息的是一个HashMap<String, HashMap<String, String>>
结构,也就是两级结构。
第一级是命名空间,第二集是KV对,都是字符串形式。
该类的load
方法可以从文件中加载数据到内存里,persist
方法可以将内存中的数据再写入到文件中。
这个类是 rocketmq-namesrv 这个包下面,代码量最多的类了。因为业务处理都实现在了这个类上面。
按照NettyRequestProcessor
接口的实现套路,业务请求的分流都是在processRequest
方法中,这里也是,接下来就一个个看这个类支持的命令。
该命令没有请求体,请求头中有namespace
、key
、value
字段,调用方法kvconfig.KVConfigManager#putKVConfig
将配置项放入到配置管理器中即可。
该命令没有请求体,请求头中有namespace
、key
字段,调用方法kvconfig.KVConfigManager#getKVConfig
获取对应配置项。
如果配置项存在,返回成功响应。如果配置信息不存在,返回失败响应,响应码为QUERY_NOT_FOUND。
该命令没有请求体,请求头中有namespace
、key
字段,调用方法kvconfig.KVConfigManager#deleteKVConfig
删除对应配置项。
该命令用于查询注册服务器上Broker
的数据版本号。具体执行逻辑如下:
从命令的内容体解析出DataVersion
对象,从请求头中解析出BrokerAddr
数据。使用这两个作为入参,调用方法RouteInfoManager#isBrokerTopicConfigChanged
判断与服务器上该BrokerAddr
的版本号是否一致,将结果声明为changed
。
如果changed
为false
,表明版本号没有变化,那么服务器上的数据在当前时间还是有效的,调用方法RouteInfoManager#updateBrokerInfoUpdateTimestamp
更新这个数据的有效时间。
调用方法RouteInfoManager#queryBrokerTopicConfig
查询服务器上BrokerAddr
对应的版本号,声明为nameSeverDataVersion
。
构建命令响应对象,如果nameSeverDataVersion
不为null,则编码后设置到内容体。在响应头中设置changed
属性,值为步骤1产生的声明对象。
该命令用于Broker
信息的注册。首先获取请求头中MQ的版本号,如果版本号大于等于3.0.11,则调用方法processor.DefaultRequestProcessor#registerBrokerWithFilterServer
进行信息注册;否则调用方法processor.DefaultRequestProcessor#registerBroker
进行信息注册。
方法的执行逻辑如下:
对请求命令进行解码工作,创建出RegisterBrokerRequestHeader
对象。使用该对象对象和请求中的body字段执行crc校验,如果校验失败,返回系统错误响应。否则,继续后续流程。
如果命令请求对象中包含内容体,则解码出RegisterBrokerBody
对象,声明为registerBrokerBody
。如果命令请求对象不包含内容体,则手动创建RegisterBrokerBody
对象,并且将其DataVersion
的版本号设置为0,时间戳设置为0.
调用方法RouteInfoManager#registerBroker
注册路由信息,将结果声明为result
。
创建类型为RegisterBrokerResponseHeader
的响应头对象,声明为responseHeader
。将result
的masterAddr
和HaServerAddr
属性设置到响应头对象中。
从配置管理器中以ORDER_TOPIC_CONFIG作为命名空间,取出该命名空间下面的配置数据对象,编码后将二进制设置为响应的内容体。
返回响应对象。
与registerBrokerWithFilterServer
方法的流程基本一致,只不过在调用方法RouteInfoManager#registerBroker
的时候,入参的filterServerList
为null。
该命令用于注销 Broker 的注册。调用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#unregisterBroker
完成,而该方法内部则是委托给了方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unregisterBroker
。
该命名用于查询主题的路由信息,调用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic
。
该方法用于在路由管理器中根据主题名称获取全量的路由信息,具体流程如下:
使用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData
根据请求的主题名称得到类型为TopicRouteData
的结果,声明为topicRouteData
。
如果topicRouteData
不为null,则执行如下子流程。
如果配置org.apache.rocketmq.common.namesrv.NamesrvConfig#orderMessageEnable
开启,则从命名空间ORDER_TOPIC_CONFIG下面,获取入参主题名称的配置信息,声明为orderTopicConf
。将orderTopicConf
设置到属性org.apache.rocketmq.common.protocol.route.TopicRouteData#orderTopicConf
。
将topicRouteData
进行编码,设置为响应的内容体,返回响应对象。
如果topicRouteData
为null,则返回TOPIC_NOT_EXIST响应。
该命令调用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getBrokerClusterInfo
。该方法的逻辑就是调用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllClusterInfo
得到一个编码后的内容体,将这个内容体设置为响应的内容体,返回响应对象即可。
编码的内容体数据结构类是ClusterInfo
,其属性如下
HashMap<String/* brokerName */, BrokerData> brokerAddrTable; HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
该命令用于擦除Broker的写权限,也就说所有在该Broker
上的主题都没有写入权限了。调用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#wipeWritePermOfBroker
实现,该方法的逻辑如下:
调用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#wipeWritePermOfBrokerByLock
擦除入参Broker
的写权限,方法的返回值为擦除的队列信息个数。将结果声明为wipeTopicCnt
。
将wipeTopicCnt
设置到响应头的对应属性,返回响应。
该命令用于获取注册服务器上全量的主题信息,调用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getAllTopicListFromNameserver
实现。
该方法内部调用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllTopicList
获取所有的主题名称形成的列表,并且编码为二进制数组,设置为响应的内容体,将响应返回。
该命令用于删除服务器上的主题信息,通过方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#deleteTopicInNamesrv
实现。方法实现也简单,直接从topicQueueTable
中删除对应的主题名称即可。
该命令用于获取服务器上特定命名空间下的配置信息。通过方法org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#getKVListByNamespace
获取到对应的配置信息,并且编码为二进制数组。
如果数组存在,则设置到响应的内容体中,返回成功响应。
如果数组不存在,则返回QUERY_NOT_FOUND响应。
该命令用于获取集群下所有的主题名称,调用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getTopicsByCluster
完成。该方法内部调用org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getTopicsByCluster
获取集群下的所有主题名称的编码结果,将编码结果的二进制数组设置到响应的内容体中,返回成功响应。
这个命令有点奇怪,看命令名称是获取系统主题列表。但是从方法实现上,内部的内容整体是混乱的。这个命令暂且放下,等看到相关联的请求查询的时候在处理。
该命令用于获取集群下,有unit标识的主题名称集合。通过方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getUnitTopicList
实现,该方法内部调用了方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getUnitTopics
来返回具备unit标识的主题名称集合的编码后二进制数组。将这个数组设置为响应的内容体,并且返回。
该命令用于获取集群下,有unit_sub标识的主题名称集合。做法上与GET_UNIT_TOPIC_LIST命令是相同的,只不过用的标识不同。
该命令用于获取集群下,同时有unit和unit_sub标识的主题名称集合。做法上与上述的一致,只不过用的标识不同。
这个命令是用于管理端直接发送配置的文本到注册服务,用于更新注册服务自身的配置,而后将配置信息持久化到磁盘文件。
这个命令用于获取注册服务的配置信息,将配置信息设置到响应的内容体中。
该类是路由信息的管理器,其中使用了多个类来抽象各种路由信息。下面先看下这些定义类。
QueueData
该类保存了Broker中的队列信息。有如下属性:
brokerName,Broker的名称,默认情况下是Broker所在机器的域名,可以由配置定义。
readQueueNums,用于读取的队列数量。
writeQueueNums,用于写入的队列数量。
perm,该Broker的权限信息,权限指的是是否可读、是否可写。
topicSynFlag,主题同步标识。
BrokerData
该类保存了Broker集群的地址信息,有如下属性:
cluster,集群标识。
brokerName,Broker名称。
brokerAddrs,brokerId和BrokerAddr的映射表。该属性存储了同一个Broker名称下id和地址的映射关系。
BrokerLiveInfo
该类保存了具体某个Broker的存活信息,有如下属性
lastUpdateTimestamp,最近一次数据更新时间。
dataVersion,该Broker的主题配置信息的版本号。
channel,Netty的Channel对象,该对象即是Broker与服务器之间的链接对象。
haServerAddr,高可用主节点地址。格式为${ip}:${port} 。
RouteInfoManager内部管理着5个Map结构,用于存储路由相关信息,这些信息用代码来看会更清晰一些,如下:
HashMap<String/* topic */, List<QueueData>> topicQueueTable; HashMap<String/* brokerName */, BrokerData> brokerAddrTable; HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
该方法用于实现Broker信息注册到路由管理器上,具体方法流程如下:
从clusterAddrTable
中以入参的clusterName
获取集群下所有Broker
的名称,声明为brokerNames
。
如果brokerNames
为null,则为其赋值一个空的HashSet<String>
。并且在clusterAddrTable
放入这个clusterName
和brokerNames
两个值。
在brokerNames
中添加本次注册上来的Broker的名称。
从brokerAddrTable
以brokerName
获取BrokerData
对象,如果不存在则新建一个并且放入到brokerAddrTable
中。
取出步骤4中brokerData
中的brokerAddrs
映射,遍历其中的元素,如果值与入参的brokerAddr
相等,键与入参的brokerId
不等,则删除这个这一键值对。这种情况说明此时该IP对应的Broker信息已经发生了变化。
将入参的brokerId
和brokerAddr
放入到brokerAddrs
中。
如果brokerId为0也就是主节点,并且入参的topicConfigWrapper
不为null,也就是说Broker发送的注册命令是包含了请求体,那么执行子流程。否则继续后续流程。
从brokerLiveTable
查询该broker
的版本号,与topicConfigWrapper
的版本号对比,确认是否有变化。如果有变化,或者该Broker
是新注册的(brokerName
第一次注册或者brokerId
第一次注册),那么就很有可能本次携带了新的主题配置信息。则需要更更新注册服务器上主题配置信息。也就执行后续流程。否则结束子流程,继续执行步骤8.
遍历属性TopicConfigSerializeWrapper#topicConfigTable
,对集合中每一个元素调用方法RouteInfoManager#createAndUpdateQueueData
,更新主题对应的队列信息。
构建BrokerLiveInfo
对象,放入brokerLiveTable
中。
如果入参的filterServerList
不为null,则放入filterServerTable
。
如果brokerId不为0,也就是当前是从节点在注册自己,则从brokerAddrs
获取主节点的地址。如果主节点地址存在,则进一步获取其 HaServer 地址。将这两个数据设置到返回的结果对象result中。
返回结果对象result。从代码可以看出,如果当前注册不是从节点,或者对应的主节点不存在,则result是一个空对象。
该方法是用于创建或更新 topicQueueTable 中的QueueData
对象的。具体流程如下:
构建一个QueuData对象,里面的属性来自brokerName、topicConfig 对象。
从 topicQueueTable 中获取topicConfig 主题对应的 queueDataList 对象。
如果 queueDataList
不存在,意味着该主题是第一次出现在注册服务器中。构建一个新的linkedList
对象,添加queueData
对象到其中,并且将queueDataList
放入到topicQueueTable
中。流程结束。
如果queueDataList
存在,则对其元素遍历,执行如下子操作。
元素的brokerName
属性与入参的brokerName
值相同,则继续执行后续流程,否则进入下一次循环迭代。
判断元素与步骤1构建的对象是否相同,如果相同,不做操作;如果不同,意味着数据有变化,将元素从集合中删除。
如果步骤4中有元素被删除,则将步骤1的对象,添加到queueDataList
中。
该方法用于在删除路由管理器中某一个Broker
的信息。具体流程如下:
在brokerLiveTable
中删除该Broker
信息。
在filterServerTable
删除该Broker
的信息。
声明一个局部变量removeBrokerName
。从brokerAddrTable
获取该BrokerName
对应的brokerData
。如果其不为空,则执行子流程。
从brokerData
的brokerAddrs
删除该brokerId
对应的映射。
如果brokerAddrs
集合为空,则从brokerAddrTable
删除该brokerName
对应的映射。为removeBrokerName
赋值true
。
如果removeBrokerName
为真,则执行子流程,否则流程结束。
从clusterAddrTable
获取该clusterName
对应的brokerName
的集合,声明为nameSet
。
nameSet
不为null的情况下,从nameSet
删除本次的brokerName
。如果删除后nameSet
为空,则从clusterAddrTable
删除该brokerName
的映射。
调用方法removeTopicByBrokerName
删除brokerName
对应的主题信息。
该方法用于删除brokerName
对应的主题配置信息,具体执行逻辑如下:
遍历topicQueueTable
,为每一个元素执行后续逻辑。
针对每一个元素,取出其QueueData
列表,遍历该对象。执行子流程。
遍历QueueData
列表,如果元素QueueData
的brokerName
与入参brokerName
相同,则从列表中删除该元素。
遍历完毕后,如果列表为空,则从topicQueueTable
中删除该映射。
首先来看下数据结构对象TopicRouteData
的定义,其属性如下
String orderTopicConf; List<QueueData> queueDatas; List<BrokerData> brokerDatas; HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable
从数据结构对象也可以简单的推测出pickupTopicRouteData
方法的实现逻辑。大致来说分为几个步骤:
从topicQueueTable
按照主题名称查询queueDatas
。
根据queueDatas
中每一个元素QueueData
的brokerName
属性从brokerAddrTable
取得brokerData
对象,组成成一个List,也就是brokerDatas
。
根据步骤2的brokerDatas
从filterServerTable
查询到对应的filterServer
列表,组装为映射。
将步骤1到3的值组装为TopicRouteData
对象返回给调用者。
该方法会以可中断的方式获取写锁,获取成功后调用方法wipeWritePermOfBroker
。如果获取失败则返回0,获取成功则执行方法wipeWritePermOfBroker
执行擦除工作。
wipeWritePermOfBroker
方法的内容也很简单,遍历topicQueueTable
,针对每一个元素,在遍历其QueueData
,如果brokerName
与入参的brokerName
相同就意味着找到对应的QueueData
。将这个里面的perm
属性重新设置值,去掉代表写权限的标志位即可。
该方法用于获取具备unit标识的主题名称集合。具体流程如下:
以可中断的方式获取读锁。遍历topicQueueTable
元素。
如果键值对中的QueueData
列表的首个元素的topicSynFlag
属性值包含了unit标识,将这个键值对的key
,也即是主题名称加入到临时集合中。
遍历完后后,返回临时集合编码的二进制数组。
当一个Broker的通道关闭的时候,会触发到这个方法。这个方法的代码虽然比较多,但是方法思路很简单,首先通过Channel在brokerLiveTable
中找到对应的BrokerLiveInfo对象。并且依靠这个对象的信息,在路由管理器中删除所有相关的信息接口。
感谢各位的阅读!关于“RocketMQ源码中如何实现注册服务器”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。