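I. Context
In the earlier post "Kafka-Controller Election" we analyzed how the Controller is elected, and saw that once elected it executes onControllerFailover(). Now let's look at the responsibilities the Controller role takes on.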
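II. Registering listeners
Before reading any resources from ZooKeeper, the controller registers listeners so that it receives callbacks when brokers or topics change.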
val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,
  isrChangeNotificationHandler)
// Register these handlers one by one
// Watch for changes to child nodes
childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
// Watch for changes to the nodes themselves
nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
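The split between child-change handlers and node-change handlers can be pictured with a tiny in-memory model. This is a minimal sketch under assumptions. MiniZkWatches, SimpleChildChangeHandler and SimpleNodeChangeHandler are hypothetical names, not Kafka's KafkaZkClient API. The model also ignores the fact that real ZooKeeper watches are one-shot and must be re-armed.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Kafka's handler traits (not the real kafka.zookeeper API)
trait SimpleChildChangeHandler { def path: String; def handleChildChange(): Unit }
trait SimpleNodeChangeHandler { def path: String; def handleDataChange(): Unit }

// In-memory registry: one handler per path, dispatched by the kind of change
class MiniZkWatches {
  private val childHandlers = mutable.Map.empty[String, SimpleChildChangeHandler]
  private val nodeHandlers = mutable.Map.empty[String, SimpleNodeChangeHandler]

  def registerZNodeChildChangeHandler(h: SimpleChildChangeHandler): Unit = childHandlers.update(h.path, h)
  def registerZNodeChangeHandler(h: SimpleNodeChangeHandler): Unit = nodeHandlers.update(h.path, h)

  // A child was created or deleted under `parent` (e.g. a new broker id under /brokers/ids)
  def childrenChanged(parent: String): Unit = childHandlers.get(parent).foreach(_.handleChildChange())
  // The data of `path` itself changed (e.g. /admin/reassign_partitions was written)
  def dataChanged(path: String): Unit = nodeHandlers.get(path).foreach(_.handleDataChange())
}
```

A child-change handler on /brokers/ids fires when a broker registers or disappears. A node-change handler fires only when its own znode is created, deleted or rewritten.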
III. Initializing ControllerContext
1. Fetch all brokers
This simply reads the ids under the /brokers/ids znode to get the broker list.
val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
def getAllBrokerAndEpochsInCluster: Map[Broker, Long] = {
  // Get all broker ids under /brokers/ids, sorted
  val brokerIds = getSortedBrokerList
  // Build one GetData request per broker id
  val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
  val getDataResponses = retryRequestsUntilConnected(getDataRequests)
  getDataResponses.flatMap { getDataResponse =>
    val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
    getDataResponse.resultCode match {
      case Code.OK =>
        // decode: combine the znode JSON with the broker id to build a BrokerInfo, e.g.
        // {
        //   "version":5,
        //   "host":"localhost",
        //   "port":9092,
        //   "jmx_port":9999,
        //   "timestamp":"2233345666",
        //   "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
        //   "rack":"dc1",
        //   "features": {"feature": {"min_version":1, "first_active_version":2, "max_version":3}}
        // }
        // The czxid of the broker's znode is used as the broker epoch
        Some((BrokerIdZNode.decode(brokerId, getDataResponse.data).broker, getDataResponse.stat.getCzxid))
      case Code.NONODE => None
      case _ => throw getDataResponse.resultException.get
    }
  }.toMap
}
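This step yields a Map[Broker, Long]. The Broker key holds the broker id plus that broker's connection endpoints and rack. The Long value is the broker epoch, which is the czxid of the broker's znode. At this point the Controller knows which brokers it has to manage and has what it needs to communicate with them.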
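The decode loop in getAllBrokerAndEpochsInCluster boils down to three cases:

- keep successful reads;
- skip brokers whose znode vanished between listing and reading (NONODE);
- fail on anything else.

Below is a minimal sketch using hypothetical simplified types. Broker, GetDataResult and the result codes are illustrations, not Kafka's or ZooKeeper's classes.

```scala
// Hypothetical stand-ins for ZooKeeper result codes and Kafka's Broker class
sealed trait ResultCode
case object OkCode extends ResultCode
case object NoNodeCode extends ResultCode
case object ConnectionLossCode extends ResultCode

case class Broker(id: Int, host: String, port: Int, rack: Option[String])
// One simulated GetData response: result code, decoded broker (if the read succeeded) and czxid
case class GetDataResult(brokerId: Int, code: ResultCode, broker: Option[Broker], czxid: Long)

def brokersWithEpochs(responses: Seq[GetDataResult]): Map[Broker, Long] =
  responses.flatMap { r =>
    r.code match {
      case OkCode     => r.broker.map(b => (b, r.czxid)) // broker epoch = czxid of its znode
      case NoNodeCode => None                            // znode disappeared after listing /brokers/ids
      case other      => throw new IllegalStateException(s"GetData for broker ${r.brokerId} failed: $other")
    }
  }.toMap
```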
2. Check whether these brokers are compatible
val (compatibleBrokerAndEpochs, incompatibleBrokerAndEpochs) = partitionOnFeatureCompatibility(curBrokerAndEpochs)
The result contains:
compatibleBrokerAndEpochs: the map of compatible brokers
incompatibleBrokerAndEpochs: the map of incompatible brokers
So how is a broker judged compatible? Let's look at the following code:
private def partitionOnFeatureCompatibility(brokersAndEpochs: Map[Broker, Long]): (Map[Broker, Long], Map[Broker, Long]) = {
  // partition returns a pair: first all elements that satisfy predicate p, then all elements that do not.
  // The two collections correspond to the results of filter and filterNot.
  // The default implementation traverses the collection twice; strict collections override it in
  // StrictOptimizedIterableOps so that a single traversal is enough.
  brokersAndEpochs.partition {
    case (broker, _) =>
      !config.isFeatureVersioningSupported ||
      !featureCache.getFeatureOption.exists(
        latestFinalizedFeatures =>
          BrokerFeatures.hasIncompatibleFeatures(broker.features,
            latestFinalizedFeatures.finalizedFeatures().asScala.map(kv => (kv._1, kv._2.toShort)).toMap))
  }
}
def isFeatureVersioningSupported = interBrokerProtocolVersion.isFeatureVersioningSupported
public enum MetadataVersion {
    IBP_0_8_0(-1, "0.8.0", ""),
    // .....
    IBP_2_7_IV0(-1, "2.7", "IV0"),
    // .....
    ;

    public boolean isFeatureVersioningSupported() {
        return this.isAtLeast(IBP_2_7_IV0);
    }
}
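MetadataVersion enumerates the inter-broker protocol (IBP) versions. Feature versioning is only supported from IBP_2_7_IV0 (Kafka 2.7) onward, so the predicate above works in two steps:

- If the controller's inter.broker.protocol.version is below 2.7, isFeatureVersioningSupported is false and every broker counts as compatible.
- Otherwise, a broker counts as incompatible when BrokerFeatures.hasIncompatibleFeatures reports that the features it supports conflict with the cluster's latest finalized features. Roughly, this means a finalized feature version level that the broker does not support.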
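The compatibility rule can be sketched with plain maps. This rests on a few assumptions:

- each broker advertises a [min, max] version range per feature;
- the cluster has one finalized level per feature;
- "incompatible" means a finalized feature that the broker lacks, or whose level falls outside its range.

The real check lives in BrokerFeatures.hasIncompatibleFeatures and may differ in detail. All names below are hypothetical.

```scala
// Hypothetical simplified model of broker features (not Kafka's BrokerFeatures / SupportedVersionRange)
case class VersionRange(min: Short, max: Short) {
  def contains(level: Short): Boolean = level >= min && level <= max
}
case class BrokerInfo(id: Int, supported: Map[String, VersionRange])

// Assumed rule: incompatible if some finalized feature is unsupported or outside the broker's range
def hasIncompatibleFeatures(supported: Map[String, VersionRange], finalized: Map[String, Short]): Boolean =
  finalized.exists { case (feature, level) => !supported.get(feature).exists(_.contains(level)) }

def partitionOnFeatureCompatibility(
    brokers: Map[BrokerInfo, Long],
    featureVersioningSupported: Boolean,
    finalized: Option[Map[String, Short]]): (Map[BrokerInfo, Long], Map[BrokerInfo, Long]) =
  brokers.partition { case (broker, _) =>
    // Same shape as the controller's predicate: below IBP 2.7, or with no finalized
    // features known yet, every broker is treated as compatible
    !featureVersioningSupported || !finalized.exists(f => hasIncompatibleFeatures(broker.supported, f))
  }
```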
3. Mark the compatible brokers as live
controllerContext.setLiveBrokers(compatibleBrokerAndEpochs)
class ControllerContext extends ControllerChannelContext {
  private val liveBrokers = mutable.Set.empty[Broker]
  private val liveBrokerEpochs = mutable.Map.empty[Int, Long]

  def setLiveBrokers(brokerAndEpochs: Map[Broker, Long]): Unit = {
    clearLiveBrokers()
    addLiveBrokers(brokerAndEpochs)
  }

  def addLiveBrokers(brokerAndEpochs: Map[Broker, Long]): Unit = {
    liveBrokers ++= brokerAndEpochs.keySet
    liveBrokerEpochs ++= brokerAndEpochs.map { case (broker, brokerEpoch) => (broker.id, brokerEpoch) }
  }
}
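Internally, ControllerContext keeps a set (liveBrokers) of the live brokers and a map (liveBrokerEpochs) from broker id to broker epoch. setLiveBrokers clears both and then refills them.
4. Fetch all topics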
controllerContext.setAllTopics(zkClient.getAllTopicsInCluster(true))
def getAllTopicsInCluster(registerWatch: Boolean = false): Set[String] = {
  // List the children of /brokers/topics
  val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(TopicsZNode.path, registerWatch))
  getChildrenResponse.resultCode match {
    case Code.OK => getChildrenResponse.children.toSet
    case Code.NONODE => Set.empty
    case _ => throw getChildrenResponse.resultException.get
  }
}
This reads all topics from the /brokers/topics znode in ZooKeeper. Because it is called with registerWatch = true, it also sets a watch on that znode.
val allTopics = mutable.Set.empty[String]

def setAllTopics(topics: Set[String]): Unit = {
  allTopics.clear()
  allTopics ++= topics
}
ControllerContext keeps a set and puts every topic into it.
5. Watch each topic for partition changes
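registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)

private def registerPartitionModificationsHandlers(topics: Seq[String]): Unit = {
  topics.foreach { topic =>
    // One handler per topic: when the topic znode changes, the handler puts an event
    // into the ControllerEventManager queue
    val partitionModificationsHandler = new PartitionModificationsHandler(eventManager, topic)
    partitionModificationsHandlers.put(topic, partitionModificationsHandler)
  }
  partitionModificationsHandlers.values.foreach(zkClient.registerZNodeChangeHandler)
}
The topic znode holds topic_id, partitions (the replica assignment), adding_replicas, removing_replicas and so on. When it changes, for example when partitions are added to a topic, the controller has to act on it: it brings the new partitions online and propagates updated metadata to the brokers, and producers and consumers then pick that metadata up. See processPartitionModifications() for details.
6. Fetch the TopicPartition replica assignments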
val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(controllerContext.allTopics.toSet)
def getReplicaAssignmentAndTopicIdForTopics(topics: Set[String]): Set[TopicIdReplicaAssignment] = {
  val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
  val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
  getDataResponses.map { getDataResponse =>
    val topic = getDataResponse.ctx.get.asInstanceOf[String]
    getDataResponse.resultCode match {
      case Code.OK => TopicZNode.decode(topic, getDataResponse.data)
      case Code.NONODE => TopicIdReplicaAssignment(topic, None, Map.empty[TopicPartition, ReplicaAssignment])
      case _ => throw getDataResponse.resultException.get
    }
  }.toSet
}
The data of each /brokers/topics/<topic> znode contains the replica list of every partition. This gives the TopicPartition -> replica assignment mapping, and the latest mapping is then written into ControllerContext:
replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(_, _, assignments) =>
  assignments.foreach { case (topicPartition, replicaAssignment) =>
    // Update the full replica assignment of the partition
    controllerContext.updatePartitionFullReplicaAssignment(topicPartition, replicaAssignment)
    if (replicaAssignment.isBeingReassigned)
      controllerContext.partitionsBeingReassigned.add(topicPartition)
  }
}
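A partition counts as being reassigned when its assignment still lists replicas to add or remove. Below is a minimal sketch of that bookkeeping, modelled on the fields the loop above relies on.

- MiniContext and load are hypothetical names.
- originReplicas and targetReplicas show an assumption about how the old and new replica sets relate during a move.

```scala
import scala.collection.mutable

case class TopicPartition(topic: String, partition: Int)

// Simplified model of a partition's full assignment: while a reassignment is in flight,
// `replicas` holds the union of the old and the new replicas
case class ReplicaAssignment(replicas: Seq[Int], addingReplicas: Seq[Int] = Nil, removingReplicas: Seq[Int] = Nil) {
  def isBeingReassigned: Boolean = addingReplicas.nonEmpty || removingReplicas.nonEmpty
  def originReplicas: Seq[Int] = replicas.diff(addingReplicas)   // replicas before the move
  def targetReplicas: Seq[Int] = replicas.diff(removingReplicas) // replicas after the move
}

// Hypothetical mini context holding the two structures updated in the loop above
class MiniContext {
  val fullAssignment = mutable.Map.empty[TopicPartition, ReplicaAssignment]
  val partitionsBeingReassigned = mutable.Set.empty[TopicPartition]

  def load(assignments: Map[TopicPartition, ReplicaAssignment]): Unit =
    assignments.foreach { case (tp, assignment) =>
      fullAssignment.update(tp, assignment)
      if (assignment.isBeingReassigned) partitionsBeingReassigned += tp
    }
}
```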
7. Watch for broker changes
registerBrokerModificationsHandler(controllerContext.liveOrShuttingDownBrokerIds)

private def registerBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = {
  debug(s"Register BrokerModifications handler for $brokerIds")
  // For every broker: when its znode changes, processBrokerModification(brokerId) handles it
  brokerIds.foreach { brokerId =>
    val brokerModificationsHandler = new BrokerModificationsHandler(eventManager, brokerId)
    zkClient.registerZNodeChangeHandlerAndCheckExistence(brokerModificationsHandler)
    brokerModificationsHandlers.put(brokerId, brokerModificationsHandler)
  }
}
In "Kafka-Controller Election" we saw that the controller has an event queue and a single thread that keeps taking events off it and processing them. The broker, partition and topic changes discussed above all generate events. These events go into that queue and are handled there one at a time.
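The single-threaded event loop that serializes all of these callbacks can be sketched with a LinkedBlockingQueue. This is an assumption-laden model, not ControllerEventManager itself. The event names are borrowed for illustration, and the StopSketch sentinel exists only so the sketch can stop its thread.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

sealed trait ControllerEvent
case object BrokerChange extends ControllerEvent
case class PartitionModifications(topic: String) extends ControllerEvent
case object StopSketch extends ControllerEvent // hypothetical sentinel, only used to stop this sketch

// One queue, one consumer thread: events are processed strictly in FIFO order
class MiniEventManager(process: ControllerEvent => Unit) {
  private val queue = new LinkedBlockingQueue[ControllerEvent]()
  private val thread = new Thread(new Runnable {
    def run(): Unit = {
      var running = true
      while (running) {
        queue.take() match {
          case StopSketch => running = false
          case event      => process(event)
        }
      }
    }
  }, "mini-controller-event-thread")

  def start(): Unit = thread.start()
  def put(event: ControllerEvent): Unit = queue.put(event) // called from watcher callbacks
  def close(): Unit = { queue.put(StopSketch); thread.join() }
}
```

Because only one thread touches controller state, handlers do not need locks around the context. The trade-off is that a slow event delays everything queued behind it.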
8. Update the leader and ISR cache for all existing partitions
updateLeaderAndIsrCache()
private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.allPartitions.toSeq): Unit = {
  // Reads each /brokers/topics/<topic>/partitions/<partition>/state znode.
  // TopicPartitionStateZNode.decode: the JSON holds leader, leader_epoch, isr, leader_recovery_state and controller_epoch
  // Returns a Map[TopicPartition, LeaderIsrAndControllerEpoch]
  // LeaderIsrAndControllerEpoch holds the leader, ISR, leader epoch, controller epoch, ZK version and LeaderRecoveryState
  val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
  leaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) =>
    controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
  }
}
class ControllerContext extends ControllerChannelContext {
  private val partitionLeadershipInfo = mutable.Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]

  def putPartitionLeadershipInfo(partition: TopicPartition,
                                 leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
    // Store the partition's leadership info
    val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
    val replicaAssignment = partitionFullReplicaAssignment(partition)
    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
  }
}
ControllerContext keeps a map (TopicPartition -> LeaderIsrAndControllerEpoch) that holds each partition's leader and ISR. Storing an entry also refreshes the preferred-replica imbalance metric.
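To make the imbalance metric concrete, here is a rough sketch of what such a count means: the number of partitions whose current leader is not the first replica in their assignment. Kafka exposes a gauge called PreferredReplicaImbalanceCount. The function and LeaderInfo below are hypothetical, and details such as topics queued for deletion are ignored.

```scala
case class TopicPartition(topic: String, partition: Int)
case class LeaderInfo(leader: Int, isr: List[Int]) // hypothetical, stands in for LeaderIsrAndControllerEpoch

// Count partitions whose current leader differs from the preferred replica (first in the assignment)
def preferredReplicaImbalanceCount(
    assignment: Map[TopicPartition, Seq[Int]],
    leadership: Map[TopicPartition, LeaderInfo]): Int =
  leadership.count { case (tp, info) =>
    assignment.get(tp).flatMap(_.headOption).exists(_ != info.leader)
  }
```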
9. Establish communication with every broker
controllerChannelManager.startup(controllerContext.liveOrShuttingDownBrokers)
class ControllerChannelManager(...) {
  protected val brokerStateInfo = new mutable.HashMap[Int, ControllerBrokerStateInfo]

  def startup(initialBrokers: Set[Broker]): Unit = {
    // The controller connects to every broker; this step only fills in brokerStateInfo
    initialBrokers.foreach(addNewBroker)
    // Start one request-send thread per broker
    brokerLock synchronized {
      brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
    }
  }

  private def addNewBroker(broker: Broker): Unit = {
    // ......
    val requestThread = new RequestSendThread(config.brokerId, controllerEpoch, messageQueue, networkClient,
      brokerNode, config, time, requestRateAndQueueTimeMetrics, stateChangeLogger, threadName)
    requestThread.setDaemon(false)
    brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,
      requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics, reconfigurableChannelBuilder))
  }

  private def startRequestSendThread(brokerId: Int): Unit = {
    val requestThread = brokerStateInfo(brokerId).requestSendThread
    if (requestThread.getState == Thread.State.NEW)
      requestThread.start()
  }
}
Next, let's look at what RequestSendThread does.
RequestSendThread extends ShutdownableThread, which calls doWork() repeatedly in a loop:
override def doWork(): Unit = {

  def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS)

  val QueueItem(apiKey, requestBuilder, callback, enqueueTimeMs) = queue.take()
  requestRateAndQueueTimeMetrics.update(time.milliseconds() - enqueueTimeMs, TimeUnit.MILLISECONDS)

  var clientResponse: ClientResponse = null
  try {
    var isSendSuccessful = false
    while (isRunning && !isSendSuccessful) {
      // If the broker stays down for a long time, the controller's ZooKeeper listener eventually
      // triggers removeBroker, which calls shutdown() on this thread; at that point we stop retrying.
      try {
        if (!brokerReady()) {
          isSendSuccessful = false
          backoff()
        }
        else {
          val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,
            time.milliseconds(), true)
          clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
          isSendSuccessful = true
        }
      } catch {
        case e: Throwable => // the send failed: reconnect to the broker and resend the message
          networkClient.close(brokerNode.idString)
          isSendSuccessful = false
          backoff()
      }
    }
    if (clientResponse != null) {
      val requestHeader = clientResponse.requestHeader
      // The controller sends only three APIs to brokers:
      // ApiKeys.LEADER_AND_ISR, ApiKeys.STOP_REPLICA and ApiKeys.UPDATE_METADATA
      val api = requestHeader.apiKey
      if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA)
        throw new KafkaException(s"Unexpected apiKey received: $apiKey")

      val response = clientResponse.responseBody

      stateChangeLogger.withControllerEpoch(controllerEpoch()).trace(s"Received response " +
        s"$response for request $api with correlation id " +
        s"${requestHeader.correlationId} sent to broker $brokerNode")

      if (callback != null) {
        callback(response)
      }
    }
  } catch {
    case e: Throwable =>
      // On any socket error (e.g. a socket timeout) the connection is no longer usable and must be recreated.
      networkClient.close(brokerNode.idString)
  }
}
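The retry logic inside doWork() can be isolated into a small function. This is a minimal sketch under assumptions. brokerReady, send, closeConnection and backoff are hypothetical hooks standing in for NetworkClient, NetworkClientUtils.sendAndReceive and pause(100 ms). It also deliberately catches NonFatal instead of Throwable.

```scala
import scala.util.control.NonFatal

// Simplified model of the retry loop in RequestSendThread.doWork()
def sendUntilSuccess[R](
    isRunning: () => Boolean,
    brokerReady: () => Boolean,
    send: () => R,
    closeConnection: () => Unit,
    backoff: () => Unit): Option[R] = {
  var response: Option[R] = None
  while (isRunning() && response.isEmpty) {
    try {
      if (!brokerReady()) backoff() // not connected yet: wait, then retry
      else response = Some(send())  // a successful send ends the loop
    } catch {
      case NonFatal(_) =>
        closeConnection()           // drop the broken connection; the next attempt reconnects
        backoff()
    }
  }
  response                          // None: the thread was stopped before any send succeeded
}
```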
IV. Filtering and deleting topics
// topicsToBeDeleted: topics to be deleted
// topicsIneligibleForDeletion: topics that are not eligible for deletion
val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
info("Initializing topic deletion manager")
// Initialize the topic deletion state machine
topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)
private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
  // Topics marked for deletion under /admin/delete_topics:
  // deleting a topic starts by marking it under this ZooKeeper path
  val topicsToBeDeleted = zkClient.getTopicDeletions.toSet
  val topicsWithOfflineReplicas = controllerContext.allTopics.filter { topic => {
    // Get the topic's replicas (per TopicPartition) from controllerContext
    val replicasForTopic = controllerContext.replicasForTopic(topic)
    // Is any replica of this topic offline?
    replicasForTopic.exists(r => !controllerContext.isReplicaOnline(r.replica, r.topicPartition))
  }}
  // Topics with a partition reassignment in progress
  val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.map(_.topic)
  // What makes a topic ineligible for deletion?
  // Having an offline replica, or being reassigned
  val topicsIneligibleForDeletion = topicsWithOfflineReplicas | topicsForWhichPartitionReassignmentIsInProgress
  info(s"List of topics to be deleted: ${topicsToBeDeleted.mkString(",")}")
  info(s"List of topics ineligible for deletion: ${topicsIneligibleForDeletion.mkString(",")}")
  (topicsToBeDeleted, topicsIneligibleForDeletion)
}
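The eligibility rule is plain set algebra: a topic is ineligible if any of its replicas is offline or any of its partitions is being reassigned. Below is a sketch over hypothetical in-memory inputs. isReplicaOnline is passed in as a function here, whereas the real check consults ControllerContext.

```scala
case class TopicPartition(topic: String, partition: Int)
case class PartitionReplica(topicPartition: TopicPartition, replica: Int)

def topicsIneligibleForDeletion(
    allTopics: Set[String],
    replicas: Set[PartitionReplica],
    isReplicaOnline: PartitionReplica => Boolean,
    partitionsBeingReassigned: Set[TopicPartition]): Set[String] = {
  // Topics with at least one offline replica
  val topicsWithOfflineReplicas = allTopics.filter { topic =>
    replicas.exists(r => r.topicPartition.topic == topic && !isReplicaOnline(r))
  }
  // Topics with at least one partition being reassigned
  val topicsBeingReassigned = partitionsBeingReassigned.map(_.topic)
  topicsWithOfflineReplicas | topicsBeingReassigned
}
```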
V. Updating metadata
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
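private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition]): Unit = {
  try {
    brokerRequestBatch.newBatch()
    brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
    // Sends the batched requests to each broker; a batch can contain:
    // ApiKeys.LEADER_AND_ISR
    // ApiKeys.UPDATE_METADATA
    // ApiKeys.STOP_REPLICA
    brokerRequestBatch.sendRequestsToBrokers(epoch)
  } catch {
    case e: IllegalStateException =>
      handleIllegalState(e)
  }
}
ApiKeys.LEADER_AND_ISR: as analyzed in "Kafka: Determining Whether a Partition on a Broker Is Leader or Follower", it makes each broker take on the leader or follower role for its partitions and start the work that role is responsible for.
ApiKeys.UPDATE_METADATA: synchronizes the latest metadata to every broker. In this particular call only addUpdateMetadataRequestForBrokers is invoked, so the batch carries UpdateMetadata requests.
VI. Starting the replica state machine and the partition state machine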
val replicaStateMachine: ReplicaStateMachine = new ZkReplicaStateMachine(config, stateChangeLogger, controllerContext, zkClient,
  new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient,
  new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))

replicaStateMachine.startup()
partitionStateMachine.startup()
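1. Replica state machine
It defines the states a replica can be in, and for each state the valid previous states from which a replica may transition into it. The possible replica states are:
1. NewReplica
The controller can create new replicas during partition reassignment. In this state, a replica can only receive a become-follower state change request. Valid previous state: NonExistentReplica
2. OnlineReplica
Once a replica is started and is part of the assigned replicas for its partition, it is in this state. In this state, it can receive either become-leader or become-follower state change requests. Valid previous states: NewReplica, OnlineReplica, OfflineReplica and ReplicaDeletionIneligible
3. OfflineReplica
If a replica dies, it moves to this state. This happens when the broker hosting the replica goes down. Valid previous states: NewReplica, OnlineReplica, OfflineReplica and ReplicaDeletionIneligible
4. ReplicaDeletionStarted
When deletion of a replica starts, it moves to this state. Valid previous state: OfflineReplica
5. ReplicaDeletionSuccessful
If the replica responds to the delete-replica request without an error code, it moves to this state. Valid previous state: ReplicaDeletionStarted
6. ReplicaDeletionIneligible
If replica deletion fails, it moves to this state. Valid previous states: ReplicaDeletionStarted and OfflineReplica
7. NonExistentReplica
If a replica is deleted successfully, it moves to this state. Valid previous state: ReplicaDeletionSuccessful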
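The state list above translates directly into a transition table. This sketch encodes exactly the valid previous states listed above. isValidTransition is a hypothetical helper. `def` is used instead of `val` so that the mutually referencing case objects never observe an uninitialized field.

```scala
sealed trait ReplicaState { def validPreviousStates: Set[ReplicaState] }

case object NewReplica extends ReplicaState {
  def validPreviousStates: Set[ReplicaState] = Set(NonExistentReplica)
}
case object OnlineReplica extends ReplicaState {
  def validPreviousStates: Set[ReplicaState] = Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)
}
case object OfflineReplica extends ReplicaState {
  def validPreviousStates: Set[ReplicaState] = Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)
}
case object ReplicaDeletionStarted extends ReplicaState {
  def validPreviousStates: Set[ReplicaState] = Set(OfflineReplica)
}
case object ReplicaDeletionSuccessful extends ReplicaState {
  def validPreviousStates: Set[ReplicaState] = Set(ReplicaDeletionStarted)
}
case object ReplicaDeletionIneligible extends ReplicaState {
  def validPreviousStates: Set[ReplicaState] = Set(ReplicaDeletionStarted, OfflineReplica)
}
case object NonExistentReplica extends ReplicaState {
  def validPreviousStates: Set[ReplicaState] = Set(ReplicaDeletionSuccessful)
}

// A move from `from` to `to` is legal only if `from` is one of the target's valid previous states
def isValidTransition(from: ReplicaState, to: ReplicaState): Boolean =
  to.validPreviousStates.contains(from)
```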
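2. Partition state machine
It defines the states a partition can be in, and for each state the valid previous states from which a partition may transition into it. The possible partition states are:
1. NonExistentPartition
This state means the partition was either never created, or was created and then deleted. Valid previous state, if one exists: OfflinePartition
2. NewPartition
After creation, the partition is in the NewPartition state. In this state the partition should have replicas assigned to it, but no leader/ISR yet. Valid previous state: NonExistentPartition
3. OnlinePartition
Once a leader has been elected for the partition, it is in the OnlinePartition state. Valid previous states: NewPartition/OfflinePartition
4. OfflinePartition
If the partition's leader dies after a successful leader election, the partition moves to the OfflinePartition state. Valid previous states: NewPartition/OnlinePartition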
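Here is the same idea applied to the four partition states. The sketch encodes the valid previous states as listed above and folds a full lifecycle through them. transition is a hypothetical helper that returns Left on an illegal move.

```scala
sealed trait PartitionState { def validPreviousStates: Set[PartitionState] }

case object NonExistentPartition extends PartitionState {
  def validPreviousStates: Set[PartitionState] = Set(OfflinePartition)
}
case object NewPartition extends PartitionState {
  def validPreviousStates: Set[PartitionState] = Set(NonExistentPartition)
}
case object OnlinePartition extends PartitionState {
  def validPreviousStates: Set[PartitionState] = Set(NewPartition, OfflinePartition)
}
case object OfflinePartition extends PartitionState {
  def validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition)
}

// Move to `target` if the current state allows it, otherwise report the illegal transition
def transition(current: PartitionState, target: PartitionState): Either[String, PartitionState] =
  if (target.validPreviousStates.contains(current)) Right(target)
  else Left(s"Illegal partition state transition: $current -> $target")
```

For example, create → elect leader → leader dies → delete is NonExistentPartition → NewPartition → OnlinePartition → OfflinePartition → NonExistentPartition, and every step is accepted.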
VII. Partition reassignment
initializePartitionReassignments()
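By reading /admin/reassign_partitions, the controller resumes the partition reassignments that are still pending.
private def initializePartitionReassignments(): Unit = {
  // While the controller was failing over, new reassignments may have been submitted through ZooKeeper
  val zkPartitionsResumed = processZkPartitionReassignment()
  // There may also be API-based reassignments that need to be restarted
  maybeResumeReassignments { (tp, _) =>
    !zkPartitionsResumed.contains(tp)
  }
}
VIII. Electing leaders from replicas (resuming preferred replica elections)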
val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
// Try to elect a replica as leader for each of the given partitions
onReplicaElection(pendingPreferredReplicaElections, ElectionType.PREFERRED, ZkTriggered)

private def fetchPendingPreferredReplicaElections(): Set[TopicPartition] = {
  // Read the partitions listed in /admin/preferred_replica_election:
  // PreferredReplicaElectionZNode.decode parses the topic and partition of each entry
  // and wraps them into TopicPartition(topic, partition)
  val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection
  // Check whether the election has already completed or the topic has been deleted
  val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
    val replicas = controllerContext.partitionReplicaAssignment(partition)
    val topicDeleted = replicas.isEmpty
    val successful =
      if (!topicDeleted) controllerContext.partitionLeadershipInfo(partition).get.leaderAndIsr.leader == replicas.head else false
    successful || topicDeleted
  }
  val pendingPreferredReplicaElectionsIgnoringTopicDeletion =
    partitionsUndergoingPreferredReplicaElection -- partitionsThatCompletedPreferredReplicaElection
  val pendingPreferredReplicaElectionsSkippedFromTopicDeletion =
    pendingPreferredReplicaElectionsIgnoringTopicDeletion.filter(partition => topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
  val pendingPreferredReplicaElections =
    pendingPreferredReplicaElectionsIgnoringTopicDeletion -- pendingPreferredReplicaElectionsSkippedFromTopicDeletion
  info(s"Partitions undergoing preferred replica election: ${partitionsUndergoingPreferredReplicaElection.mkString(",")}")
  info(s"Partitions that completed preferred replica election: ${partitionsThatCompletedPreferredReplicaElection.mkString(",")}")
  info(s"Skipping preferred replica election for partitions due to topic deletion: ${pendingPreferredReplicaElectionsSkippedFromTopicDeletion.mkString(",")}")
  info(s"Resuming preferred replica election for partitions: ${pendingPreferredReplicaElections.mkString(",")}")
  pendingPreferredReplicaElections
}
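Stripped of logging, fetchPendingPreferredReplicaElections is set arithmetic: requested, minus completed, minus topics queued for deletion. Below is a sketch over hypothetical in-memory maps. Unlike the original, it uses Option instead of .get for the leader lookup.

```scala
case class TopicPartition(topic: String, partition: Int)

def pendingPreferredElections(
    requested: Set[TopicPartition],
    assignment: Map[TopicPartition, Seq[Int]],
    currentLeader: Map[TopicPartition, Int],
    topicsQueuedForDeletion: Set[String]): Set[TopicPartition] = {
  val completed = requested.filter { tp =>
    val replicas = assignment.getOrElse(tp, Seq.empty)
    val topicDeleted = replicas.isEmpty
    // Done if the preferred replica (first in the list) already leads the partition
    val successful = !topicDeleted && currentLeader.get(tp).contains(replicas.head)
    successful || topicDeleted
  }
  val remaining = requested -- completed
  remaining.filterNot(tp => topicsQueuedForDeletion.contains(tp.topic))
}
```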
IX. Starting the scheduler
private[controller] val kafkaScheduler = new KafkaScheduler(1)

kafkaScheduler.startup()
public void startup() {
    log.debug("Initializing task scheduler.");
    synchronized (this) {
        if (isStarted())
            throw new IllegalStateException("This scheduler has already been started.");
        // Initialize the thread pool.
        // background.threads (threads for various background processing tasks, default 10) sizes the
        // broker's general scheduler; the controller creates this scheduler with threads = 1
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(threads);
        executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        executor.setRemoveOnCancelPolicy(true);
        executor.setThreadFactory(runnable ->
            new KafkaThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon));
        this.executor = executor;
    }
}
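X. Starting the automatic leader rebalance task
auto.leader.rebalance.enable defaults to true; set it to false to turn this off.
It enables automatic leader balancing. A background task periodically checks how partition leaders are distributed, at an interval set by leader.imbalance.check.interval.seconds (default 300). If a broker's leader imbalance exceeds leader.imbalance.per.broker.percentage (default 10), leadership of the affected partitions is moved back to their preferred leaders. The 5 seconds in the code below is only the delay before the first check after failover; later checks are rescheduled at the configured interval.
if (config.autoLeaderRebalanceEnable) {
  // Partition leader rebalance task
  scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
}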
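Here is a sketch of the per-broker check the rebalance task performs. It works under these assumptions:

- for each broker, take the partitions whose preferred replica (first in the assignment) is that broker;
- flag the broker when the fraction of those partitions it does not currently lead exceeds the leader.imbalance.per.broker.percentage threshold.

The function name is hypothetical, and the real task applies additional filters that are omitted here.

```scala
case class TopicPartition(topic: String, partition: Int)

// Returns, for each over-threshold broker, the partitions it should lead but currently does not
def partitionsToRebalance(
    assignment: Map[TopicPartition, Seq[Int]],
    currentLeader: Map[TopicPartition, Int],
    imbalancePercentage: Int): Map[Int, Set[TopicPartition]] = {
  val byPreferredBroker: Map[Int, Map[TopicPartition, Seq[Int]]] =
    assignment.filter { case (_, replicas) => replicas.nonEmpty }.groupBy { case (_, replicas) => replicas.head }
  byPreferredBroker.toSeq.flatMap { case (broker, partitions) =>
    val notLedByPreferred = partitions.keySet.filter(tp => !currentLeader.get(tp).contains(broker))
    val imbalanceRatio = notLedByPreferred.size.toDouble / partitions.size
    if (imbalanceRatio > imbalancePercentage / 100.0) Some(broker -> notLedByPreferred) else None
  }.toMap
}
```

With ten partitions preferring broker 1, of which two are led elsewhere, the ratio is 0.2. That triggers a rebalance at the default threshold of 10 percent, but not at 25 percent.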
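XI. Watching for Controller changes
As mentioned in "Kafka-Controller Election", two events are involved when KafkaController starts: RegisterBrokerAndReelect and Startup.
RegisterBrokerAndReelect is queued by a ZooKeeper state-change handler whenever a new ZooKeeper session has been initialized, for example after the previous session expired. The broker then re-registers itself and takes part in the election again. Startup is queued when the controller starts: it registers a watch on the /controller znode in ZooKeeper and then runs the controller election. Because the Controller is the core of the cluster, a new one must be elected promptly whenever the current one fails, and the watch on /controller is how the brokers notice that failure.
private def processStartup(): Unit = {
  zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
  elect()
}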