05、RocketMQ源码分析:MappedFileQueue

目录

load

truncateDirtyFiles

getMinOffset

getLastMappedFile

flush/commit


MappedFile是一个文件的映射类,MappedFileQueue则是MappedFile的列表,例如RocketMQ中一个CommitLog文件大小默认是1G,文件名是起始偏移量,首个文件如下

*

文件写满后,会自动生成下一个文件,MappedFileQueue就是代表这一种文件的列表

*

public class MappedFileQueue {

    private final String storePath;//文件路径

    private final int mappedFileSize;// 每个文件的大小

    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();// 多个文件的列表

    private final AllocateMappedFileService allocateMappedFileService;// 创建新文件的异步线程

    private long flushedWhere = 0;
    private long committedWhere = 0;

    private volatile long storeTimestamp = 0;

    public MappedFileQueue(final String storePath, int mappedFileSize,
        AllocateMappedFileService allocateMappedFileService) {
        this.storePath = storePath;
        this.mappedFileSize = mappedFileSize;
        this.allocateMappedFileService = allocateMappedFileService;
    }
}

load

映射路径下的文件列表成List

   public boolean load() {
        File dir = new File(this.storePath);
        File[] files = dir.listFiles();
        if (files != null) {
            // ascending order
            Arrays.sort(files);
            for (File file : files) {

                if (file.length() != this.mappedFileSize) {
                    log.warn(file + "\t" + file.length()
                        + " length not matched message store config value, please check it manually");
                    return false;
                }

                try {
                    // 映射文件到MappedFile,设置初始的position
                    // 如果文件还没写满,在truncateDirtyFiles()方法中会将position重置,从哪里调用的在truncateDirtyFiles()暂不关注
                    MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

                    mappedFile.setWrotePosition(this.mappedFileSize);
                    mappedFile.setFlushedPosition(this.mappedFileSize);
                    mappedFile.setCommittedPosition(this.mappedFileSize);
                    this.mappedFiles.add(mappedFile);
                    log.info("load " + file.getPath() + " OK");
                } catch (IOException e) {
                    log.error("load file " + file + " error", e);
                    return false;
                }
            }
        }

        return true;
    }

truncateDirtyFiles

清除offset之后的脏数据
1、 重置offset所在的MappedFile的position;
2、 清理offset所在的MappedFile之后的文件;

    public void truncateDirtyFiles(long offset) {
        List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();

        for (MappedFile file : this.mappedFiles) {
            long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
            if (fileTailOffset > offset) {
                if (offset >= file.getFileFromOffset()) {
                    file.setWrotePosition((int) (offset % this.mappedFileSize));
                    file.setCommittedPosition((int) (offset % this.mappedFileSize));
                    file.setFlushedPosition((int) (offset % this.mappedFileSize));
                } else {
                    file.destroy(1000);
                    willRemoveFiles.add(file);
                }
            }
        }

        this.deleteExpiredFile(willRemoveFiles);
    }

getMinOffset

获取最小的offset,也就是这一系列文件的最小起始位置,第一个文件的名

  public long getMinOffset() {

        if (!this.mappedFiles.isEmpty()) {
            try {
                return this.mappedFiles.get(0).getFileFromOffset();
            } catch (IndexOutOfBoundsException e) {
                //continue;
            } catch (Exception e) {
                log.error("getMinOffset has exception.", e);
            }
        }
        return -1;
    }

getLastMappedFile

获取最后一个文件的映射对象(MappedFile), 如果最后一个文件已经写满了,重新创建一个新文件

    // 根据给出的偏移量,获取所在的MappedFile
    // 如果文件写满了,创建新的
    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        // 判断是否需要新创建文件,如果不等于-1,表示需要创建一个文件名是createOffset的新文件
        long createOffset = -1;
        // 列表最后一个MappedFile
        MappedFile mappedFileLast = getLastMappedFile();

        // 如果没有文件,需要新键,文件名即偏移量
        // 文件都必须是相同大小(mappedFileSize)
        if (mappedFileLast == null) {
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }

        // 如果已经写满(文件大小==已写入的大小)
        // 当前最后的文件大小+mappedFileSize就是下个文件的名
        if (mappedFileLast != null && mappedFileLast.isFull()) {
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }

        // 需要创建新文件
        if (createOffset != -1 && needCreate) {
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
            MappedFile mappedFile = null;

            // allocateMappedFileService不为null,是异步创建文件
            // putRequestAndReturnMappedFile把创建新文件请求写入一个请求队列,然后countDownLatch.await等待创建完成
            // allocateMappedFileService.run中从队列取请求创建文件,创建完成countDownLatch.countdown
            if (this.allocateMappedFileService != null) {
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            } else {
                try {
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }

            if (mappedFile != null) {
                if (this.mappedFiles.isEmpty()) {
                    mappedFile.setFirstCreateInQueue(true);
                }
                this.mappedFiles.add(mappedFile);
            }

            return mappedFile;
        }

        return mappedFileLast;
    }

flush/commit

根据当前记录的刷新和提交标志位获取对应的MappedFile,调用MappedFile.flush/commit

    private long flushedWhere = 0;
    private long committedWhere = 0;

  public boolean flush(final int flushLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }

        return result;
    }

    public boolean commit(final int commitLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
        if (mappedFile != null) {
            int offset = mappedFile.commit(commitLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.committedWhere;
            this.committedWhere = where;
        }

        return result;
    }

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: