温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

RocketMQ源码中如何实现注册服务器

发布时间:2021-12-17 16:13:25 来源:亿速云 阅读:155 作者:小新 栏目:云计算

这篇文章给大家分享的是有关RocketMQ源码中如何实现注册服务器的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

NamesrvStartup

该类用于启动注册服务器。其main方法委托了main0方法,该方法的执行逻辑如下:

  1. 调用方法NamesrvStartup#createNamesrvController创建一个NamesrvController实例,声明为controller

  2. 调用方法NamesrvStartup#start将这个controller启动。

那么下面就分别来看下两个方法的具体内容。

createNamesrvController

这个方法最重要的就是用构造方法创建了NamesrvController对象。而在调用构造方法之前有较多的代码是用于解析命令行对象,以及可能的情况下读取文件中的配置信息、打印当前的整体配置信息。

这些额外配置不存在的时候,默认配置下,注册服务器是监听于9876端口。

start

该方法的作用是启动入参的NamesrvController实例。具体来说,流程如下:

  1. 执行方法NamesrvController#initialize进行初始化。

  2. 为运行时添加一个hook,在JVM关闭的时候,执行方法NamesrvController#shutdown对注册服务器执行优雅关闭。

  3. 执行方法NamesrvController#start启动注册服务器。

NamesrvController

这个类用于控制注册服务器。

构造方法

构造方法中主要是为了几个重要属性进行赋值操作。比如初始化kvConfigManagerrouteInfoManager这两个重要的属性。

initialize

该方法用于初始化注册服务器,执行逻辑如下:

  1. 执行方法kvconfig.KVConfigManager#load加载配置信息。默认情况下,加载 ${user.home}/namesrv/kvConfig.json 文件的内容到属性kvconfig.KVConfigManager#configTable中。

  2. 新建一个NettyRemotingServer对象,为属性NamesrvController#remotingServer赋值。这个新建的对象,使用了BrokerHousekeepingService作为入参。该BrokerHousekeepingService的作用就是在发生通道关闭、异常、空闲等情况时,将该通道从路由信息里删除。

  3. 创建一个线程池,赋值给属性NamesrvController#remotingExecutor,用于注册服务器在Netty中的业务执行。

  4. 调用方法NamesrvController#registerProcessor将业务处理器注册到RemotingServer中。使用的线程池就是步骤3创建的线程池。

  5. 创建一个间隔时间为10秒的周期性任务,任务内容是调用方法RouteInfoManager#scanNotActiveBroker扫描非激活模式的Broker

start

该方法没有更多内容,只是简单了启动了RemotingServer。在这个方法之后,就可以开始监听Broker上送的注册请求。

KVConfigManager

该类是注册服务器的配置存储类。会将配置信息存储在文件 ${user.home}/namesrv/kvConfig.json 。内部用来存储配置信息的是一个HashMap<String, HashMap<String, String>> 结构,也就是两级结构。

第一级是命名空间,第二集是KV对,都是字符串形式。

该类的load方法可以从文件中加载数据到内存里,persist方法可以将内存中的数据再写入到文件中。

DefaultRequestProcessor

这个类是 rocketmq-namesrv 这个包下面,代码量最多的类了。因为业务处理都实现在了这个类上面。

按照NettyRequestProcessor接口的实现套路,业务请求的分流都是在processRequest方法中,这里也是,接下来就一个个看这个类支持的命令。

PUT_KV_CONFIG

该命令没有请求体,请求头中有namespacekeyvalue字段,调用方法kvconfig.KVConfigManager#putKVConfig将配置项放入到配置管理器中即可。

GET_KV_CONFIG

该命令没有请求体,请求头中有namespacekey字段,调用方法kvconfig.KVConfigManager#getKVConfig获取对应配置项。

如果配置项存在,返回成功响应。如果配置信息不存在,返回失败响应,响应码为QUERY_NOT_FOUND

DELETE_KV_CONFIG

该命令没有请求体,请求头中有namespacekey字段,调用方法kvconfig.KVConfigManager#deleteKVConfig删除对应配置项。

QUERY_DATA_VERSION

该命令用于查询注册服务器上Broker的数据版本号。具体执行逻辑如下:

  1. 从命令的内容体解析出DataVersion对象,从请求头中解析出BrokerAddr数据。使用这两个作为入参,调用方法RouteInfoManager#isBrokerTopicConfigChanged判断与服务器上该BrokerAddr的版本号是否一致,将结果声明为changed

  2. 如果changedfalse,表明版本号没有变化,那么服务器上的数据在当前时间还是有效的,调用方法RouteInfoManager#updateBrokerInfoUpdateTimestamp更新这个数据的有效时间。

  3. 调用方法RouteInfoManager#queryBrokerTopicConfig查询服务器上BrokerAddr对应的版本号,声明为nameSeverDataVersion

  4. 构建命令响应对象,如果nameSeverDataVersion不为null,则编码后设置到内容体。在响应头中设置changed属性,值为步骤1产生的声明对象。

