rocketMq-broker消息存储介绍

作者: 晴天哥_王志 | 来源:发表于2018-03-21 20:44 被阅读163次

broker的消息存储做了那些事

    rocketMq的broker消息存储主要包括3个部分,分别commitLog的存储,consumeQueue的存储,index的存储,这章分享会把这三个过程分解清楚,同时会对里面涉及的存储位置的偏移量着重讲解清楚。

    1、commitLog的存储是producer发送消息给broker端broker同步处理的

    2、consumeQueue和index两者存储其实是一个定时任务从commitLog中获取偏移量然后存储过去的

    3、consumeQueue和index的存储 与 commitLog的存储是隔离开的,非同步的

broker的消息存储过程

消息存储过程

说明:分享自再说rocketmq消息存储

    1、commitLog其实有两层够层,其中MappendFileQueue是逻辑的存储队列概念,里面保存着顺序增长的MappedFile文件。

    2、MappedFile文件是真正存储实际数据的文件

    3、在整个broker的存储体系中,MappedFile文件保存了commitLog、consumeQueue、Index等,是核心的数据结构。

broker的消息存储过程

注册消息消费函数

说明:参见BrokerController类

    1、其中sendProcessor就是broker端接收message的入口函数

处理发送消息

说明:参见SendMessageProcessor类

    1、这里我们先看下单个消息的处理过程,也就是sendMessage过程

组装消息并开始存储

说明:参见SendMessageProcessor类

    1、putMessage函数开始执行数据的保存

commitLog消息存储

说明:参见DefaultMessageStore类

    1、commitLog.putMessage开始进入commitLog的保存逻辑

获取mappedFile文件并存储消息

说明:参见CommitLog类

    1、先从mappedFileQueue获取最后一个MappedFile文件,如果为空就创建一个commitLog对应的MappedFile文件,文件命名以实际以文件大小命名,分别是00000000000000000000、00000000001073741824、00000000002147483648,文件名之间差1G=1024*1024*1224B=1073741824。每个commitLog的MappedFile文件大小是1G,剩余多余无非存入新消息就用填充字符填充到1G。

    2、mappedFile.appendMessage执行保存消息到MappedFile的动作

通过回调函数来执行message的保存

说明:参见MappedFile类

    1、这里我们以单个消息存储为例继续说明

计算存储位置及计算consumeQueue偏移

说明:参见CommitLog类

    1、做一些前置准备,包括计算下一个存储位置等

判断剩余空间

说明:参见CommitLog类

    1、这里有个特别的地方需要注意,就是如果剩余空间无法装下消息+8个字节的结束标识符,就默认结束了,结束标识符应该用于标识mappedFile是否结束。

按照格式写入缓存

说明:参见CommitLog类

    1、这里的消息写入过程我们分析暂时到写入缓存就结束了,真正实际上是还会有刷盘的动作的,这里暂时不展开分析,后续单独开一章刷盘的的分析。

消息存储格式

说明:

    commitLog的MappedFile文件中保存的数据格式如上图所示,在末尾不能保存整个消息的时候就会重新生成一个MappedFile文件,当然在末尾应该会有填充结束标识符,文件结束符是以特殊magic_code结束的,为BLANK_MAGIC_CODE=0xBBCCDDEE ^1880681586 +8;

comsumeQueue的存储过程

consumeQueue的保存过程

说明:参见DefaultMessageStore类

    1、该实现类是一个线程函数,内部通过run操作循环去commitLog去消息位移信息保存到consumeQueue当中

循环拉取commitLog消息并生成dispatch消息

说明:参见DefaultMessageStore类

dispatchList保存consumeQueue和index

说明:参见DefaultMessageStore类

    1、this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());

     2、this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

consumeQueue保存操作 consumeQueue保存逻辑

说明:参见ConsumeQueue类

    1、ConsumeQueue的消息单元格式如下图。

ConsumeQueue的消息格式

相关文章

  • rocketMq-broker消息存储介绍

    broker的消息存储做了那些事 rocketMq的broker消息存储主要包括3个部分,分别commitLog的...

  • rocketMq-broker介绍

    broker在整个rocketMq系统中是一个很重要的消息存储模块,保存了以下所有信息。 1、所有的消息都保存...

  • RocketMQ源码阅读(四)-消息存储三

    前文已经介绍了消息存储中使用到的充要对象, 本文分析一下消息介绍的主流程. 另外, 此篇主要分析消息存储主流程的代...

  • RocketMQ源码阅读(四)-消息存储二

    RocketMQ的消息存储过程非常复杂, 本文先介绍存储模块中几个重要对象. 1. MappedFile 对Map...

  • RocketMQ:消息存储机制详解与源码解析

    文章目录 消息存储机制 1.前言⒉.核心存储类:DefaultMessageStore3.消息存储流程4.消息存储...

  • Python中的 redis keyspace 通知

    介绍 Redis是内存中的数据结构存储,用于缓存、高速数据摄取、处理消息队列、分布式锁定等等。 与其他内存存储相比...

  • RocketMQ-Broker

    Broker在RocketMQ中承担着消息存储的功能。(待补充)

  • RocketMQ源码阅读(四)-消息存储

    前言 接下来会介绍RocketMQ的消息存储, 本文先对RocketMQ的整体设计和组件进行简单介绍,后续会针对细...

  • RocketMq消息存储

    一个消息中间件最核心的东西就是消息存储结构。 这是kafka的消息存储: 每个topic_partition对应一...

  • Cat消息存储

    消息格式为 应用名-IP-小时正点数-消息递增号 MessageId 每个 应用 + IP + 整点小时 对应...

网友评论

    本文标题:rocketMq-broker消息存储介绍

    本文链接:https://www.haomeiwen.com/subject/mlpyqftx.html