温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

RocketMQ存储中如何实现日志文件创建与映射

发布时间:2021-11-18 09:38:37 来源:亿速云 阅读:139 作者:小新 栏目:大数据

这篇文章将为大家详细讲解有关RocketMQ存储中如何实现日志文件创建与映射,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

1.问题描述

日志目录(可配置)/data/rocketmq/store/commitlog会有20位长度的日志文件。
1.日志文件什么时候创建的?
2.日志文件创建流程是什么?
3.日志文件和内存映射是怎么样的?

-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:50 00000117290188144640
-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:52 00000117291261886464
-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:54 00000117292335628288
-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:56 00000117293409370112
-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:57 00000117294483111936
-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:56 00000117295556853760
2.日志映射相关类初始化

在Broker启动时实例化了两个类DefaultMessageStore和AllocateMappedFileService。
AllocateMappedFileService是线程类继承了Runnable接口,该线程类持有DefaultMessageStore的引用(即:可操作管理DefaultMessageStore),并启动该线程类。
调用链

//Broker启动时调用
@1 BrokerStartup#main#createBrokerController()
boolean initResult = controller.initialize();
@2 Controller#initialize()
this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
@3 DefaultMessageStore#DefaultMessageStore()
//将DefaultMessageStore自身引用传给AllocateMappedFileService
this.allocateMappedFileService = new AllocateMappedFileService(this);
this.allocateMappedFileService.start();//启动该线程类
@4 class AllocateMappedFileService extends ServiceThread
public AllocateMappedFileService(DefaultMessageStore messageStore) {
this.messageStore = messageStore;
}
3.线程类一直运行在干啥

既然在Broker启动时该线程类AllocateMappedFileService就启动了,那么在做什么呢?run方法为while循环,即:只要服务不停止并且mmapOperation()返回true则一直运行

//异步处理,调用mmapOperation完成请求的处理
public void run() {
log.info(this.getServiceName() + " service started");
//while循环,只要服务部停止即调用 mmapOperation方法
while (!this.isStopped() && this.mmapOperation()) {
}
log.info(this.getServiceName() + " service end");
}

mmapOperation方法流程图

RocketMQ存储中如何实现日志文件创建与映射

小结:mmapOperation方法主要做了两件事:初始化MappedFile和预热MappedFile。只要服务不停止和线程不被中断,这个过程一直重复运行。

4.提交映射文件请求(AllocateRequest)

既然AllocateMappedFileService一直从容器(优先级队列和ConcurrentHashMap)中获取AllocateRequest。AllocateRequest是什么时候产生并放到容器中的呢?
RocketMQ消息存储概览【源码笔记】中写入commitLog流程,获取最新的日志文件。
调用链

@1 CommitLog#putMessage
//文件已满或者没有映射文件重新创建一个文件
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
// Mark: NewFile may be cause noise
}
@2 MappedFileQueue#getLastMappedFile

流程图

RocketMQ存储中如何实现日志文件创建与映射

小结:MappedFileQueue#getLastMappedFile会向线程类AllocateMappedFileServic提交两个映射文件创建请求:分别为nextFilePath和nextNextFilePath;如果线程类AllocateMappedFileServic为null,则直接new一个MappedFile,此时只会创建一个文件。

下面为提交两个映射文件请求流程,即:

AllocateMappedFileServic#putRequestAndReturnMappedFile


调用链

@1 MappedFileQueue#getLastMappedFile
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
}
@2 AllocateMappedFileService#putRequestAndReturnMappedFile

流程图

RocketMQ存储中如何实现日志文件创建与映射

小结:处理提交的映射文件请求指的是:
实例化两个AllocateRequest并把他们提交到requestTable(ConcurrentHashMap)
和requestQueue(PriorityBlockingQueue)中,等待5秒,此段时间线程会从这两个容器中获取请求并创建MappedFile,并将结果返回。

5.MappedFile初始化

本段梳理下上文中mmapOperation方法流程图第5步初始化MappedFile的流程
调用链

@1 AllocateMappedFileService#mmapOperation
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
@2 MappedFile#init(fileName, fileSize);

流程图

RocketMQ存储中如何实现日志文件创建与映射

MappedFile主要干了两件事:1.创建日志文件。2.并将文件映射到内存中。

关于“RocketMQ存储中如何实现日志文件创建与映射”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI