目录
load
truncateDirtyFiles
getMinOffset
getLastMappedFile
flush/commit
MappedFile是一个文件的映射类,MappedFileQueue则是MappedFile的列表,例如RocketMQ中一个CommitLog文件大小默认是1G,文件名是起始偏移量,首个文件如下
文件写满后,会自动生成下一个文件,MappedFileQueue就是代表这一种文件的列表
public class MappedFileQueue {
private final String storePath;//文件路径
private final int mappedFileSize;// 每个文件的大小
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();// 多个文件的列表
private final AllocateMappedFileService allocateMappedFileService;// 创建新文件的异步线程
private long flushedWhere = 0;
private long committedWhere = 0;
private volatile long storeTimestamp = 0;
public MappedFileQueue(final String storePath, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
this.allocateMappedFileService = allocateMappedFileService;
}
}
load
映射路径下的文件列表成List
public boolean load() {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
// ascending order
Arrays.sort(files);
for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, please check it manually");
return false;
}
try {
// 映射文件到MappedFile,设置初始的position
// 如果文件还没写满,在truncateDirtyFiles()方法中会将position重置,从哪里调用的在truncateDirtyFiles()暂不关注
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
}
return true;
}
truncateDirtyFiles
清除offset之后的脏数据
1、 重置offset所在的MappedFile的position;
2、 清理offset所在的MappedFile之后的文件;
public void truncateDirtyFiles(long offset) {
List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
for (MappedFile file : this.mappedFiles) {
long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
if (fileTailOffset > offset) {
if (offset >= file.getFileFromOffset()) {
file.setWrotePosition((int) (offset % this.mappedFileSize));
file.setCommittedPosition((int) (offset % this.mappedFileSize));
file.setFlushedPosition((int) (offset % this.mappedFileSize));
} else {
file.destroy(1000);
willRemoveFiles.add(file);
}
}
}
this.deleteExpiredFile(willRemoveFiles);
}
getMinOffset
获取最小的offset,也就是这一系列文件的最小起始位置,第一个文件的名
public long getMinOffset() {
if (!this.mappedFiles.isEmpty()) {
try {
return this.mappedFiles.get(0).getFileFromOffset();
} catch (IndexOutOfBoundsException e) {
//continue;
} catch (Exception e) {
log.error("getMinOffset has exception.", e);
}
}
return -1;
}
getLastMappedFile
获取最后一个文件的映射对象(MappedFile), 如果最后一个文件已经写满了,重新创建一个新文件
// 根据给出的偏移量,获取所在的MappedFile
// 如果文件写满了,创建新的
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
// 判断是否需要新创建文件,如果不等于-1,表示需要创建一个文件名是createOffset的新文件
long createOffset = -1;
// 列表最后一个MappedFile
MappedFile mappedFileLast = getLastMappedFile();
// 如果没有文件,需要新键,文件名即偏移量
// 文件都必须是相同大小(mappedFileSize)
if (mappedFileLast == null) {
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
// 如果已经写满(文件大小==已写入的大小)
// 当前最后的文件大小+mappedFileSize就是下个文件的名
if (mappedFileLast != null && mappedFileLast.isFull()) {
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
// 需要创建新文件
if (createOffset != -1 && needCreate) {
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
// allocateMappedFileService不为null,是异步创建文件
// putRequestAndReturnMappedFile把创建新文件请求写入一个请求队列,然后countDownLatch.await等待创建完成
// allocateMappedFileService.run中从队列取请求创建文件,创建完成countDownLatch.countdown
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
return mappedFileLast;
}
flush/commit
根据当前记录的刷新和提交标志位获取对应的MappedFile,调用MappedFile.flush/commit
private long flushedWhere = 0;
private long committedWhere = 0;
public boolean flush(final int flushLeastPages) {
boolean result = true;
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
int offset = mappedFile.flush(flushLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.flushedWhere;
this.flushedWhere = where;
if (0 == flushLeastPages) {
this.storeTimestamp = tmpTimeStamp;
}
}
return result;
}
public boolean commit(final int commitLeastPages) {
boolean result = true;
MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
if (mappedFile != null) {
int offset = mappedFile.commit(commitLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.committedWhere;
this.committedWhere = where;
}
return result;
}
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: