美文网首页
Kafka源码分析-Server-日志存储(4)-OffsetI

Kafka源码分析-Server-日志存储(4)-OffsetI

作者: 陈阳001 | 来源:发表于2019-03-08 14:23 被阅读0次

为了提升查找消息的性能,从Kafka0.8开始,为每个日志文件添加了对应的索引文件。OffsetIndex对象对应磁盘管理上的一个索引文件,与上节分析的FileMessageSet共同构成一个LogSegment对象。
1.首先介绍索引文件中索引项的格式:每个索引项有8个字节,分为两部分,第一部分是相对offset,占4个字节;第二部分是物理地址,就是其索引消息在日志文件中对应的position位置,占4个字节。这样就实现了offset与物理地址直接的映射。相对offset表示的是消息相对于baseOffset的偏移量。例如,分段后的一个日志文件的baseOffset是20,当然它的文件名就是20.log,那么offset为23的Message在索引文件中的相对offset就是23-20=3。消息的offset是Long类型,4个字节可能无法直接存储消息的offset,所以使用相对的offset,这样可以减少索引文件占用的空间。
Kafka使用稀疏索引的方式构造消息的索引,它不保证每个消息在索引文件中都有对应的索引项,这算是磁盘空间,内存空间,查找时间等多方面的折中。不断减少索引文件大小的目的是为了将索引文件映射到内存,在OffsetIndex中会使用MappedByteBuffer将索引文件映射到内存中。
介绍完索引文件的相关概念后,我们来介绍下OffsetIndex字段。

  • _file:指向磁盘上的索引文件。
  • baseOffset:对应日志文件中第一个消息的offset。
  • mmap:用来操作索引文件的MappedByteBuffer。
  • lock:ReentrantLock对象,在对mmap进行操作时,需要加锁保护。
  • _entries:当前索引文件中的索引项个数。
  • _maxEntries:当前索引文件中最多能够保存的索引项个数。
  • _lastOffset:保存最后一个索引项的offset。
    在OffsetIndex初始化的过程中会初始化上述字段,因为会有多个Handler线程并发写入索引文件,所以这些字段使用@volatile修饰,保证线程之间的可见性。初始化代码如下:
 /* initialize the memory mapping for this index */
  @volatile
  private[this] var mmap: MappedByteBuffer = {
    //如果索引文件不存在,则创建新文件并返回true,反之返回false。
    val newlyCreated = _file.createNewFile()
    val raf = new RandomAccessFile(_file, "rw")
    try {
      /* pre-allocate the file if necessary */
      if (newlyCreated) {//对于新创建的的索引文件,进行扩容
        if (maxIndexSize < 8)
          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
        //根据maxIndexSize的值对索引文件进行扩容,扩容结果是小于maxIndexSize的最大的8的倍数
        raf.setLength(roundToExactMultiple(maxIndexSize, 8))
      }

      /* memory-map the file 进行内存映射 */
      val len = raf.length()
      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)

      /* set the position in the index for the next entry
      * 将新创建的索引文件的positon设置为0,从头开始写文件。
      * */
      if (newlyCreated)
        idx.position(0)
      else
        // if this is a pre-existing index, assume it is all valid and set position to last entry
        //  对于原来就存在的索引文件,则将position移动到所有索引项的结束位置,防止数据覆盖
        idx.position(roundToExactMultiple(idx.limit, 8))
      idx    //    返回MappedByteBuffer
    } finally {
      CoreUtils.swallow(raf.close())
    }
  }

OffsetIndex提供了向索引文件中添加索引项的append()方法,将索引文件截断到某个位置的truncateTo()方法和truncateToEntries()方法,进行文件扩容的resize()方法。这些方法实际上都是通过mmap字段的相关操作完成的。
OffsetIndex中最常用的还是查找相关的方法,使用的是二分查找,涉及的方法是indexSlotFor和lookup()。值得注意的地方是,查找的目标小于targetOffset的最大offset对应的物理地址(position)。下面是lookup()方法的代码:

/**
   * Find the largest offset less than or equal to the given targetOffset 
   * and return a pair holding this offset and its corresponding physical file position.
   * 
   * @param targetOffset The offset to look up.
   * 
   * @return The offset found and the corresponding file position for this offset. 
   * If the target offset is smaller than the least entry in the index (or the index is empty),
   * the pair (baseOffset, 0) is returned.
   */
  def lookup(targetOffset: Long): OffsetPosition = {
    maybeLock(lock) {//window操作要加锁,其他操作不加做
      val idx = mmap.duplicate//创建一个副本
      val slot = indexSlotFor(idx, targetOffset)//二分查找的具体实现
      if(slot == -1)
        OffsetPosition(baseOffset, 0)
      else//将offset和物理地址(position)封装成OffsetPosition对象并返回
        OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
      //relativeOffset()方法和physical()方法是获取索引项内容的辅助方法,分别实现了
      // 读取索引项中的相对offset和索引项中的物理地址(position)的功能
      }
  }

/**
   * Find the slot in which the largest offset less than or equal to the given
   * target offset is stored.
   * 
   * @param idx The index buffer
   * @param targetOffset The offset to look for
   * 
   * @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty
   */
  private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
    // we only store the difference from the base offset so calculate that
    val relOffset = targetOffset - baseOffset
    
    // check if the index is empty
    if (_entries == 0)
      return -1
    
    // check if the target offset is smaller than the least offset
    if (relativeOffset(idx, 0) > relOffset)
      return -1
      
    // binary search for the entry  标准的二分查找法
    var lo = 0
    var hi = _entries - 1
    while (lo < hi) {
      val mid = ceil(hi/2.0 + lo/2.0).toInt
      val found = relativeOffset(idx, mid)
      if (found == relOffset)
        return mid
      else if (found < relOffset)
        lo = mid
      else
        hi = mid - 1
    }
    lo//如果找不到targetOffset对应的索引项,则返回小于targetOffset的最大的索引项位置
  }

相关文章

网友评论

      本文标题:Kafka源码分析-Server-日志存储(4)-OffsetI

      本文链接:https://www.haomeiwen.com/subject/tasrpqtx.html