Controller 的两个主要功能:管理集群 Broker 成员和主题。
- 集群成员管理:Controller 负责对集群所有成员进行有效管理,包括自动发现新增 Broker、自动处理下线 Broker,以及及时响应 Broker 数据的变更。
- 主题管理:Controller 负责对集群上的所有主题进行高效管理,包括创建主题、变更主 题以及删除主题,等等。对于删除主题而言,实际的删除操作由底层的 TopicDeletionManager 完成。
集群成员管理
首先,我们来看 Controller 管理集群成员部分的代码。这里的成员管理包含两个方面:
- 成员数量的管理,主要体现在新增成员和移除现有成员;
- 单个成员的管理,如变更单个 Broker 的数据等。
成员数量管理
每个 Broker 在启动的时候,会在 ZooKeeper 的 /brokers/ids 节点下创建一个名为 broker.id 参数值的临时节点。
举个例子,假设 Broker 的 broker.id 参数值设置为 1001,那么,当 Broker 启动后,你会 在 ZooKeeper 的 /brokers/ids 下观测到一个名为 1001 的子节点。该节点的内容包括了 Broker 配置的主机名、端口号以及所用监听器的信息(注意:这里的监听器和上面说的 ZooKeeper 监听器不是一回事)。
当该 Broker 正常关闭或意外退出时,ZooKeeper 上对应的临时节点会自动消失。
基于这种临时节点的机制,Controller 定义了 BrokerChangeHandler 监听器,专门负责 监听 /brokers/ids 下的子节点数量变化。
一旦发现新增或删除 Broker,/brokers/ids 下的子节点数目一定会发生变化。这会被 Controller 侦测到,进而触发 BrokerChangeHandler 的处理方法,即 handleChildChange 方法。
我给出 BrokerChangeHandler 的代码。可以看到,这里面定义了 handleChildChange 方法:
class BrokerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
override val path: String = BrokerIdsZNode.path
override def handleChildChange(): Unit = {
eventManager.put(BrokerChange)// 仅仅是向事件队列写入BrokerChange事件
}
}
该方法的作用就是向 Controller 事件队列写入一个 BrokerChange 事件。事实上, Controller 端定义的所有 Handler 的处理逻辑,都是向事件队列写入相应的 ControllerEvent,真正的事件处理逻辑位于 KafkaController 类的 process 方法中。
接下来看process方法,你会发现,处理BrokerChange事件的方法实际上processBrokerChange,代码如下
private def processBrokerChange(): Unit = {
// 如果该broker不是controller,自然无权处理,直接返回
if (!isActive) return
// 第1步:从Zookeeper中获取集群Broker列表
val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
val curBrokerIdAndEpochs = curBrokerAndEpochs map { case (broker, epoch) => (broker.id, epoch) }
val curBrokerIds = curBrokerIdAndEpochs.keySet
// 第2步:获取Controller当前保存的Broker列表
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
// 第3步:比较两个列表,获取新增Broker列表、待移除Broker列表、
// 已重启Broker列表和当前运行中的Broker列表
val newBrokerIds = curBrokerIds.diff(liveOrShuttingDownBrokerIds)
val deadBrokerIds = liveOrShuttingDownBrokerIds.diff(curBrokerIds)
val bouncedBrokerIds = (curBrokerIds & liveOrShuttingDownBrokerIds)
.filter(brokerId => curBrokerIdAndEpochs(brokerId) > controllerContext.liveBrokerIdAndEpochs(brokerId))
val newBrokerAndEpochs = curBrokerAndEpochs.filter { case (broker, _) => newBrokerIds.contains(broker.id) }
val bouncedBrokerAndEpochs = curBrokerAndEpochs.filter { case (broker, _) => bouncedBrokerIds.contains(broker.id) }
val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
val bouncedBrokerIdsSorted = bouncedBrokerIds.toSeq.sorted
info(s"Newly added brokers: ${newBrokerIdsSorted.mkString(",")}, " +
s"deleted brokers: ${deadBrokerIdsSorted.mkString(",")}, " +
s"bounced brokers: ${bouncedBrokerIdsSorted.mkString(",")}, " +
s"all live brokers: ${liveBrokerIdsSorted.mkString(",")}")
// 第4步:为每个新增Broker创建与之连接的通道管理器和底层的请求发送线程(RequestSendThread)
newBrokerAndEpochs.keySet.foreach(controllerChannelManager.addBroker)
// 第5步:为每个已重启的Broker移除它们现有的配套资源
//(通道管理器、RequestSendThread等),并重新添加它们
bouncedBrokerIds.foreach(controllerChannelManager.removeBroker)
bouncedBrokerAndEpochs.keySet.foreach(controllerChannelManager.addBroker)
// 第6步:为每个待移除Broker移除对应的配套资源
deadBrokerIds.foreach(controllerChannelManager.removeBroker)
// 第7步:为新增Broker执行更新Controller元数据和Broker启动逻辑
if (newBrokerIds.nonEmpty) {
controllerContext.addLiveBrokers(newBrokerAndEpochs)
onBrokerStartup(newBrokerIdsSorted)
}
// 第8步:为已重启Broker执行重添加逻辑,包含
// 更新ControllerContext、执行Broker重启动逻辑
if (bouncedBrokerIds.nonEmpty) {
controllerContext.removeLiveBrokers(bouncedBrokerIds)
onBrokerFailure(bouncedBrokerIdsSorted)
controllerContext.addLiveBrokers(bouncedBrokerAndEpochs)
onBrokerStartup(bouncedBrokerIdsSorted)
}
// 第9步:为待移除Broker执行移除ControllerContext和Broker终止逻辑
if (deadBrokerIds.nonEmpty) {
controllerContext.removeLiveBrokers(deadBrokerIds)
onBrokerFailure(deadBrokerIdsSorted)
}
if (newBrokerIds.nonEmpty || deadBrokerIds.nonEmpty || bouncedBrokerIds.nonEmpty) {
info(s"Updated broker epochs cache: ${controllerContext.liveBrokerIdAndEpochs}")
}
}
主题管理
主题创建 / 变更
Controller,或者说 Kafka 集群是如何感知到新创建的主题的呢?
这当然要归功于监听主题路径的 ZooKeeper 监听器:TopicChangeHandler。代码如下:
class TopicChangeHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
// ZooKeeper节点:/brokers/topics
override val path: String = TopicsZNode.path
// 向事件队列写入TopicChange事件
override def handleChildChange(): Unit = eventManager.put(TopicChange)
}
代码中的 TopicsZNode.path 就是 ZooKeeper 下 /brokers/topics 节点。一旦该节点下 新增了主题信息,该监听器的 handleChildChange 就会被触发,Controller 通过 ControllerEventManager 对象,向事件队列写入 TopicChange 事件。
KafkaController 的 process 方法接到该事件后,调用 processTopicChange 方法执行主 题创建。代码如下:
private def processTopicChange(): Unit = {
if (!isActive) return
// 第1步:从ZooKeeper中获取所有主题
val topics = zkClient.getAllTopicsInCluster(true)
// 第2步:与元数据缓存比对,找出新增主题列表与已删除主题列表
val newTopics = topics -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics.diff(topics)
// 第3步:使用ZooKeeper中的主题列表更新元数据缓存
controllerContext.setAllTopics(topics)
// 第4步:为新增主题注册分区变更监听器
// 分区变更监听器是监听主题分区变更的
registerPartitionModificationsHandlers(newTopics.toSeq)
// 第5步:从ZooKeeper中获取新增主题的副本分配情况
val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)
// 第6步:清除元数据缓存中属于已删除主题的缓存项
deletedTopics.foreach(controllerContext.removeTopic)
// 第7步:为新增主题更新元数据缓存中的副本分配条目
addedPartitionReplicaAssignment.foreach {
case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)
}
info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
s"[$addedPartitionReplicaAssignment]")
// 第8步:调整新增主题所有分区以及所属所有副本的运行状态为“上线”状态
if (addedPartitionReplicaAssignment.nonEmpty)
onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
}
主题删除
Controller 定义了 TopicDeletionHandler,用它来实现对删除主题的监听,代码如下:
class TopicDeletionHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
// ZooKeeper节点:/admin/delete_topics
override val path: String = DeleteTopicsZNode.path
// 向事件队列写入TopicDeletion事件
override def handleChildChange(): Unit = eventManager.put(TopicDeletion)
}
处理 TopicDeletion 事件的方法是 processTopicDeletion,代码如下:
private def processTopicDeletion(): Unit = {
if (!isActive) return
// 从ZooKeeper中获取待删除主题列表
var topicsToBeDeleted = zkClient.getTopicDeletions.toSet
debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
// 找出不存在的主题列表
val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
if (nonExistentTopics.nonEmpty) {
warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
zkClient.deleteTopicDeletions(nonExistentTopics.toSeq, controllerContext.epochZkVersion)
}
topicsToBeDeleted --= nonExistentTopics
// 如果delete.topic.enable参数设置成true
if (config.deleteTopicEnable) {
if (topicsToBeDeleted.nonEmpty) {
info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
topicsToBeDeleted.foreach { topic =>
val partitionReassignmentInProgress =
controllerContext.partitionsBeingReassigned.map(_.topic).contains(topic)
if (partitionReassignmentInProgress)
topicDeletionManager.markTopicIneligibleForDeletion(Set(topic),
reason = "topic reassignment in progress")
}
// 将待删除主题插入到删除等待集合交由TopicDeletionManager处理
topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
}
} else {
// 不允许删除主题
info(s"Removing $topicsToBeDeleted since delete topic is disabled")
// 清除ZooKeeper下/admin/delete_topics下的子节点
zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
}
}
总结













网友评论