美文网首页
store模块阅读8:MappedFile

store模块阅读8:MappedFile

作者: 赤子心_d709 | 来源:发表于2017-10-17 14:52 被阅读436次

说明

对于 commitlog、 consumequeue、 index 三类大文件进行磁盘读写操作,均是通过 MappedFile 类来完成。

MappedFile与MappedFileQueue关系如下


mappedFile与mappedFileQueue关系

主要的方法,作用如下

init:初始化文件名,大小,以及writeBuffer
cleanup:实现父类函数,用于处理directByteBuffer相关的资源回收
destroy:关闭FileChannel,删除文件
appendMessagesInner:追加消息,调用AppendMessageCallback#doAppend完成
flush,isAbleToFlush: flush日志
commit,commit0,isAbleToCommit: commit日志
selectMappedBuffer:完成ByteBuffer随机读取

下面对源码进行分析

属性

    public static final int OS_PAGE_SIZE = 1024 * 4;
    protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);//占用内存大小

    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);//文件数

    protected final AtomicInteger wrotePosition = new AtomicInteger(0);//写到的位置,当值==fileSize代表文件写满了

    //ADD BY ChenYang
    /**
     * 当writeBuffer为空时,就不会有真正的commit行为
     * wrotePosition意义上就代表了committedPosition,真正的committedPosition值只会为初始值0,参考commit函数
     */
    protected final AtomicInteger committedPosition = new AtomicInteger(0);

    //刷盘刷到的位置
    private final AtomicInteger flushedPosition = new AtomicInteger(0);

    protected int fileSize;//mappedFile文件大小,参照MessageStoreConfig.mapedFileSizeCommitLog,默认1G
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    protected ByteBuffer writeBuffer = null;//从transientStorePool的池中拿到ByteBuffer,可能为null
    protected TransientStorePool transientStorePool = null;//池对象,init函数中赋值
    private String fileName;
    private long fileFromOffset;//fileFromOffset就是文件名称,一般为20位数字,代表这个文件开始时的offset
    private File file;
    private MappedByteBuffer mappedByteBuffer;//内存映射文件
    private volatile long storeTimestamp = 0;
    private boolean firstCreateInQueue = false;

函数

构造,初始化函数

    public MappedFile() {
    }

    public MappedFile(final String fileName, final int fileSize) throws IOException {
        init(fileName, fileSize);
    }

    public MappedFile(final String fileName, final int fileSize,
        final TransientStorePool transientStorePool) throws IOException {
        init(fileName, fileSize, transientStorePool);
    }

    /**
     * 初始化文件名,大小
     * 设置transientStorePool
     * 设置writeBuffer,从transientStorePool借一个出来,可能为null
     */
    public void init(final String fileName, final int fileSize,
        final TransientStorePool transientStorePool) throws IOException {
        init(fileName, fileSize);
        this.writeBuffer = transientStorePool.borrowBuffer();//从复用池中获取ByteBuffer,可能为null
        this.transientStorePool = transientStorePool;
    }

    /**
     * 初始化文件名,大小
     * 设置fileFromOffset代表文件对应的偏移量
     * 得到fileChannel,mappedByteBuffer 得到IO相关对象
     * 计数TOTAL_MAPPED_VIRTUAL_MEMORY,TOTAL_MAPPED_FILES更新
     */
    private void init(final String fileName, final int fileSize) throws IOException {
        this.fileName = fileName;
        this.fileSize = fileSize;
        this.file = new File(fileName);
        this.fileFromOffset = Long.parseLong(this.file.getName());//fileFromOffset就是文件名称,一般为20位数字
        boolean ok = false;

        ensureDirOK(this.file.getParent());//确保父目录是存在的

        try {
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);//内存映射文件MappedByteBuffer对象
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            log.error("create file channel " + this.fileName + " Failed. ", e);
            throw e;
        } catch (IOException e) {
            log.error("map file " + this.fileName + " Failed. ", e);
            throw e;
        } finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }

    //看指定目录是否存在,否则创建一个
    public static void ensureDirOK(final String dirName) {
        if (dirName != null) {
            File f = new File(dirName);
            if (!f.exists()) {
                boolean result = f.mkdirs();
                log.info(dirName + " mkdir " + (result ? "OK" : "Failed"));
            }
        }
    }