REGISTER_BROKER

该命令用于Broker信息的注册。首先获取请求头中MQ的版本号,如果版本号大于等于3.0.11,则调用方法processor.DefaultRequestProcessor#registerBrokerWithFilterServer进行信息注册;否则调用方法processor.DefaultRequestProcessor#registerBroker进行信息注册。

registerBrokerWithFilterServer

方法的执行逻辑如下:

  1. 对请求命令进行解码工作,创建出RegisterBrokerRequestHeader对象。使用该对象对象和请求中的body字段执行crc校验,如果校验失败,返回系统错误响应。否则,继续后续流程。

  2. 如果命令请求对象中包含内容体,则解码出RegisterBrokerBody对象,声明为registerBrokerBody。如果命令请求对象不包含内容体,则手动创建RegisterBrokerBody对象,并且将其DataVersion的版本号设置为0,时间戳设置为0.

  3. 调用方法RouteInfoManager#registerBroker注册路由信息,将结果声明为result

  4. 创建类型为RegisterBrokerResponseHeader的响应头对象,声明为responseHeader。将resultmasterAddrHaServerAddr属性设置到响应头对象中。

  5. 从配置管理器中以ORDER_TOPIC_CONFIG作为命名空间,取出该命名空间下面的配置数据对象,编码后将二进制设置为响应的内容体。

  6. 返回响应对象。

registerBroker

registerBrokerWithFilterServer方法的流程基本一致,只不过在调用方法RouteInfoManager#registerBroker的时候,入参的filterServerList为null。

UNREGISTER_BROKER

该命令用于注销 Broker 的注册。调用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#unregisterBroker完成,而该方法内部则是委托给了方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unregisterBroker

GET_ROUTEINFO_BY_TOPIC

该命名用于查询主题的路由信息,调用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic

该方法用于在路由管理器中根据主题名称获取全量的路由信息,具体流程如下:

  1. 使用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData根据请求的主题名称得到类型为TopicRouteData的结果,声明为topicRouteData

  2. 如果topicRouteData不为null,则执行如下子流程。

    1. 如果配置org.apache.rocketmq.common.namesrv.NamesrvConfig#orderMessageEnable开启,则从命名空间ORDER_TOPIC_CONFIG下面,获取入参主题名称的配置信息,声明为orderTopicConf。将orderTopicConf设置到属性org.apache.rocketmq.common.protocol.route.TopicRouteData#orderTopicConf

    2. topicRouteData进行编码,设置为响应的内容体,返回响应对象。

  3. 如果topicRouteData为null,则返回TOPIC_NOT_EXIST响应。

GET_BROKER_CLUSTER_INFO

该命令调用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getBrokerClusterInfo。该方法的逻辑就是调用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllClusterInfo得到一个编码后的内容体,将这个内容体设置为响应的内容体,返回响应对象即可。

编码的内容体数据结构类是ClusterInfo,其属性如下

HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;

WIPE_WRITE_PERM_OF_BROKER

该命令用于擦除Broker的写权限,也就说所有在该Broker上的主题都没有写入权限了。调用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#wipeWritePermOfBroker实现,该方法的逻辑如下:

  1. 调用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#wipeWritePermOfBrokerByLock擦除入参Broker的写权限,方法的返回值为擦除的队列信息个数。将结果声明为wipeTopicCnt

  2. wipeTopicCnt设置到响应头的对应属性,返回响应。

GET_ALL_TOPIC_LIST_FROM_NAMESERVER

该命令用于获取注册服务器上全量的主题信息,调用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getAllTopicListFromNameserver实现。

该方法内部调用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllTopicList获取所有的主题名称形成的列表,并且编码为二进制数组,设置为响应的内容体,将响应返回。

DELETE_TOPIC_IN_NAMESRV

该命令用于删除服务器上的主题信息,通过方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#deleteTopicInNamesrv实现。方法实现也简单,直接从topicQueueTable中删除对应的主题名称即可。

GET_KVLIST_BY_NAMESPACE

该命令用于获取服务器上特定命名空间下的配置信息。通过方法org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#getKVListByNamespace获取到对应的配置信息,并且编码为二进制数组。

如果数组存在,则设置到响应的内容体中,返回成功响应。

如果数组不存在,则返回QUERY_NOT_FOUND响应。

GET_TOPICS_BY_CLUSTER

该命令用于获取集群下所有的主题名称,调用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getTopicsByCluster完成。该方法内部调用org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getTopicsByCluster获取集群下的所有主题名称的编码结果,将编码结果的二进制数组设置到响应的内容体中,返回成功响应。

GET_SYSTEM_TOPIC_LIST_FROM_NS

这个命令有点奇怪,看命令名称是获取系统主题列表。但是从方法实现上,内部的内容整体是混乱的。这个命令暂且放下,等看到相关联的请求查询的时候在处理。

GET_UNIT_TOPIC_LIST

该命令用于获取集群下,有unit标识的主题名称集合。通过方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getUnitTopicList实现,该方法内部调用了方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getUnitTopics来返回具备unit标识的主题名称集合的编码后二进制数组。将这个数组设置为响应的内容体,并且返回。

GET_HAS_UNIT_SUB_TOPIC_LIST

该命令用于获取集群下,有unit_sub标识的主题名称集合。做法上与GET_UNIT_TOPIC_LIST命令是相同的,只不过用的标识不同。

GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST

该命令用于获取集群下,同时有unitunit_sub标识的主题名称集合。做法上与上述的一致,只不过用的标识不同。

UPDATE_NAMESRV_CONFIG

这个命令是用于管理端直接发送配置的文本到注册服务,用于更新注册服务自身的配置,而后将配置信息持久化到磁盘文件。

GET_NAMESRV_CONFIG

这个命令用于获取注册服务的配置信息,将配置信息设置到响应的内容体中。

RouteInfoManager

该类是路由信息的管理器,其中使用了多个类来抽象各种路由信息。下面先看下这些定义类。

QueueData

该类保存了Broker中的队列信息。有如下属性:

  • brokerName,Broker的名称,默认情况下是Broker所在机器的域名,可以由配置定义。

  • readQueueNums,用于读取的队列数量。

  • writeQueueNums,用于写入的队列数量。

  • perm,该Broker的权限信息,权限指的是是否可读、是否可写。

  • topicSynFlag,主题同步标识。

BrokerData

该类保存了Broker集群的地址信息,有如下属性:

  • cluster,集群标识。

  • brokerName,Broker名称。

  • brokerAddrs,brokerId和BrokerAddr的映射表。该属性存储了同一个Broker名称下id和地址的映射关系。

BrokerLiveInfo

该类保存了具体某个Broker的存活信息,有如下属性

  • lastUpdateTimestamp,最近一次数据更新时间。

  • dataVersion,该Broker的主题配置信息的版本号。

  • channel,Netty的Channel对象,该对象即是Broker与服务器之间的链接对象。

  • haServerAddr,高可用主节点地址。格式为${ip}:${port} 。

存储属性

RouteInfoManager内部管理着5个Map结构,用于存储路由相关信息,这些信息用代码来看会更清晰一些,如下:

HashMap<String/* topic */, List<QueueData>> topicQueueTable;
HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

registerBroker

该方法用于实现Broker信息注册到路由管理器上,具体方法流程如下:

  1. clusterAddrTable中以入参的clusterName获取集群下所有Broker的名称,声明为brokerNames

  2. 如果brokerNames为null,则为其赋值一个空的HashSet<String>。并且在clusterAddrTable放入这个clusterNamebrokerNames两个值。

  3. brokerNames中添加本次注册上来的Broker的名称。

  4. brokerAddrTablebrokerName获取BrokerData对象,如果不存在则新建一个并且放入到brokerAddrTable中。

  5. 取出步骤4中brokerData中的brokerAddrs映射,遍历其中的元素,如果值与入参的brokerAddr相等,键与入参的brokerId不等,则删除这个这一键值对。这种情况说明此时该IP对应的Broker信息已经发生了变化。

  6. 将入参的brokerIdbrokerAddr放入到brokerAddrs中。

  7. 如果brokerId为0也就是主节点,并且入参的topicConfigWrapper不为null,也就是说Broker发送的注册命令是包含了请求体,那么执行子流程。否则继续后续流程。

    1. brokerLiveTable查询该broker的版本号,与topicConfigWrapper的版本号对比,确认是否有变化。如果有变化,或者该Broker是新注册的(brokerName第一次注册或者brokerId第一次注册),那么就很有可能本次携带了新的主题配置信息。则需要更更新注册服务器上主题配置信息。也就执行后续流程。否则结束子流程,继续执行步骤8.

    2. 遍历属性TopicConfigSerializeWrapper#topicConfigTable,对集合中每一个元素调用方法RouteInfoManager#createAndUpdateQueueData,更新主题对应的队列信息。

  8. 构建BrokerLiveInfo对象,放入brokerLiveTable中。

  9. 如果入参的filterServerList不为null,则放入filterServerTable

  10. 如果brokerId不为0,也就是当前是从节点在注册自己,则从brokerAddrs获取主节点的地址。如果主节点地址存在,则进一步获取其 HaServer 地址。将这两个数据设置到返回的结果对象result中。

  11. 返回结果对象result。从代码可以看出,如果当前注册不是从节点,或者对应的主节点不存在,则result是一个空对象。

