美文网首页
kafka消息的管理

kafka消息的管理

作者: matthewfly | 来源:发表于2020-11-16 15:33 被阅读0次

1.消息持久化

kafka producer将消息发送给broker后,消息日志会被存储在broker的磁盘上,采用顺序写入的方式。顺序写可以加快磁盘访问速度,并且可以将将多个小型的逻辑写合并成一次大型的物理磁盘写入,官方数据显示顺序写比随机写入快6000倍以上。另外,操作系统使用内存对磁盘进行缓存即pagecache,pagecache完全由操作系统管理,这也使得写数据变得即简洁也快速。
配置中可以调整过期时间,超过改时间的消息日志将移除,默认值为7天;也可配置文件大小阈值,达到该阈值后,从最旧消息开始删除。配置项为:

# 日志保存最长时间
log.retention.hours=168
# 单个分区最大数据量限制,默认不限制
#log.retention.bytes=1073741824

2.高效传输

从文件到套接字的常见数据传输路径有4步:
1).操作系统从磁盘读取数据到内核空间的 pagecache
2).应用程序读取内核空间的数据到用户空间的缓冲区
3).应用程序将数据(用户空间的缓冲区)写回内核空间到套接字缓冲区(内核空间)
4).操作系统将数据从套接字缓冲区(内核空间)复制到通过网络发送的 NIC 缓冲区

kafka使用 producer ,broker 和 consumer 都共享的标准化的二进制消息格式,这样数据块不用修改就能在他们之间传递。kafka采用Linux 中系统调用sendfile的方式,直接将数据从 pagecache 转移到 socket 网络连接中。这种零拷贝方式使得kafka数据传输更加高效。

3.消息格式

以前面文章中安装的kafka为例:Mac 安装kafka
kafka本地文件存储目录可以在配置文件server.properties中设置,参数及默认值为:

# A comma separated list of directories under which to store log files
log.dirs=/usr/local/var/lib/kafka-logs

进入该目录,可以看到kafka保存的cosumer offset和topic消息:

__consumer_offsets-45           test1-0
__consumer_offsets-19           __consumer_offsets-32           __consumer_offsets-46           test1-1
__consumer_offsets-2            __consumer_offsets-33           __consumer_offsets-47           test1-2
...

其中__consumer_offsets开头的为消费的offset信息,test1开头的即为之前创建的topic “test1”,该topic有三个分区,分区编号从0开始,分别是test1-0、test1-1、test1-2。
进入test1-0,查看包含文件如下:

00000000000000000000.index  00000000000000000000.log    00000000000000000000.timeindex  00000000000000000003.snapshot   leader-epoch-checkpoint

可以看到kafka消息按partition存储的,每个partition一个目录。partition下消息分段(segment)存储,默认每段最大1G,通过参数log.segment.bytes可配置。segment包含索引文件index、消息文件log,分别存储消息的索引和内容,以.index和.log结尾,文件命名为当前segment第一个消息offset。index文件在log每隔一定数据量之间建立索引,可以通过参数index.interval.bytes配置。
通过kafka命令查看00000000000000000000.index内容如下:

kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/test1-0/00000000000000000000.index --print-data-log
Dumping /usr/local/var/lib/kafka-logs/test1-0/00000000000000000000.index
offset: 0 position: 0

00000000000000000000.log内容如下:

kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/test1-0/00000000000000000000.log --print-data-log
Dumping /usr/local/var/lib/kafka-logs/test1-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 1 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1605148552244 size: 78 magic: 2 compresscodec: NONE crc: 2396879296 isvalid: true
| offset: 0 CreateTime: 1605148551995 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 1
| offset: 1 CreateTime: 1605148552244 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 2
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 78 CreateTime: 1605162144502 size: 70 magic: 2 compresscodec: NONE crc: 1124495606 isvalid: true
| offset: 2 CreateTime: 1605162144502 keysize: -1 valuesize: 2 sequence: -1 headerKeys: [] payload: dd

其中索引文件中包含两个字段:(offset,position),分别表示消息offset和该消息在log文件的偏移量。如上图中offset=0的消息对应的position=0;对应的就是00000000000000000000.log中的第一条消息:

offset: 0 CreateTime: 1605148551995 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 1

其中payload为具体的消息内容。
另外里面还有一个以".timeindex"结尾的文件,查看其内容:

kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/test1-0/00000000000000000000.timeindex --print-data-log 
Dumping /usr/local/var/lib/kafka-logs/test1-0/00000000000000000000.timeindex
timestamp: 1605162144502 offset: 2

该日志文件是kafka0.10.1.1加入的,其中保存的为:(消息时间戳,offset)。时间戳是该segment最后一个消息对应的时间戳(与log文件中最后一条记录时间戳一致),kafka也支持根据时间来读取消息。

4.消息副本

由上可知消息是按partition来存储的,partition可以配置n个副本followers。多个partition和其follower在broker上是怎么分配的呢?
partition和broker都进行了排序,下标从0开始;
假设有k个broker,第i个partition被分配到到 i%k 个broker上;
第i%k个broker即为partition i 的leader,负责这个partition的读写
partition的followers也进行排序,从leader的后续broker开始分配,第i个partition的第j个副本broker为 (j+ i%k)%k。
一个有3个broker的kafka集群,包含3个partition,每个partition副本数为1的topic如下图:

partition-broker分配.jpg
其中箭头前方的为leader,被箭头指向为副本follower。这种分配方案尽量将partition均匀分布到broker集群中,consumer根据partition来消费时将访问到不同的broker,提高系统吞吐量。同时将日志平均分布存储也实现了系统的高可用。

总结:
kafka将消息日志采用顺序写入的方式存放在broker磁盘中;数据传输通过系统调用sendfile零拷贝方式;消息日志分段存放,可配置清除时间或大小阈值;每段包含消息索引、消息内容两个文件,通过索引实现快速查找;按照/topic/partition的目录结构分开存储,且均匀分布到集群各broker上。

参考:
https://kafka.apachecn.org/documentation.html#design

相关文章

  • kafka分区管理

    根据kafka消息的管理[/p/eb622febaaa3],我们知道kafka对消息进行了分区管理,并将分区尽量均...

  • kafka消息的管理

    1.消息持久化 kafka producer将消息发送给broker后,消息日志会被存储在broker的磁盘上,采...

  • Kafka 入门1:系统架构、基本概念以及伪集群搭建方法

    一、Kafka 是什么? Apache Kafka 本质上是一种消息中间件,用来可靠传递消息事件,用来管理消息队列...

  • 基于Docker搭建分布式消息队列Kafka

    本文基于Docker搭建一套单节点的Kafka消息队列,Kafka依赖Zookeeper为其管理集群信息,虽然本例...

  • 无镜--kafka之控制器(一)

    在kafka中每个消息代理节点(broker)都管理着集群中所有分区的一部分。kafka的控制器(KafkaCon...

  • Kafka常用命令

    启动Kafka并生产消费消息 启动ZooKeeper 启动Kafka 查看启动后kafka的版本 生产者发送消息 ...

  • Kafka/RocketMQ顺序消息对比

    一、Kafka顺序消息 Producer端:Kafka的顺序消息是通过partition key,将某类消息(例如...

  • Kafka实践

    kafka基本概念: Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为...

  • Kafka的基本原理及集群架构

    Kafka简介 Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Pro...

  • Kafka总结

    Kafka 体系架构 1. Kafka的用途有哪些?使用场景如何? 消息系统: Kafka 和传统的消息系统(也称...

网友评论

      本文标题:kafka消息的管理

      本文链接:https://www.haomeiwen.com/subject/gpeabktx.html