温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何理解RocketMQ存储中的主从同步

发布时间:2021-11-17 17:15:56 阅读:159 作者:柒染 栏目:大数据
开发者测试专用服务器限时活动,0元免费领,库存有限,领完即止! 点击查看>>

本篇文章给大家分享的是有关如何理解RocketMQ存储中的主从同步,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

一、问题思考

1.消息存储在Master上了,如何同步到Slave上了呢?
2.同步复制和异步复制流程是怎么样的?

二、Broker启动HA调用链

1.HA初始化调用链

@1 BrokerStartup#mainstart(createBrokerController(args));@2 BrokerStartup#createBrokerControllerboolean initResult = controller.initialize();@3 BrokerController#initializethis.messageStore = new DefaultMessageStore@4 DefaultMessageStore#DefaultMessageStore()this.haService = new HAService(this);this.defaultMessageStore = defaultMessageStore;this.acceptSocketService =new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());this.groupTransferService = new GroupTransferService();this.haClient = new HAClient();

2.启动调用链

@1 BrokerStartup#startcontroller.start();@2 BrokerController#startthis.messageStore.start();@3 DefaultMessageStore#start@4 this.haService.start();this.acceptSocketService.beginAccept();this.acceptSocketService.start();this.groupTransferService.start();this.haClient.start();

小结:从初始化和启动调用链中可以看到,在Broker启动时,初始化并启动了三个线程类,分别为AcceptSocketService, GroupTransferService, HAClient。

问题:这三个线程类在干啥?


三、线程类职责
1.AcceptSocketService职责

如何理解RocketMQ存储中的主从同步

小结:AcceptSocketService职责初始化TCP通道,监听新的连接并创建HAConnection。

问题:HAConnection在做什么?


2.HAConnection职责

//构造方法public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {this.haService = haService;this.socketChannel = socketChannel;//获取客户端请求地址this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();//将通道调整为非阻塞this.socketChannel.configureBlocking(false);//关闭连接前将数据发送完毕this.socketChannel.socket().setSoLinger(false, -1);//将Nagle算法关闭,客户端每发送一次数据无论大小,都会将其发送出去this.socketChannel.socket().setTcpNoDelay(true);//设置接受缓存区为64Kthis.socketChannel.socket().setReceiveBufferSize(1024 * 64);//设置发包缓存区为64Kthis.socketChannel.socket().setSendBufferSize(1024 * 64);//写数据线程类this.writeSocketService = new WriteSocketService(this.socketChannel);//读数据线程类this.readSocketService = new ReadSocketService(this.socketChannel);this.haService.getConnectionCount().incrementAndGet();}//启动public void start() {//启动读数据线程this.readSocketService.start();//启动写数据线程this.writeSocketService.start();}

疑问:HAConnection除了对通道做了一些设置外,启动了两个线程服务类,分别为readSocketService和writeSocketService,他们职责是什么呢?

2.1 writeSocketService职责
流程图

如何理解RocketMQ存储中的主从同步

小结:writeSocketService主要职责,将数据不断写入socketChannel通道;写入数据的大小为nextTransferFromWhere与最大可读位置getReadPosition之间数据;每次写完传输指针自增this.nextTransferFromWhere += size;每隔5秒发送心跳包到socketChannel通道。

2.2 readSocketService职责

流程图

如何理解RocketMQ存储中的主从同步

小结:readSocketService主要职责解析slave发来的请求位点,并更新push3SlaveMaxOffset为该请求位点;唤醒groupTransferService线程。

3.GroupTransferService职责

如何理解RocketMQ存储中的主从同步

小结:GroupTransferService职责判断主从同步是否完成,完成后唤醒消息发送线程。

4.HAClient职责

如何理解RocketMQ存储中的主从同步

小结:HAClient职责Slave封装实现类,负责与Master建立连接通道,并从通道中获取数据存储;并向Master上报Slave存储的最大物理偏移量。

五、主从同步示意图

1.主从同步交互消息格式
1.1 Slave上报物理偏移量reportOffset量格式

00000018516677754880|长度为8位的20位数字

1.2 Master写入Slave的信息由Header与Body构成

00000018516677754880+size|Header部分由8位物理偏移量+消息体大

消息Body具体内容|Slave请求的位点与Master可读位置之间的数据

2.主从同步示意图

如何理解RocketMQ存储中的主从同步

以上就是如何理解RocketMQ存储中的主从同步,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。

亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

原文链接:https://my.oschina.net/u/4226611/blog/4353210

AI

开发者交流群×