1.消息持久化
kafka producer将消息发送给broker后,消息日志会被存储在broker的磁盘上,采用顺序写入的方式。顺序写可以加快磁盘访问速度,并且可以将将多个小型的逻辑写合并成一次大型的物理磁盘写入,官方数据显示顺序写比随机写入快6000倍以上。另外,操作系统使用内存对磁盘进行缓存即pagecache,pagecache完全由操作系统管理,这也使得写数据变得即简洁也快速。
配置中可以调整过期时间,超过改时间的消息日志将移除,默认值为7天;也可配置文件大小阈值,达到该阈值后,从最旧消息开始删除。配置项为:
# 日志保存最长时间
log.retention.hours=168
# 单个分区最大数据量限制,默认不限制
#log.retention.bytes=1073741824
2.高效传输
从文件到套接字的常见数据传输路径有4步:
1).操作系统从磁盘读取数据到内核空间的 pagecache
2).应用程序读取内核空间的数据到用户空间的缓冲区
3).应用程序将数据(用户空间的缓冲区)写回内核空间到套接字缓冲区(内核空间)
4).操作系统将数据从套接字缓冲区(内核空间)复制到通过网络发送的 NIC 缓冲区
kafka使用 producer ,broker 和 consumer 都共享的标准化的二进制消息格式,这样数据块不用修改就能在他们之间传递。kafka采用Linux 中系统调用sendfile的方式,直接将数据从 pagecache 转移到 socket 网络连接中。这种零拷贝方式使得kafka数据传输更加高效。
3.消息格式
以前面文章中安装的kafka为例:Mac 安装kafka
kafka本地文件存储目录可以在配置文件server.properties中设置,参数及默认值为:
# A comma separated list of directories under which to store log files
log.dirs=/usr/local/var/lib/kafka-logs
进入该目录,可以看到kafka保存的cosumer offset和topic消息:
__consumer_offsets-45 test1-0
__consumer_offsets-19 __consumer_offsets-32 __consumer_offsets-46 test1-1
__consumer_offsets-2 __consumer_offsets-33 __consumer_offsets-47 test1-2
...
其中__consumer_offsets开头的为消费的offset信息,test1开头的即为之前创建的topic “test1”,该topic有三个分区,分区编号从0开始,分别是test1-0、test1-1、test1-2。
进入test1-0,查看包含文件如下:
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex 00000000000000000003.snapshot leader-epoch-checkpoint
可以看到kafka消息按partition存储的,每个partition一个目录。partition下消息分段(segment)存储,默认每段最大1G,通过参数log.segment.bytes可配置。segment包含索引文件index、消息文件log,分别存储消息的索引和内容,以.index和.log结尾,文件命名为当前segment第一个消息offset。index文件在log每隔一定数据量之间建立索引,可以通过参数index.interval.bytes配置。
通过kafka命令查看00000000000000000000.index内容如下:
kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/test1-0/00000000000000000000.index --print-data-log
Dumping /usr/local/var/lib/kafka-logs/test1-0/00000000000000000000.index
offset: 0 position: 0
00000000000000000000.log内容如下:
kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/test1-0/00000000000000000000.log --print-data-log
Dumping /usr/local/var/lib/kafka-logs/test1-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 1 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1605148552244 size: 78 magic: 2 compresscodec: NONE crc: 2396879296 isvalid: true
| offset: 0 CreateTime: 1605148551995 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 1
| offset: 1 CreateTime: 1605148552244 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 2
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 78 CreateTime: 1605162144502 size: 70 magic: 2 compresscodec: NONE crc: 1124495606 isvalid: true
| offset: 2 CreateTime: 1605162144502 keysize: -1 valuesize: 2 sequence: -1 headerKeys: [] payload: dd
其中索引文件中包含两个字段:(offset,position),分别表示消息offset和该消息在log文件的偏移量。如上图中offset=0的消息对应的position=0;对应的就是00000000000000000000.log中的第一条消息:
offset: 0 CreateTime: 1605148551995 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 1
其中payload为具体的消息内容。
另外里面还有一个以".timeindex"结尾的文件,查看其内容:
kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/test1-0/00000000000000000000.timeindex --print-data-log
Dumping /usr/local/var/lib/kafka-logs/test1-0/00000000000000000000.timeindex
timestamp: 1605162144502 offset: 2
该日志文件是kafka0.10.1.1加入的,其中保存的为:(消息时间戳,offset)。时间戳是该segment最后一个消息对应的时间戳(与log文件中最后一条记录时间戳一致),kafka也支持根据时间来读取消息。
4.消息副本
由上可知消息是按partition来存储的,partition可以配置n个副本followers。多个partition和其follower在broker上是怎么分配的呢?
partition和broker都进行了排序,下标从0开始;
假设有k个broker,第i个partition被分配到到 i%k 个broker上;
第i%k个broker即为partition i 的leader,负责这个partition的读写;
partition的followers也进行排序,从leader的后续broker开始分配,第i个partition的第j个副本broker为 (j+ i%k)%k。
一个有3个broker的kafka集群,包含3个partition,每个partition副本数为1的topic如下图:

其中箭头前方的为leader,被箭头指向为副本follower。这种分配方案尽量将partition均匀分布到broker集群中,consumer根据partition来消费时将访问到不同的broker,提高系统吞吐量。同时将日志平均分布存储也实现了系统的高可用。
总结:
kafka将消息日志采用顺序写入的方式存放在broker磁盘中;数据传输通过系统调用sendfile零拷贝方式;消息日志分段存放,可配置清除时间或大小阈值;每段包含消息索引、消息内容两个文件,通过索引实现快速查找;按照/topic/partition的目录结构分开存储,且均匀分布到集群各broker上。
网友评论