目录
ConsumeQueue
IndexFile
putKey
selectPhyOffset
ConsumeQueue
生产者将消息发给broker,broker将所有消息全部写入commitLog一个文件中,那消费者如何根据topic获取消息,RocketMQ使用的是逻辑上的topic队列,即ConsumeQueue。
消息写入commitLog同时,将消息在commitLog的offset偏移量、消息大小、tags信息写入对应的topic队列(ConsumeQueue),消费者获取消息时先从ConsumeQueue获取消息的offset,在根据offset从commitLog中查询真正的消息数据。
文件的存储结构:
文件名是Topic名
一个Topic对应多个队列,名是queueId,每个文件夹都是一个ConsumeQueue
一个队列中每个文件大小默认是6000000字节
实例化代码:
根据topic文件路径和queueId创建一个MappedFileQueue
核心方法:putMessagePositionInfoWrapper
将请求的消息的offset,size,tags等写入MappedFileQueue
public void putMessagePositionInfoWrapper(DispatchRequest request) {
final int maxRetries = 30;
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
for (int i = 0; i < maxRetries && canWrite; i++) {
long tagsCode = request.getTagsCode();
// 忽略部分无关代码
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
if (result) {
if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
}
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
return;
} else {
// XXX: warn and notify me
log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
+ " failed, retry " + i + " times");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.warn("", e);
}
}
}
// XXX: warn and notify me
log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
if (offset + size <= this.maxPhysicOffset) {
log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
return true;
}
// 一条消息在consumeQueue中的大小为定长20字节
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);// 在commitLog中的偏移量 8字节
this.byteBufferIndex.putInt(size);// 消息大小 4字节
this.byteBufferIndex.putLong(tagsCode);// tags 标签的hashCode 8字节
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
this.minLogicOffset = expectLogicOffset;
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}
if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
}
this.maxPhysicOffset = offset + size;
// 消息写入
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
IndexFile
RocketMQ支持根据消息的key进行消息查询,和ConsumeQueue类似,如果消息中有key,就为这条消息建立Hash索引,即将消息在CommitLog中的偏移量写入索引文件,
每个broker有一组indexFile,文件名即当时创建时的时间戳
每个indexFile由三部分组成
1、 indexHeader,长度为40字节,包含六个内容:biginTimestamp(第一条消息存储时间戳),endTimestamp(最后一条消息存储时间戳),biginPhyoffset(第一条消息在commitlog中的偏移量,即commitlogoffset),endPhyoffset(最后一条消息在commitlog中的偏移量),hashSlotCount(含有index的slot数量),indexCount(包含的索引单元的个数);
2、 slots,参数控制,默认是500万,每个slot大小为4字节,槽中的4个字节存放的是索引块的位置;
3、 index,参数控制,默认是2000万,每个索引块大小为20字节,4字节的hashKey,8字节的commitLog偏移量,4字节的当前key对应消息的存储时间与indexFile的时间差,4字节,当前索引块的前一个索引块位置;
public class IndexFile {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static int hashSlotSize = 4;// 每个槽大小 4字节
private static int indexSize = 20;// 每个索引块大小, 20字节
private static int invalidIndex = 0;
private final int hashSlotNum; // 槽数量,默认500万
private final int indexNum;// 索引块数量,默认2000万
private final MappedFile mappedFile;
private final FileChannel fileChannel;
private final MappedByteBuffer mappedByteBuffer;
private final IndexHeader indexHeader;
public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
final long endPhyOffset, final long endTimestamp) throws IOException {
// 文件总大小:header+500万的槽+2000万的索引块
int fileTotalSize =
IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
this.mappedFile = new MappedFile(fileName, fileTotalSize);
this.fileChannel = this.mappedFile.getFileChannel();
this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
this.hashSlotNum = hashSlotNum;
this.indexNum = indexNum;
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
// 取前40个字节封装为IndexHeader
this.indexHeader = new IndexHeader(byteBuffer);
if (endPhyOffset > 0) {
this.indexHeader.setBeginPhyOffset(endPhyOffset);
this.indexHeader.setEndPhyOffset(endPhyOffset);
}
if (endTimestamp > 0) {
this.indexHeader.setBeginTimestamp(endTimestamp);
this.indexHeader.setEndTimestamp(endTimestamp);
}
}
}
putKey
将某个消息key和对应commitLog的offset写入indexFile
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
// 头中记录的索引块数量 小于 文件最大值
if (this.indexHeader.getIndexCount() < this.indexNum) {
// 对key的hashCode取模,算出slot的位置
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
// false);
// 取对应位置槽的值,也就是槽对应的索引块位置
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
// 当前索引块末尾空闲的位置
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
// 写入索引块的数据
this.mappedByteBuffer.putInt(absIndexPos, keyHash);// 4字节的hashKey
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);// 8字节的commitLog偏移量
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);// 4字节的 当前key对应消息的存储时间与indexFile的时间差
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);// 4字节,当前索引块的前一个索引块位置
// 更新槽上的索引块位置
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
// 更新indexHeader中的各种标志位
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
if (invalidIndex == slotValue) {
this.indexHeader.incHashSlotCount();
}
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}
selectPhyOffset
从indexFile根据某个key查询符合条件的所有消息,返回List
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {
if (this.mappedFile.hold()) {
// 获取hashKey所对应的槽位置
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
if (lock) {
// fileLock = this.fileChannel.lock(absSlotPos,
// hashSlotSize, true);
}
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// if (fileLock != null) {
// fileLock.release();
// fileLock = null;
// }
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
} else {
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}
// 获取槽对应的index块位置,索引块是根据后4字节被串成了一个链表,需要下边循环查询
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);// 前一个索引块
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
// commitLog中消息的存储时间需要在参数begin和end之间
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
// 如果keyHash相同,即选中加入list
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
// 在循环前一个索引块
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
nextIndexToRead = prevIndexRead;
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
this.mappedFile.release();
}
}
}
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: