美文网首页
07、KafkaRequestHandler - 请求处理全流程

07、KafkaRequestHandler - 请求处理全流程

作者: 技术灭霸 | 来源:发表于2020-09-16 09:04 被阅读0次

KafkaRequestHandlerPool 是真正处理 Kafka 请求的地方。切记,Kafka 中处理请求的 类不是 SocketServer,也不是 RequestChannel,而是 KafkaRequestHandlerPool。


image.png
  • KafkaRequestHandler:请求处理线程类。每个请求处理线程实例,负责从 SocketServer 的 RequestChannel 的请求队列中获取请求对象,并进行处理。
  • KafkaRequestHandlerPool:请求处理线程池,负责创建、维护、管理和销毁下辖的 请求处理线程。
  • BrokerTopicMetrics:Broker 端与主题相关的监控指标的管理类。 - BrokerTopicStats(C):定义 Broker 端与主题相关的监控指标的管理操作。
  • BrokerTopicStats(O):BrokerTopicStats 的伴生对象类,定义 Broker 端与主题相 关的监控指标,比如常见的 MessagesInPerSec 和 MessagesOutPerSec 等。

KafkaRequestHandler

class KafkaRequestHandler(id: Int,
                          brokerId: Int,
                          val aggregateIdleMeter: Meter,
                          val totalHandlerThreads: AtomicInteger,
                          val requestChannel: RequestChannel,
                          apis: KafkaApis,
                          time: Time) extends Runnable with Logging {

从定义可知,KafkaRequestHandler 是一个 Runnable 对象,因此,你可以把它当成是一 个线程。每个 KafkaRequestHandler 实例,都有 4 个关键的属性。

  • id:请求处理线程的序号,类似于 Processor 线程的 ID 序号,仅仅用于标识这是线程 池中的第几个线程。
  • brokerId:Broker 序号,用于标识这是哪个 Broker 上的请求处理线程。
  • requestChannel:SocketServer 中的请求通道对象。它是负责处理请求的类,请求是保存在 RequestChannel 中的请求队列中,因此,Kafka 在构 造 KafkaRequestHandler 实例时,必须关联 SocketServer 组件中的 RequestChannel 实例,也就是说,要让 I/O 线程能够找到请求被保存的地方。
  • apis:这是一个 KafkaApis 类。如果说 KafkaRequestHandler 是真正处理请求的,那么,KafkaApis 类就是真正执行请求处理逻辑的地方。

run()方法

  def run(): Unit = {
    // 只要该线程尚未关闭,循环运行处理逻辑
    while (!stopped) {

      val startSelectTime = time.nanoseconds
      // 从请求队列中获取下一个待处理的请求
      val req = requestChannel.receiveRequest(300)
      val endTime = time.nanoseconds
      // 统计线程空闲时间
      val idleTime = endTime - startSelectTime
      // 更新线程空闲百分比指标
      aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

      req match {
        // 关闭线程请求
        case RequestChannel.ShutdownRequest =>
          debug(s"Kafka request handler $id on broker $brokerId received shut down command")
          // 关闭线程
          shutdownComplete.countDown()
          return
        // 普通请求
        case request: RequestChannel.Request =>
          try {
            request.requestDequeueTimeNanos = endTime
            trace(s"Kafka request handler $id on broker $brokerId handling request $request")
            // 由KafkaApis.handle方法执行相应处理逻辑
            apis.handle(request)
          } catch {
            // 如果出现严重错误,立即关闭线程
            case e: FatalExitError =>
              shutdownComplete.countDown()
              Exit.exit(e.statusCode)
            // 如果是普通异常,记录错误日志
            case e: Throwable => error("Exception when handling request", e)
          } finally {
            // 释放请求对象占用的内存缓冲区资源
            request.releaseBuffer()
          }

        case null => // 继续
      }
    }
    shutdownComplete.countDown()
  }

run 方法的主要运行逻辑。它的所有执行逻辑都在 while 循环之下,因此,只 要标志线程关闭状态的 stopped 为 false,run 方法将一直循环执行 while 下的语句。

第 1 步是从请求队列中获取下一个待处理的请求,同时更新一些相关的统计指标。如 果本次循环没取到,那么本轮循环结束,进入到下一轮。如果是 ShutdownRequest 请 求,则说明该 Broker 发起了关闭操作。

而 Broker 关闭时会调用 KafkaRequestHandler 的 shutdown 方法,进而调用 initiateShutdown 方法,以及 RequestChannel 的 sendShutdownRequest 方法,而后 者就是将 ShutdownRequest 写入到请求队列。

一旦从请求队列中获取到 ShutdownRequest,run 方法代码会调用 shutdownComplete 的 countDown 方法,正式完成对 KafkaRequestHandler 线程的关闭操作。

  def shutdown(): Unit = synchronized {
    info("shutting down")
    for (handler <- runnables)
      handler.initiateShutdown()// 调用initiateShutdown方法发起关闭
    for (handler <- runnables)
      // 调用awaitShutdown方法等待关闭完成
     // run方法一旦调用countDown方法,这里将解除等待状态
      handler.awaitShutdown()
    info("shut down completely")
  }

就像代码注释中写的那样,一旦 run 方法执行了 countDown 方法,程序流解除在 awaitShutdown 方法这里的等待,从而完成整个线程的关闭操作。

KafkaRequestHandlerPool

重点学习KafkaRequestHandlerPool是如何创建这些线程的,以 及创建它们的时机。

class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int,
                              requestHandlerAvgIdleMetricName: String,
                              logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {

KafkaRequestHandlerPool 对象定义了 7 个属性,其中比较关键的有 4 个,我分别来解 释下。

  • brokerId:和 KafkaRequestHandler 中的一样,保存 Broker 的序号。
  • requestChannel:SocketServer 的请求处理通道,它下辖的请求队列为所有 I/O 线程
    所共享。requestChannel 字段也是 KafkaRequestHandler 类的一个重要属性。
  • apis:KafkaApis 实例,执行实际的请求处理逻辑。它同时也是 KafkaRequestHandler 类的一个重要属性。
  • numThreads:线程池中的初始线程数量。它是 Broker 端参数 num.io.threads 的值。 目前,Kafka 支持动态修改 I/O 线程池的大小,因此,这里的 numThreads 是初始线程 数,调整后的 I/O 线程池的实际大小可以和 numThreads 不一致。
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
          config.numIoThreads, )

 controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
            1, )

Data plane 所属的 KafkaRequestHandlerPool 线程池的初始数量,就是 Broker 端的参数 nums.io.threads,即这里的 config.numIoThreads 值;当你想要在一开始就提升 Broker 端请求处理 能力的时候,不妨试着增加这个参数值。

而用于 Control plane 的线程池的数量,则硬编码为 1

createHandler 方法

当线程池初始化时,Kafka 使用下面这段代码批量创建线程,并将它们添加到线程池中:

  for (i <- 0 until numThreads) {
    createHandler(i) // 创建numThreads个I/O线程
  }
  // 创建序号为指定id的I/O线程对象,并启动该线程
  def createHandler(id: Int): Unit = synchronized {
    // 创建KafkaRequestHandler实例并加入到runnables中
    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
    // 启动KafkaRequestHandler线程
    KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
  }

createHandler 方法的主体逻辑分为三步:

  1. 创建 KafkaRequestHandler 实例;
  2. 将创建的线程实例加入到线程池数组;
  3. 启动该线程。

resizeThreadPool 方法

这个方法的目的是,把 I/O 线程池的线程数重设为指定的数值。代码如下:

  def resizeThreadPool(newSize: Int): Unit = synchronized {
    val currentSize = threadPoolSize.get
    info(s"Resizing request handler thread pool size from $currentSize to $newSize")
    if (newSize > currentSize) {
      for (i <- currentSize until newSize) {
        createHandler(i)
      }
    } else if (newSize < currentSize) {
      for (i <- 1 to (currentSize - newSize)) {
        runnables.remove(currentSize - i).stop()
      }
    }
    threadPoolSize.set(newSize)
  }

该方法首先获取当前线程数量。如果目标数量比当前数量大,就利用刚才说到的 createHandler 方法将线程数补齐到目标值 newSize;否则的话,就将多余的线程从线程 池中移除,并停止它们。最后,把标识线程数量的变量 threadPoolSize 的值调整为目标值 newSize。

全处理流程

第 1 步:Clients 或其他 Broker 发送请求给 Acceptor 线程

第 2 & 3 步:Processor 线程处理请求,并放入请求队列

第 4 步:I/O 线程处理请求

第 5 步:KafkaRequestHandler 线程将 Response 放入 Processor 线 程的 Response 队列

第 6 步:Processor 线程发送 Response 给 Request 发送方

相关文章

  • 07、KafkaRequestHandler - 请求处理全流程

    KafkaRequestHandlerPool 是真正处理 Kafka 请求的地方。切记,Kafka 中处理请求的...

  • Kafka处理请求的全流程解析

    大家好,我是 yes。 这是我的第三篇Kafka源码分析文章,前两篇讲了日志段的读写和二分算法在kafka索引上的...

  • 【充电】《Nginx核心知识100讲》信号、nginx事件、同步

    极客专栏《Nginx核心知识100讲》20-32小节的笔记 nginx 请求处理流程 1.nginx请求处理流程 ...

  • Future模式

    Future模式 概念 处理流程 传统处理流程 客户端发出call请求,这个请求需要很长一段时间才能返回。客户端一...

  • nginx架构基础

    nginx 请求处理流程 nginx进程结构 nginx 进程管理:信号 reload流程 热升级流程 work进...

  • MySQL查询执行过程

    MYSQL请求处理路径: MySQL请求处理流程: #1.为用户创建、分配处理线程。 #2.登录验证 #3.资源初...

  • 探索 OkHttp 原理

    前言 1. OkHttp 请求处理流程概述 当我们发起同步请求时,请求会被 Dispatcher 放到同步请求队列...

  • Dubbo请求处理流程

    Dubbo客户端发送请求调用栈: Dubbo服务端接受请求图: 介绍下两个概念: 过滤器链 各类协议protoco...

  • Nginx请求处理流程

    官方文档地址:http://nginx.org/en/docs/http/request_processing.h...

  • redis请求处理流程

    1, 编译:redis源码是基于makefile构建的,在ide里调试很麻烦,不能符号跳转,所以就根据makefi...

网友评论

      本文标题:07、KafkaRequestHandler - 请求处理全流程

      本文链接:https://www.haomeiwen.com/subject/ktpecktx.html