clean相关函数

    /**
     * 清理,监视是否可用,是否已经清理过,条件适当进行清理
     * 更新内存记录以及文件数记录
     * @param currentRef
     * @return
     */
    @Override
    public boolean cleanup(final long currentRef) {
        if (this.isAvailable()) {//cleanup时,不允许该mappedFile可用
            log.error("this file[REF:" + currentRef + "] " + this.fileName
                + " have not shutdown, stop unmapping.");
            return false;
        }

        if (this.isCleanupOver()) {//如果已经clean完了就不用再clean了
            log.error("this file[REF:" + currentRef + "] " + this.fileName
                + " have cleanup, do not do it again.");
            return true;
        }

        clean(this.mappedByteBuffer);//清理mappedByteBuffer
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));//占用内存减少
        TOTAL_MAPPED_FILES.decrementAndGet();//文件数减少
        log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
        return true;
    }

    /**
     * 清理mappedByteBuffer
     * 如果是directByteBuffer且容量不为0
     * 则获取attachment转成directByteBuffer,调用其clearner.clean方法
     */
    public static void clean(final ByteBuffer buffer) {
        if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0)
            return;
        //下面的逻辑一定是directByteBuffer的
        /**
         * 下面这段是
         * 嵌套拿到directByteBuffer的最内部的attachment,强制转换成ByteBuffer对象(实际运行应该会是directByteBuffer)
         * 然后获得directByteBuffer的Cleaner对象 cleaner
         * 然后调用cleaner.clean方法,进行深度的释放资源
         */
        invoke(invoke(viewed(buffer), "cleaner"), "clean");
    }

    /**
     * 嵌套调用,获取最深层的attachment 或者 viewedBuffer方法
     * 转化为ByteBuffer对象
     */
    private static ByteBuffer viewed(ByteBuffer buffer) {
        String methodName = "viewedBuffer";

        Method[] methods = buffer.getClass().getMethods();
        for (int i = 0; i < methods.length; i++) {
            if (methods[i].getName().equals("attachment")) {
                methodName = "attachment";
                break;
            }
        }

        ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName);
        //执行DirectByteBuffer.attachment()方法
        if (viewedBuffer == null)
            return buffer;
        else
            return viewed(viewedBuffer);
    }

    //java反射,执行某个object的某方法,把所有访问级别都设置为Accessible再执行
    private static Object invoke(final Object target, final String methodName, final Class<?>... args) {
        return AccessController.doPrivileged(new PrivilegedAction<Object>() {
            public Object run() {
                try {
                    Method method = method(target, methodName, args);
                    method.setAccessible(true);
                    return method.invoke(target);
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            }
        });
    }

    //根据object,参数类型列表以及函数名得到Method对象
    private static Method method(Object target, String methodName, Class<?>[] args)
        throws NoSuchMethodException {
        try {
            return target.getClass().getMethod(methodName, args);
        } catch (NoSuchMethodException e) {
            return target.getClass().getDeclaredMethod(methodName, args);
        }
    }

    /**
     * 参数intervalForcibly代表距离上一次shutdown至少要这么长时间
     * 调用shutdown
     * 如果引用清理干净了删除文件,打log
     * 否则false
     */
    public boolean destroy(final long intervalForcibly) {
        this.shutdown(intervalForcibly);

        //如果引用清理干净了删除文件,打log
        if (this.isCleanupOver()) {
            try {
                this.fileChannel.close();
                log.info("close file channel " + this.fileName + " OK");

                long beginTime = System.currentTimeMillis();
                boolean result = this.file.delete();
                log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
                    + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
                    + this.getFlushedPosition() + ", "
                    + UtilAll.computeEclipseTimeMilliseconds(beginTime));
            } catch (Exception e) {
                log.warn("close file channel " + this.fileName + " Failed. ", e);
            }

            return true;
        } else {
            log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
                + " Failed. cleanupOver: " + this.cleanupOver);
        }

        return false;
    }

追加消息相关函数

追加MessageExt消息

    //插入MessageExtBrokerInner消息
    public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
        return appendMessagesInner(msg, cb);
    }

    //插入MessageExtBatch消息
    public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) {
        return appendMessagesInner(messageExtBatch, cb);
    }

    /**
     * append消息,处理MessageExt消息
     * 只要文件还有剩余空间,调用AppendMessageCallback#doAppend方法完成底层append逻辑
     * 更新写的位置以及存储时间
     * 返回AppendMessageResult
     */
    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;

        int currentPos = this.wrotePosition.get();

        if (currentPos < this.fileSize) {//文件还有剩余空间
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);//标记当前消息的position
            AppendMessageResult result = null;
            if (messageExt instanceof MessageExtBrokerInner) {//追加MessageExtBrokerInner消息
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {//追加MessageExtBatch消息
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            this.wrotePosition.addAndGet(result.getWroteBytes());//更新写的位置
            this.storeTimestamp = result.getStoreTimestamp();//更新存储时间
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

追加byte[]内容

    /**
     * 空间足够的话,将data[]直接写入fileChannel
     * 更新wrotePosition
     */
    public boolean appendMessage(final byte[] data) {
        int currentPos = this.wrotePosition.get();

        if ((currentPos + data.length) <= this.fileSize) {//空间足够
            try {
                this.fileChannel.position(currentPos);
                this.fileChannel.write(ByteBuffer.wrap(data));
            } catch (Throwable e) {
                log.error("Error occurred when append message to mappedFile.", e);
            }
            this.wrotePosition.addAndGet(data.length);
            return true;
        }

        return false;
    }

    /**
     * 空间足够的话,将data[]的一部分写入fileChannel
     * Content of data from offset to offset + length will be wrote to file.
     *
     * @param offset The offset of the subarray to be used.
     * @param length The length of the subarray to be used.
     */
    public boolean appendMessage(final byte[] data, final int offset, final int length) {
        int currentPos = this.wrotePosition.get();

        if ((currentPos + length) <= this.fileSize) {
            try {
                this.fileChannel.position(currentPos);
                this.fileChannel.write(ByteBuffer.wrap(data, offset, length));
            } catch (Throwable e) {
                log.error("Error occurred when append message to mappedFile.", e);
            }
            this.wrotePosition.addAndGet(length);
            return true;
        }

        return false;
    }

flush 与 commit相关

    /**
     * @return The current flushed position
     */
    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {//是否可以刷
            if (this.hold()) {//引用+1
                int value = getReadPosition();//获得可以读到的最大位置

                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    //从缓存刷到磁盘
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }

                this.flushedPosition.set(value);//更新刷到的位置
                this.release();//释放引用
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }

    /**
     * 提交:writeBuffer写入FileChannel
     * 1.当writeBuffer为null,直接返回
     * 2.否则将writeBuffer中[lastCommittedPosition,writePos)的部分 写入fileChannel
     * 3.然后对writeBuffer归还给transientStorePool,返回committedPosition
     */
    public int commit(final int commitLeastPages) {
        if (writeBuffer == null) {
            //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
            return this.wrotePosition.get();
        }
        if (this.isAbleToCommit(commitLeastPages)) {//是否可以提交
            if (this.hold()) {//引用+1
                commit0(commitLeastPages);//进行提交
                this.release();//释放已用
            } else {
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            }
        }

        // All dirty data has been committed to FileChannel.
        if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
            this.transientStorePool.returnBuffer(writeBuffer);//还给复用池
            this.writeBuffer = null;
        }

        return this.committedPosition.get();
    }

    /**
     * commit实现,持久化到日志
     * 将writeBuffer中[lastCommittedPosition,writePos)的部分 写入fileChannel
     * 更新committedPosition
     * 参数 commitLeastPages 没用
     */
    protected void commit0(final int commitLeastPages) {
        int writePos = this.wrotePosition.get();
        int lastCommittedPosition = this.committedPosition.get();

        if (writePos - this.committedPosition.get() > 0) {
            try {
                ByteBuffer byteBuffer = writeBuffer.slice();
                byteBuffer.position(lastCommittedPosition);
                byteBuffer.limit(writePos);
                this.fileChannel.position(lastCommittedPosition);
                this.fileChannel.write(byteBuffer);//持久化
                this.committedPosition.set(writePos);
            } catch (Throwable e) {
                log.error("Error occurred when commit data to FileChannel.", e);
            }
        }
    }

    /**
     * 是否能够flush。满足如下条件任意条件:
     * 1. 文件已经写满
     * 2. flushLeastPages > 0 && 未flush部分超过flushLeastPages
     * 3. flushLeastPages = 0 && 有新写入部分
     *
     * @param flushLeastPages flush最小分页
     * @return 是否能够写入
     */
    private boolean isAbleToFlush(final int flushLeastPages) {
        int flush = this.flushedPosition.get();
        int write = getReadPosition();//最后有效的写到的位置

        if (this.isFull()) {
            return true;
        }

        if (flushLeastPages > 0) {
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
        }

        return write > flush;
    }

    /**
     * 是否能够commit。满足如下条件任意条件:
     * 1. 映射文件已经写满
     * 2. commitLeastPages > 0 && 未commit部分超过commitLeastPages
     * 3. commitLeastPages = 0 && 有新写入部分
     *
     * @param commitLeastPages commit的最小页数
     */
    protected boolean isAbleToCommit(final int commitLeastPages) {
        int flush = this.committedPosition.get();
        int write = this.wrotePosition.get();

        if (this.isFull()) {
            return true;
        }

        if (commitLeastPages > 0) {
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
        }

        return write > flush;
    }

    //如果写的位置和要求文件大小一样,则写满了
    public boolean isFull() {
        return this.fileSize == this.wrotePosition.get();
    }

    /**
     * @return The max position which have valid data
     * 返回的是可以读到的最大位置(就是实际写到的位置)
     * 函数名字容易让人误解
     */
    public int getReadPosition() {
        return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
    }

随机读取相关函数

    //返回从pos到 pos + size的内存映射
    public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
        int readPosition = getReadPosition();//获取当前有效数据的最大位置
        if ((pos + size) <= readPosition) {//返回的数据 必须是有效的

            if (this.hold()) {//引用+1
                ByteBuffer byteBuffer = this.mappedByteBuffer.slice();//切片,复制一个byteBuffer(与原byteBuffer共享数据, 只是指针位置独立)
                byteBuffer.position(pos);
                ByteBuffer byteBufferNew = byteBuffer.slice();
                byteBufferNew.limit(size);//设置结束位置,byteBufferNew的[0,size) 与 原有的mappedByteBuffer的[pos,pos+size)相同
                return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);//绝对offset,
            } else {
                log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
                    + this.fileFromOffset);
            }
        } else {
            log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
                + ", fileFromOffset: " + this.fileFromOffset);
        }

        return null;
    }

    //返回从pos到最大有效位置的所有数据
    public SelectMappedBufferResult selectMappedBuffer(int pos) {
        int readPosition = getReadPosition();
        if (pos < readPosition && pos >= 0) {//返回的数据必须是有效的
            if (this.hold()) {//引用+1
                ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
                byteBuffer.position(pos);
                int size = readPosition - pos;//剩下的都返回
                ByteBuffer byteBufferNew = byteBuffer.slice();
                byteBufferNew.limit(size);
                return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
            }
        }

        return null;
    }

其他函数

get set相关函数
warmMappedFile,用于预热mappedByteBuffer的IO的
还有mlock和munlock用LibC库的,没有深入研究

思考

flush和commit的关系

注意两个是都会执行的,先commit再flush,不会只执行一个函数

对应CommitLog里面的不同的内部类
flush对应FlushRealTimeService
commit对应CommitRealTimeService
refer中的博客是这样介绍的

image.png

我觉得上图的表格有点不准确,应该是这样

步骤1:appendMessagesInner 步骤2:commit 步骤3:flush
方式1:writeBuffer为null 写入writeBuffer writeBuffer写入channel fileChannel.force
方式2:writeBuffer非null 写入mappedByteBuffer 基本没干事情 mappedByteBuffer.force()

wrotePosition和committedPosition关系是什么

当writeBuffer为空时,就不会有真正的commit行为
此时,wrotePosition意义上就代表了committedPosition,真正的committedPosition值只会为初始值0,参考commit函数

isAbleToFlush和isAbleToCommit函数的区别

区别上,两者的int write获取方式不容

int write = getReadPosition();//前者
int write = this.wrotePosition.get();//后者

后者被调用的时候,writeBuffer不为null,因此不能调用getReadPosition,否则write和flush的值会一直一样

writeBuffer什么时候用,mappedByteBuffer什么时候用

见flush函数和appendMessagesInner函数
writeBuffer能用就用writeBuffer,否则就用mappedByteBuffer

fileSize就一定是文件大小吗

类中这个属性只是一个限制,不代表创建一个mappedFile,它立刻占有这么大空间
只不过随着数据写入,占有空间最大允许达到fileSize这个值

问题

flush函数hold失败也会设置新的flushedPosition

为什么,这和hold成功最终的返回值不就一样了吗?

init函数中,只要FileChannel或者mappetByteBuffer写入就能完成文件的创建吗

一般写法可能就是file.create()了,这里我不是很清楚IO操作,感觉上是FileChannel或者MappedByteBuffer写入就能完成文件创建

destroy函数为什么要intervalForcibly配置

这个干吗用的,为什么两次destroy之间还有相隔这么久才有效,有什么好处吗

refer:

https://github.com/YunaiV/Blog/blob/master/RocketMQ/1004-RocketMQ%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%EF%BC%9AMessage%E5%AD%98%E5%82%A8.md
http://blog.csdn.net/KilluaZoldyck/article/details/76775397 mappedFile作用
http://www.jianshu.com/p/f9052d9b637c mappedFile与mappedFileQueue
http://www.jianshu.com/p/6494e33c9b1f rocketmq中内存映射与零拷贝

相关文章

网友评论

      本文标题:store模块阅读8:MappedFile

      本文链接:https://www.haomeiwen.com/subject/sqazyxtx.html