createAndUpdateQueueData

该方法是用于创建或更新 topicQueueTable 中的QueueData对象的。具体流程如下:

  1. 构建一个QueuData对象,里面的属性来自brokerName、topicConfig 对象。

  2. 从 topicQueueTable 中获取topicConfig 主题对应的 queueDataList 对象。

  3. 如果 queueDataList 不存在,意味着该主题是第一次出现在注册服务器中。构建一个新的linkedList对象,添加queueData对象到其中,并且将queueDataList放入到topicQueueTable中。流程结束。

  4. 如果queueDataList存在,则对其元素遍历,执行如下子操作。

    1. 元素的brokerName属性与入参的brokerName值相同,则继续执行后续流程,否则进入下一次循环迭代。

    2. 判断元素与步骤1构建的对象是否相同,如果相同,不做操作;如果不同,意味着数据有变化,将元素从集合中删除。

  5. 如果步骤4中有元素被删除,则将步骤1的对象,添加到queueDataList中。

unregisterBroker

该方法用于在删除路由管理器中某一个Broker的信息。具体流程如下:

  1. brokerLiveTable中删除该Broker信息。

  2. filterServerTable删除该Broker的信息。

  3. 声明一个局部变量removeBrokerName。从brokerAddrTable获取该BrokerName对应的brokerData。如果其不为空,则执行子流程。

    1. brokerDatabrokerAddrs删除该brokerId对应的映射。

    2. 如果brokerAddrs集合为空,则从brokerAddrTable删除该brokerName对应的映射。为removeBrokerName赋值true

  4. 如果removeBrokerName为真,则执行子流程,否则流程结束。

    1. clusterAddrTable获取该clusterName对应的brokerName的集合,声明为nameSet

    2. nameSet不为null的情况下,从nameSet删除本次的brokerName。如果删除后nameSet为空,则从clusterAddrTable删除该brokerName的映射。

    3. 调用方法removeTopicByBrokerName删除brokerName对应的主题信息。

removeTopicByBrokerName

该方法用于删除brokerName对应的主题配置信息,具体执行逻辑如下:

  1. 遍历topicQueueTable,为每一个元素执行后续逻辑。

  2. 针对每一个元素,取出其QueueData列表,遍历该对象。执行子流程。

    1. 遍历QueueData列表,如果元素QueueDatabrokerName与入参brokerName相同,则从列表中删除该元素。

    2. 遍历完毕后,如果列表为空,则从topicQueueTable中删除该映射。

pickupTopicRouteData

首先来看下数据结构对象TopicRouteData的定义,其属性如下

String orderTopicConf;
List<QueueData> queueDatas;
List<BrokerData> brokerDatas;
HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable

从数据结构对象也可以简单的推测出pickupTopicRouteData方法的实现逻辑。大致来说分为几个步骤:

  1. topicQueueTable按照主题名称查询queueDatas

  2. 根据queueDatas中每一个元素QueueDatabrokerName属性从brokerAddrTable取得brokerData对象,组成成一个List,也就是brokerDatas

  3. 根据步骤2的brokerDatasfilterServerTable查询到对应的filterServer列表,组装为映射。

  4. 将步骤1到3的值组装为TopicRouteData对象返回给调用者。

wipeWritePermOfBrokerByLock

该方法会以可中断的方式获取写锁,获取成功后调用方法wipeWritePermOfBroker。如果获取失败则返回0,获取成功则执行方法wipeWritePermOfBroker执行擦除工作。

wipeWritePermOfBroker方法的内容也很简单,遍历topicQueueTable,针对每一个元素,在遍历其QueueData,如果brokerName与入参的brokerName相同就意味着找到对应的QueueData。将这个里面的perm属性重新设置值,去掉代表写权限的标志位即可。

getUnitTopics

该方法用于获取具备unit标识的主题名称集合。具体流程如下:

  1. 以可中断的方式获取读锁。遍历topicQueueTable元素。

  2. 如果键值对中的QueueData列表的首个元素的topicSynFlag属性值包含了unit标识,将这个键值对的key,也即是主题名称加入到临时集合中。

  3. 遍历完后后,返回临时集合编码的二进制数组。

onChannelDestroy

当一个Broker的通道关闭的时候,会触发到这个方法。这个方法的代码虽然比较多,但是方法思路很简单,首先通过Channel在brokerLiveTable中找到对应的BrokerLiveInfo对象。并且依靠这个对象的信息,在路由管理器中删除所有相关的信息接口。

感谢各位的阅读!关于“RocketMQ源码中如何实现注册服务器”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI