美文网首页
Broker消息存储篇

Broker消息存储篇

作者: DH大黄 | 来源:发表于2021-12-19 19:18 被阅读0次

Broker是如何存储消息的

流程图

RocketMQ存储消息的主要流程图如下

Broker存储消息流程

然后我们从上往下细说下里面的流程

写入CommitLog

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery 延时等级 > 0,替换原有的消费主题为 SCHEDULE_TOPIC_XXXX,队列id 为 延时的等级-1
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setBornHostV6Flag();
        }

        InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setStoreHostAddressV6Flag();
        }

        PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
        PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
        if (encodeResult != null) {
            return CompletableFuture.completedFuture(encodeResult);
        }
        msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
        PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));

        long elapsedTimeInLock = 0;
        MappedFile unlockMappedFile = null;

        // 申请 putMessageLock 锁 (将消息存储到CommitLog文件中是串行的)
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            // 获取当前可以写入的CommitLog文件
            // CommitLog 存储地址:${ROCKET_HOME}/store/commitlog 文件默认大小1g。一个文件写满后就再创建另一个文件
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            //  ${ROCKET_HOME}/store/commitlog 目录下没有任何文件
            if (null == mappedFile || mappedFile.isFull()) {
                // 以偏移量为0 创建commitLog文件
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
            }

            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                default:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            }

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            // 处理完追加逻辑就会释放锁
            putMessageLock.unlock();
        }

        if (elapsedTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());

        // 刷盘
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
        // 此处会处理主从同步的结果(HA)
        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(flushStatus);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
                if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
                    log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
                            msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
                }
            }
            return putMessageResult;
        });
    }

从代码中,我们可以看到写入CommitLog这个操作主要做了以下几件事情

  1. 设置存储的消息的基本信息
    1. 如果消息是延时消息,会将原有的消息topic替换为 SCHEDULE_TOPIC_XXXX,队列id为延迟等级
  2. 申请putMessageLock(该操作确保了只会有一个线程去对CommitLog进行修改)
  3. 如果CommitLog不存在或者已经写满,需要创建新的CommitLog
  4. 往MappedFile中追加消息(追加消息的时候才会生成消息唯一ID)
    1. 如果文件剩余空间不足,会创建新的文件 (消息长度 + END_FILE_MIN_BLANK_LENGTH > CommitLog文件空闲空间,返回END_OF_FILE)
    2. 其他的一些异常
  5. 追加完消息,释放锁

讲完这些,想必大家心中肯定会有一些疑惑。比如说CommitLog和MappedFile是什么关系,为什么需要这个,CommitLog长什么样的,里面有什么东西。等等问题

下面我来给大家讲一讲我在看这个的时候产生的一些疑问

CommitLog里面具体有什么

CommitLog里面具体有什么

CommitLog里面记录了消息的完整内容,我们在读取消息的时候,首先先通过前4个字节记录了当前消息的实际长度,然后再往后读对应的长度,就可以将消息完全读取出来

CommitLog文件在磁盘中的存储路径 ${ROCKET_HOME}/store/commitlog/

CommitLog和MappedFile的关系

RocketMQ采用内存映射文件的方式来提高IO访问性能,无论是CommitLog,ConsumeQueue还是IndexFile,单个文件都被设置为固定长度。然后RocketMQ使用MappedFile和MappedFileQueue来封装存储文件。具体对MappedFile和MappedFileQueue的概念我会在后面在具体介绍

CommitLog与ConsumeQueue,IndexFile的关系

既然要探讨ConsumeQueue和IndexFile与CommitLog之间的关系,不如直接来讲一讲这两个文件是干什么用的,从他们的用处中我们也就能够得知为什么在已经有了CommitLog的情况下,RocketMQ还需要ConsumeQueue和IndexFile这两种文件

先说说ConsumeQueue

首先,我们来看看ConsumeQueue的结构

ConsumeQueue文件所在目录 ConsumeQueue文件中单元的格式

从图片中,我们不难看出,RocketMQ对ConsumeQueue的层级目录为

consumequeue - 具体的Topic - 具体的某一个队列 - ConsumeQueue的单元 (记录了消息在CommitLog中的偏移量,消息的长度,消息的tags)

那么为什么RocketMQ要设计这样的映射关系呢?主要要从两个方面来考虑,同时这两个方面最终的目的也是为了保证RocketMQ的性能

  • 写入消息:如果我们没有CommitLog,直接就用一个ConsumeQueue,那么在Producer生产消息给Broker的时候,不同的Topic和队列的消息我们要写到不同的ConsumeQueue文件中。那么就会存在随机写的问题,这样写入消息的效率就会变的非常的低
  • 读取消息:如果为了保证消息的写入效率,我们将消息存放在CommitLog中,但是,在消费者拉取消息的时候,它肯定是要拉取自己感兴趣的Topic的消息。此时,我们去CommitLog中寻找对应的Topic和对应MessageQueue下对应偏移量的消息,挨个去遍历CommitLog一看就是一个效率非常低的操作。此时我们使用ConsumeQueue就可以快速定位到消息处于哪个CommitLog,他对应的偏移量是多少。这样读取消息的效率就会变得非常的高

这边补充一点,关于ConsumeQueue中的tag hashcode。

在RocketMQ中,我们不仅可以监听我们感兴趣的Topic消息,同时我们还可以只监听Topic下部分的消息。这个操作就是通过消息的tag来实现的(一种实现方式,另外还有类过滤和SQL92过滤的方式)

在消费者拉取消息的时候,不仅消费者自己会对消息进行过滤,Broker也会根据消费者的情况对消息进行一次预过滤。此处我们就不展开讲述,具体可以看我的消息消费文章

IndexFile

IndexFile的结构

IndexFile的结构

他的目的是为了方便我们根据消息的key快速检索消息

他的快速检索主要是依靠文件头部的Hash槽。

哈希冲突的解决

既然说到了hash,那么就一定会存在哈希冲突的情况。那么,IndexFile是如何解决哈希冲突的呢?

在我们写IndexFile的时候,假如IndexFile还有足够的空间,那么我们就会对当前这条消息的消息key进行一次哈希计算,假如对应的hash槽中没有记录对应的index条目偏移量,那么直接将偏移量记录到hash槽中。否则这是在Index条目中的pre index no记录原先的偏移量,然后再将这个条目的偏移量记录到hash槽中

MappedFile和MappedFileQueue

MappedFile

为什么需要MappedFile?

RocketMQ底层使用 mmap + write (零拷贝)的方式减少用户态和内存态的切换次数

具体的可以看看这篇文章

mmap

这里只简单介绍一些:

加入没有mmap,我们读取文件需要经历两次数据拷贝,一次是从磁盘拷贝到page cache中,一次从page cache拷贝到用户空间内存。

而有了mmap,就能够减少page cache到用户空间内存的拷贝,mmap对page cache和用户空间虚拟地址进行了直接的映射,操作虚拟地址就等同于操作page cache

RocketMQ会定时将已经消费的消息从存储文件中删除,以CommitLog为例,第一个CommitLog文件的偏移量不一定 00000000000000000000

MappedFileQueue

MappedFileQueue是MappedFile的管理容器,它是对存储目录的封装

比如说在CommitLog文件存储目录下有好多个CommitLog文件,对应在内存中也会有好多个MappedFile对象,而MappedFileQueue就是来管理这些MappedFile对象的

MappedFile刷盘操作

消息刷盘有两种:

  • 同步刷盘:只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。
  • 异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。

相关文章

网友评论

      本文标题:Broker消息存储篇

      本文链接:https://www.haomeiwen.com/subject/dpmofrtx.html