07. RocketMQ Source Code Analysis: ConsumeQueue and IndexFile

Contents

ConsumeQueue

IndexFile

putKey

selectPhyOffset


ConsumeQueue

Producers send messages to the broker, and the broker appends all of them to a single CommitLog file. How, then, does a consumer fetch messages by topic? RocketMQ uses logical per-topic queues, i.e. the ConsumeQueue.

While a message is written to the CommitLog, its CommitLog offset, message size, and tags hash code are also written to the corresponding topic queue (the ConsumeQueue). When fetching messages, a consumer first reads the message offset from the ConsumeQueue, then uses that offset to retrieve the actual message data from the CommitLog.
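To make that lookup concrete, the sketch below (not RocketMQ code; the file path is a made-up example) decodes one fixed-length 20-byte ConsumeQueue entry the way a consumer-side lookup conceptually does: 8-byte CommitLog offset, 4-byte message size, 8-byte tags hash code.

    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    public class ConsumeQueueEntryReader {
        private static final int CQ_STORE_UNIT_SIZE = 20; // fixed entry size

        public static void main(String[] args) throws Exception {
            // hypothetical path: store/consumequeue/{topic}/{queueId}/{fileName}
            String path = "/tmp/store/consumequeue/TopicTest/0/00000000000000000000";
            long logicalOffset = 5; // read the 6th entry of this file

            try (RandomAccessFile raf = new RandomAccessFile(path, "r");
                 FileChannel channel = raf.getChannel()) {
                ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
                channel.read(buf, logicalOffset * CQ_STORE_UNIT_SIZE);
                buf.flip();

                long commitLogOffset = buf.getLong(); // 8 bytes: offset in the CommitLog
                int size = buf.getInt();              // 4 bytes: message size
                long tagsCode = buf.getLong();        // 8 bytes: hash code of the tags
                System.out.printf("offset=%d size=%d tagsCode=%d%n", commitLogOffset, size, tagsCode);
            }
        }
    }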

A diagram borrowed from another article gives an intuitive view of the relationship (image omitted here).

On-disk storage structure:

The directory name is the topic name.

A topic maps to multiple queues, each named by its queueId; every such queueId directory is one ConsumeQueue.

Each file in a queue is 6,000,000 bytes by default, i.e. 300,000 entries of 20 bytes each.

Instantiation code: a MappedFileQueue is created from the topic directory path and the queueId (the original screenshot is omitted; a sketch follows).
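A minimal sketch of that instantiation, paraphrased from the shape of the ConsumeQueue constructor in the 4.x source (field names and the MappedFileQueue signature may differ slightly between versions):

    public ConsumeQueue(final String topic, final int queueId, final String storePath,
        final int mappedFileSize, final DefaultMessageStore defaultMessageStore) {
        this.storePath = storePath;
        this.mappedFileSize = mappedFileSize; // 6,000,000 bytes by default
        this.defaultMessageStore = defaultMessageStore;
        this.topic = topic;
        this.queueId = queueId;

        // .../consumequeue/{topic}/{queueId}
        String queueDir = this.storePath
            + File.separator + topic
            + File.separator + queueId;

        this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);

        // reusable 20-byte buffer used to build one ConsumeQueue entry
        this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
    }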

Core method: putMessagePositionInfoWrapper

It writes the offset, size, tags code, etc. of the dispatched message into the MappedFileQueue.

   public void putMessagePositionInfoWrapper(DispatchRequest request) {
        final int maxRetries = 30;
        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
        for (int i = 0; i < maxRetries && canWrite; i++) {
            long tagsCode = request.getTagsCode();

            // some unrelated code omitted here

            boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
            if (result) {
                if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
                    this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
                    this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
                }
                this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
                return;
            } else {
                // XXX: warn and notify me
                log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                    + " failed, retry " + i + " times");

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    log.warn("", e);
                }
            }
        }

        // XXX: warn and notify me
        log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
        this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
    }
  private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset) {

        if (offset + size <= this.maxPhysicOffset) {
            log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
            return true;
        }

        // each entry in the ConsumeQueue is a fixed 20 bytes
        this.byteBufferIndex.flip();
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        this.byteBufferIndex.putLong(offset);// offset in the CommitLog, 8 bytes
        this.byteBufferIndex.putInt(size);// message size, 4 bytes
        this.byteBufferIndex.putLong(tagsCode);// hash code of the tags, 8 bytes

        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
        if (mappedFile != null) {

            if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                this.minLogicOffset = expectLogicOffset;
                this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                this.fillPreBlank(mappedFile, expectLogicOffset);
                log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                    + mappedFile.getWrotePosition());
            }

            if (cqOffset != 0) {
                long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

                if (expectLogicOffset < currentLogicOffset) {
                    log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                    return true;
                }

            }
            this.maxPhysicOffset = offset + size;
            // append the 20-byte entry to the mapped file
            return mappedFile.appendMessage(this.byteBufferIndex.array());
        }
        return false;
    }
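The only arithmetic in putMessagePositionInfo is expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE: the logical queue offset (the entry's sequence number within the queue) maps to a byte position spread across the queue's 6,000,000-byte files. A standalone worked example (not RocketMQ code) of where a given cqOffset lands:

    public class CqOffsetMath {
        private static final int CQ_STORE_UNIT_SIZE = 20;
        private static final int MAPPED_FILE_SIZE = 6_000_000; // default ConsumeQueue file size

        public static void main(String[] args) {
            long cqOffset = 450_000;                                 // the 450,001st entry (zero-based)
            long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;  // 9,000,000 bytes

            long fileIndex = expectLogicOffset / MAPPED_FILE_SIZE;   // 1 -> the second file of the queue
            long posInFile = expectLogicOffset % MAPPED_FILE_SIZE;   // 3,000,000 bytes into that file

            System.out.printf("byte offset=%d, file #%d, position in file=%d%n",
                expectLogicOffset, fileIndex, posInFile);
        }
    }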

IndexFile

RocketMQ supports querying messages by key. Similar to the ConsumeQueue, if a message carries a key, a hash index is built for it: the message's offset in the CommitLog is written into an index file.

Each broker keeps a set of IndexFiles whose file names are the timestamps at which they were created (directory listing omitted).

Each IndexFile consists of three parts (a size calculation follows the list):

1. IndexHeader, 40 bytes long, containing six fields: beginTimestamp (store timestamp of the first indexed message), endTimestamp (store timestamp of the last indexed message), beginPhyOffset (CommitLog offset of the first indexed message), endPhyOffset (CommitLog offset of the last indexed message), hashSlotCount (number of hash slots that hold at least one index), and indexCount (number of index units written);

2. Hash slots, whose count is controlled by configuration and defaults to 5,000,000; each slot is 4 bytes and stores the position of the latest index unit hashed to it;

3. Index units, whose count is controlled by configuration and defaults to 20,000,000; each unit is 20 bytes: a 4-byte key hash, an 8-byte CommitLog offset, a 4-byte difference (in seconds) between the message's store time and the IndexFile's begin timestamp, and a 4-byte position of the previous index unit in the same hash chain.
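With the default counts, the total file size and the absolute positions used later by putKey and selectPhyOffset follow directly from this layout. A standalone worked example (plain Java, not RocketMQ code; slotPos and indexCount are made-up inputs):

    public class IndexFileLayout {
        private static final int INDEX_HEADER_SIZE = 40;
        private static final int HASH_SLOT_SIZE = 4;
        private static final int INDEX_SIZE = 20;
        private static final int HASH_SLOT_NUM = 5_000_000;   // default slot count
        private static final int INDEX_NUM = 20_000_000;      // default index unit count

        public static void main(String[] args) {
            // total file size: 40 + 5M*4 + 20M*20 = 420,000,040 bytes (~400 MB)
            long total = INDEX_HEADER_SIZE
                + (long) HASH_SLOT_NUM * HASH_SLOT_SIZE
                + (long) INDEX_NUM * INDEX_SIZE;

            int slotPos = 123_456;      // some slot chosen by keyHash % HASH_SLOT_NUM
            int indexCount = 42;        // the next free index unit, read from the header

            long absSlotPos = INDEX_HEADER_SIZE + (long) slotPos * HASH_SLOT_SIZE;
            long absIndexPos = INDEX_HEADER_SIZE + (long) HASH_SLOT_NUM * HASH_SLOT_SIZE
                + (long) indexCount * INDEX_SIZE;

            System.out.printf("total=%d absSlotPos=%d absIndexPos=%d%n",
                total, absSlotPos, absIndexPos);
        }
    }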

public class IndexFile {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static int hashSlotSize = 4;// size of each hash slot, 4 bytes
    private static int indexSize = 20;// size of each index unit, 20 bytes
    private static int invalidIndex = 0;
    private final int hashSlotNum; // number of hash slots, 5,000,000 by default
    private final int indexNum;// number of index units, 20,000,000 by default
    private final MappedFile mappedFile;
    private final FileChannel fileChannel;
    private final MappedByteBuffer mappedByteBuffer;
    private final IndexHeader indexHeader;

    public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
        final long endPhyOffset, final long endTimestamp) throws IOException {
        // total file size: header + 5,000,000 slots + 20,000,000 index units
        int fileTotalSize =
            IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
        this.mappedFile = new MappedFile(fileName, fileTotalSize);
        this.fileChannel = this.mappedFile.getFileChannel();
        this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
        this.hashSlotNum = hashSlotNum;
        this.indexNum = indexNum;

        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();

        // wrap the first 40 bytes as the IndexHeader
        this.indexHeader = new IndexHeader(byteBuffer);

        if (endPhyOffset > 0) {
            this.indexHeader.setBeginPhyOffset(endPhyOffset);
            this.indexHeader.setEndPhyOffset(endPhyOffset);
        }

        if (endTimestamp > 0) {
            this.indexHeader.setBeginTimestamp(endTimestamp);
            this.indexHeader.setEndTimestamp(endTimestamp);
        }
    }
}

putKey

Writes a message key and the corresponding CommitLog offset into the IndexFile.

    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        // the index count recorded in the header is still below the file's capacity
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            // hash the key and take it modulo the slot count to locate the slot
            int keyHash = indexKeyHashMethod(key);
            int slotPos = keyHash % this.hashSlotNum;
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;

            try {

                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                // false);
                // read the slot's value, i.e. the position of the index unit the slot currently points to
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }

                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

                timeDiff = timeDiff / 1000;

                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }

                // absolute position of the next free index unit at the end of the index area
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;

                // write the index unit
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);// 4-byte key hash
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);// 8-byte CommitLog offset
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);// 4-byte time diff (seconds) between the message's store time and the IndexFile's begin timestamp
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);// 4 bytes: position of the previous index unit in this slot's chain

                // point the slot at this newly written index unit
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

                // update the bookkeeping fields in the IndexHeader
                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }

                if (invalidIndex == slotValue) {
                    this.indexHeader.incHashSlotCount();
                }
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }

        return false;
    }
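The interplay between slotValue and the 4-byte "previous index unit" field is how hash collisions are handled: a new index unit records the slot's old value as its predecessor, and the slot is then repointed at the new unit, so all keys hashing to the same slot form a backward-linked chain headed by the slot. A toy in-memory analog of that chaining (purely illustrative, not RocketMQ code):

    import java.util.Arrays;

    // Toy model of the slot -> index-unit chaining used by IndexFile.putKey.
    public class SlotChainDemo {
        static final int SLOT_NUM = 8;
        static int[] slots = new int[SLOT_NUM];  // 0 means "empty", like invalidIndex
        static int[] prev = new int[16];         // "previous unit" field of each index unit
        static int[] keyHashes = new int[16];    // key hash stored in each index unit
        static int indexCount = 1;               // unit positions start at 1; 0 is invalid

        static void putKey(String key) {
            int keyHash = Math.abs(key.hashCode());
            int slotPos = keyHash % SLOT_NUM;
            prev[indexCount] = slots[slotPos];   // remember the slot's old head
            keyHashes[indexCount] = keyHash;
            slots[slotPos] = indexCount++;       // slot now points at the newest unit
        }

        public static void main(String[] args) {
            putKey("orderId-1");
            putKey("orderId-2");
            putKey("orderId-1");                 // same key -> same slot chain
            System.out.println("slots = " + Arrays.toString(slots));
            System.out.println("prev  = " + Arrays.toString(Arrays.copyOf(prev, indexCount)));
        }
    }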

selectPhyOffset

Looks up all messages matching a given key (within a time range) in the IndexFile; the CommitLog offsets of the matches are appended to the phyOffsets list passed in by the caller.

   public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
        final long begin, final long end, boolean lock) {
        if (this.mappedFile.hold()) {
            // locate the hash slot for the key
            int keyHash = indexKeyHashMethod(key);
            int slotPos = keyHash % this.hashSlotNum;
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;
            try {
                if (lock) {
                    // fileLock = this.fileChannel.lock(absSlotPos,
                    // hashSlotSize, true);
                }

                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                // if (fileLock != null) {
                // fileLock.release();
                // fileLock = null;
                // }

                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                    || this.indexHeader.getIndexCount() <= 1) {
                } else {
                    for (int nextIndexToRead = slotValue; ; ) {
                        if (phyOffsets.size() >= maxNum) {
                            break;
                        }

                        // absolute position of the current index unit; units are chained through their last 4 bytes, so this loop walks the chain
                        int absIndexPos =
                            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                                + nextIndexToRead * indexSize;

                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

                        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);// previous index unit in the chain

                        if (timeDiff < 0) {
                            break;
                        }

                        timeDiff *= 1000L;

                        // the message's store time in the CommitLog must fall between begin and end
                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

                        // if the key hash and the time range both match, collect the CommitLog offset
                        if (keyHash == keyHashRead && timeMatched) {
                            phyOffsets.add(phyOffsetRead);
                        }

                        // move on to the previous index unit in the chain
                        if (prevIndexRead <= invalidIndex
                            || prevIndexRead > this.indexHeader.getIndexCount()
                            || prevIndexRead == nextIndexToRead || timeRead < begin) {
                            break;
                        }

                        nextIndexToRead = prevIndexRead;
                    }
                }
            } catch (Exception e) {
                log.error("selectPhyOffset exception ", e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }

                this.mappedFile.release();
            }
        }
    }
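A sketch of how a caller might drive this method against the IndexFile shown above (the class and variable names are illustrative; inside the broker this role is played by the index service, which walks its IndexFiles and then resolves each returned offset against the CommitLog):

    import java.util.ArrayList;
    import java.util.List;

    public class IndexQueryDemo {
        // Collect up to 32 CommitLog offsets for messages with the given key in the last hour.
        public static List<Long> queryOffsets(IndexFile indexFile, String key) {
            List<Long> phyOffsets = new ArrayList<>();
            long end = System.currentTimeMillis();
            long begin = end - 3600_000L; // one hour ago

            indexFile.selectPhyOffset(phyOffsets, key, 32, begin, end, false);

            // each offset can then be used to read the full message back from the CommitLog
            return phyOffsets;
        }
    }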
