目录
init
appendMessage
commit
flush
顾名思义,MappedFile是对文件的mmap映射类,其中封装了对文件的fileChannel和MappedByteBuffer,主要操作文件的写入、刷盘
写入分同步写和异步写,复制网上的一个图:
将commitLog文件mmap映射到一个MappedByteBuffer,同步写即接收到生产者消息直接将数据写入MappedByteBuffer,异步写即先将数据写入一个直接内存暂存区,由定时线程将暂存区数据写入fileChannel中
public class MappedFile extends ReferenceResource {
// 操作系统页大小 4k
public static final int OS_PAGE_SIZE = 1024 * 4;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// 当前应用映射的总内存大小,虚拟内存
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// 当前应用映射了多少文件
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// 已经写入的字节数量
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// 已经提交到fileChannel的字节数量
protected final AtomicInteger committedPosition = new AtomicInteger(0);
// 已经刷盘的字节数
private final AtomicInteger flushedPosition = new AtomicInteger(0);
protected int fileSize;
protected FileChannel fileChannel;
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
// 异步刷盘时使用的直接内存区和内存池
protected ByteBuffer writeBuffer = null;
protected TransientStorePool transientStorePool = null;
private String fileName;
private long fileFromOffset;
private File file;
private MappedByteBuffer mappedByteBuffer;// 同步刷盘使用的mmap
private volatile long storeTimestamp = 0;
private boolean firstCreateInQueue = false;
public MappedFile() {
}
public MappedFile(final String fileName, final int fileSize) throws IOException {
init(fileName, fileSize);
}
public MappedFile(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize, transientStorePool);
}
}
init
根据文件路径初始化fileChannel和mappedByteBuffer
public void init(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
// 根据文件获取fileChannel和mmap(mappedByteBuffer)
init(fileName, fileSize);
// 如果是异步刷盘,向内存池申请一块直接内存,来暂存消息
this.writeBuffer = transientStorePool.borrowBuffer();
this.transientStorePool = transientStorePool;
}
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
ensureDirOK(this.file.getParent());
try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("Failed to create file " + this.fileName, e);
throw e;
} catch (IOException e) {
log.error("Failed to map file " + this.fileName, e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
appendMessage
写入mappedByteBuffer或者暂存区writeBuffer
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
return appendMessagesInner(msg, cb);
}
public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) {
return appendMessagesInner(messageExtBatch, cb);
}
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;
// 当前文件已经写入了多少字节
int currentPos = this.wrotePosition.get();
// 文件未写满
if (currentPos < this.fileSize) {
// 如果异步刷盘writeBuffer不为null,同步刷盘走mappedByteBuffer
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
// 定位到已经写到的位置,准备往后续
byteBuffer.position(currentPos);
// 组装messageExt消息的字节,写入byteBuffer.put
AppendMessageResult result;
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
// 标记位增加
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
commit
将暂存区的数据写入到fileChannel
public int commit(final int commitLeastPages) {
if (writeBuffer == null) {
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
return this.wrotePosition.get();
}
// 判断是否满足commit的条件,一次提交最少commitLeastPages页
if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {
commit0(commitLeastPages);
this.release();
} else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
}
}
// 所有数据均已提交,暂存用的直接内存就没用了,归还内存池
// All dirty data has been committed to FileChannel.
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
this.transientStorePool.returnBuffer(writeBuffer);
this.writeBuffer = null;
}
return this.committedPosition.get();
}
protected void commit0(final int commitLeastPages) {
int writePos = this.wrotePosition.get();
int lastCommittedPosition = this.committedPosition.get();
// 有需要提交的数据,写入fileChannel
if (writePos - lastCommittedPosition > commitLeastPages) {
try {
ByteBuffer byteBuffer = writeBuffer.slice();
byteBuffer.position(lastCommittedPosition);
byteBuffer.limit(writePos);
this.fileChannel.position(lastCommittedPosition);
this.fileChannel.write(byteBuffer);
// 重置标志位
this.committedPosition.set(writePos);
} catch (Throwable e) {
log.error("Error occurred when commit data to FileChannel.", e);
}
}
}
flush
数据刷到磁盘
public int flush(final int flushLeastPages) {
// 是否满足最小刷盘的页数
if (this.isAbleToFlush(flushLeastPages)) {
if (this.hold()) {
int value = getReadPosition();
try {
//异步刷盘走fileChannel
if (writeBuffer != null || this.fileChannel.position() != 0) {
this.fileChannel.force(false);
} else {
// 同步刷盘走mappedByteBuffer
this.mappedByteBuffer.force();
}
} catch (Throwable e) {
log.error("Error occurred when force data to disk.", e);
}
this.flushedPosition.set(value);
this.release();
} else {
log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
this.flushedPosition.set(getReadPosition());
}
}
return this.getFlushedPosition();
}
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: