美文网首页
12、Controller 如何管理集群 Broker 成员和主

12、Controller 如何管理集群 Broker 成员和主

作者: 技术灭霸 | 来源:发表于2020-10-09 11:03 被阅读0次

Controller 的两个主要功能:管理集群 Broker 成员和主题。

  • 集群成员管理:Controller 负责对集群所有成员进行有效管理,包括自动发现新增 Broker、自动处理下线 Broker,以及及时响应 Broker 数据的变更。
  • 主题管理:Controller 负责对集群上的所有主题进行高效管理,包括创建主题、变更主 题以及删除主题,等等。对于删除主题而言,实际的删除操作由底层的 TopicDeletionManager 完成。

集群成员管理

首先,我们来看 Controller 管理集群成员部分的代码。这里的成员管理包含两个方面:

  1. 成员数量的管理,主要体现在新增成员和移除现有成员;
  2. 单个成员的管理,如变更单个 Broker 的数据等。

成员数量管理

每个 Broker 在启动的时候,会在 ZooKeeper 的 /brokers/ids 节点下创建一个名为 broker.id 参数值的临时节点。

举个例子,假设 Broker 的 broker.id 参数值设置为 1001,那么,当 Broker 启动后,你会 在 ZooKeeper 的 /brokers/ids 下观测到一个名为 1001 的子节点。该节点的内容包括了 Broker 配置的主机名、端口号以及所用监听器的信息(注意:这里的监听器和上面说的 ZooKeeper 监听器不是一回事)。

当该 Broker 正常关闭或意外退出时,ZooKeeper 上对应的临时节点会自动消失。

基于这种临时节点的机制,Controller 定义了 BrokerChangeHandler 监听器,专门负责 监听 /brokers/ids 下的子节点数量变化。

一旦发现新增或删除 Broker,/brokers/ids 下的子节点数目一定会发生变化。这会被 Controller 侦测到,进而触发 BrokerChangeHandler 的处理方法,即 handleChildChange 方法。

我给出 BrokerChangeHandler 的代码。可以看到,这里面定义了 handleChildChange 方法:

class BrokerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  override val path: String = BrokerIdsZNode.path

  override def handleChildChange(): Unit = {
    eventManager.put(BrokerChange)// 仅仅是向事件队列写入BrokerChange事件
  }
}

该方法的作用就是向 Controller 事件队列写入一个 BrokerChange 事件。事实上, Controller 端定义的所有 Handler 的处理逻辑,都是向事件队列写入相应的 ControllerEvent,真正的事件处理逻辑位于 KafkaController 类的 process 方法中

接下来看process方法,你会发现,处理BrokerChange事件的方法实际上processBrokerChange,代码如下

 private def processBrokerChange(): Unit = {
    // 如果该broker不是controller,自然无权处理,直接返回
    if (!isActive) return
    // 第1步:从Zookeeper中获取集群Broker列表
    val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
    val curBrokerIdAndEpochs = curBrokerAndEpochs map { case (broker, epoch) => (broker.id, epoch) }
    val curBrokerIds = curBrokerIdAndEpochs.keySet
    // 第2步:获取Controller当前保存的Broker列表
    val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
    // 第3步:比较两个列表,获取新增Broker列表、待移除Broker列表、
    // 已重启Broker列表和当前运行中的Broker列表
    val newBrokerIds = curBrokerIds.diff(liveOrShuttingDownBrokerIds)
    val deadBrokerIds = liveOrShuttingDownBrokerIds.diff(curBrokerIds)
    val bouncedBrokerIds = (curBrokerIds & liveOrShuttingDownBrokerIds)
      .filter(brokerId => curBrokerIdAndEpochs(brokerId) > controllerContext.liveBrokerIdAndEpochs(brokerId))
    val newBrokerAndEpochs = curBrokerAndEpochs.filter { case (broker, _) => newBrokerIds.contains(broker.id) }
    val bouncedBrokerAndEpochs = curBrokerAndEpochs.filter { case (broker, _) => bouncedBrokerIds.contains(broker.id) }
    val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
    val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
    val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
    val bouncedBrokerIdsSorted = bouncedBrokerIds.toSeq.sorted
    info(s"Newly added brokers: ${newBrokerIdsSorted.mkString(",")}, " +
      s"deleted brokers: ${deadBrokerIdsSorted.mkString(",")}, " +
      s"bounced brokers: ${bouncedBrokerIdsSorted.mkString(",")}, " +
      s"all live brokers: ${liveBrokerIdsSorted.mkString(",")}")
    // 第4步:为每个新增Broker创建与之连接的通道管理器和底层的请求发送线程(RequestSendThread)
    newBrokerAndEpochs.keySet.foreach(controllerChannelManager.addBroker)
    // 第5步:为每个已重启的Broker移除它们现有的配套资源
    //(通道管理器、RequestSendThread等),并重新添加它们
    bouncedBrokerIds.foreach(controllerChannelManager.removeBroker)
    bouncedBrokerAndEpochs.keySet.foreach(controllerChannelManager.addBroker)
    // 第6步:为每个待移除Broker移除对应的配套资源
    deadBrokerIds.foreach(controllerChannelManager.removeBroker)
    // 第7步:为新增Broker执行更新Controller元数据和Broker启动逻辑
    if (newBrokerIds.nonEmpty) {
      controllerContext.addLiveBrokers(newBrokerAndEpochs)
      onBrokerStartup(newBrokerIdsSorted)
    }
    // 第8步:为已重启Broker执行重添加逻辑,包含
    // 更新ControllerContext、执行Broker重启动逻辑
    if (bouncedBrokerIds.nonEmpty) {
      controllerContext.removeLiveBrokers(bouncedBrokerIds)
      onBrokerFailure(bouncedBrokerIdsSorted)
      controllerContext.addLiveBrokers(bouncedBrokerAndEpochs)
      onBrokerStartup(bouncedBrokerIdsSorted)
    }
    // 第9步:为待移除Broker执行移除ControllerContext和Broker终止逻辑
    if (deadBrokerIds.nonEmpty) {
      controllerContext.removeLiveBrokers(deadBrokerIds)
      onBrokerFailure(deadBrokerIdsSorted)
    }

    if (newBrokerIds.nonEmpty || deadBrokerIds.nonEmpty || bouncedBrokerIds.nonEmpty) {
      info(s"Updated broker epochs cache: ${controllerContext.liveBrokerIdAndEpochs}")
    }
  }

主题管理

主题创建 / 变更

Controller,或者说 Kafka 集群是如何感知到新创建的主题的呢?

这当然要归功于监听主题路径的 ZooKeeper 监听器:TopicChangeHandler。代码如下:

class TopicChangeHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  // ZooKeeper节点:/brokers/topics
  override val path: String = TopicsZNode.path
  // 向事件队列写入TopicChange事件
  override def handleChildChange(): Unit = eventManager.put(TopicChange)
}

代码中的 TopicsZNode.path 就是 ZooKeeper 下 /brokers/topics 节点。一旦该节点下 新增了主题信息,该监听器的 handleChildChange 就会被触发,Controller 通过 ControllerEventManager 对象,向事件队列写入 TopicChange 事件。

KafkaController 的 process 方法接到该事件后,调用 processTopicChange 方法执行主 题创建。代码如下:

  private def processTopicChange(): Unit = {
    if (!isActive) return
    // 第1步:从ZooKeeper中获取所有主题
    val topics = zkClient.getAllTopicsInCluster(true)
    // 第2步:与元数据缓存比对,找出新增主题列表与已删除主题列表
    val newTopics = topics -- controllerContext.allTopics
    val deletedTopics = controllerContext.allTopics.diff(topics)
    // 第3步:使用ZooKeeper中的主题列表更新元数据缓存
    controllerContext.setAllTopics(topics)
    // 第4步:为新增主题注册分区变更监听器
    // 分区变更监听器是监听主题分区变更的
    registerPartitionModificationsHandlers(newTopics.toSeq)
    // 第5步:从ZooKeeper中获取新增主题的副本分配情况
    val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)
    // 第6步:清除元数据缓存中属于已删除主题的缓存项
    deletedTopics.foreach(controllerContext.removeTopic)
    // 第7步:为新增主题更新元数据缓存中的副本分配条目
    addedPartitionReplicaAssignment.foreach {
      case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)
    }
    info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
      s"[$addedPartitionReplicaAssignment]")
    // 第8步:调整新增主题所有分区以及所属所有副本的运行状态为“上线”状态
    if (addedPartitionReplicaAssignment.nonEmpty)
      onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
  }

主题删除

Controller 定义了 TopicDeletionHandler,用它来实现对删除主题的监听,代码如下:

class TopicDeletionHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  // ZooKeeper节点:/admin/delete_topics
  override val path: String = DeleteTopicsZNode.path
  // 向事件队列写入TopicDeletion事件
  override def handleChildChange(): Unit = eventManager.put(TopicDeletion)
}

处理 TopicDeletion 事件的方法是 processTopicDeletion,代码如下:

private def processTopicDeletion(): Unit = {
    if (!isActive) return
    // 从ZooKeeper中获取待删除主题列表
    var topicsToBeDeleted = zkClient.getTopicDeletions.toSet
    debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
    // 找出不存在的主题列表
    val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
    if (nonExistentTopics.nonEmpty) {
      warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
      zkClient.deleteTopicDeletions(nonExistentTopics.toSeq, controllerContext.epochZkVersion)
    }
    topicsToBeDeleted --= nonExistentTopics
    // 如果delete.topic.enable参数设置成true
    if (config.deleteTopicEnable) {
      if (topicsToBeDeleted.nonEmpty) {
        info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
        topicsToBeDeleted.foreach { topic =>
          val partitionReassignmentInProgress =
            controllerContext.partitionsBeingReassigned.map(_.topic).contains(topic)
          if (partitionReassignmentInProgress)
            topicDeletionManager.markTopicIneligibleForDeletion(Set(topic),
              reason = "topic reassignment in progress")
        }
        // 将待删除主题插入到删除等待集合交由TopicDeletionManager处理
        topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
      }
    } else {
      // 不允许删除主题
      info(s"Removing $topicsToBeDeleted since delete topic is disabled")
      // 清除ZooKeeper下/admin/delete_topics下的子节点
      zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
    }
  }

总结

相关文章

网友评论

      本文标题:12、Controller 如何管理集群 Broker 成员和主

      本文链接:https://www.haomeiwen.com/subject/zzygcktx.html