
Kafka Source Code Analysis, Part 12: KafkaController (Part 2)

2016-04-11 09:29

12.3 KafkaController PartitionStateMachine

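PartitionStateMachine implements the state transitions for the partitions of a topic. A partition can be in one of the following states:

| State | When a partition is in this state | Valid previous states |
|---|---|---|
| NonExistentPartition | 1. The partition has never been created. 2. The partition was created and then deleted. | OfflinePartition |
| NewPartition | 1. The partition has been created and has replicas assigned, but has no leader/ISR yet. | NonExistentPartition |
| OnlinePartition | 1. One of the partition's replicas has been elected leader. | NewPartition, OnlinePartition, OfflinePartition |
| OfflinePartition | 1. The leader replica has gone down and no new leader has been elected yet. 2. The partition was taken offline directly after creation. | NewPartition, OnlinePartition, OfflinePartition |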
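The partition state transitions work as follows:

| Transition | What happens |
|---|---|
| NonExistentPartition -> NewPartition | 1. Load the assigned replicas from ZooKeeper into the KafkaController's in-memory cache. |
| NewPartition -> OnlinePartition | 1. Assign the first live replica as the leader and all live replicas as the ISR, then write this information to ZooKeeper. |
| OnlinePartition, OfflinePartition -> OnlinePartition | 1. Elect a new leader and ISR for the partition, then write this information to ZooKeeper. |
| NewPartition, OnlinePartition, OfflinePartition -> OfflinePartition | 1. Only mark the partition as OfflinePartition inside the KafkaController. |
| OfflinePartition -> NonExistentPartition | 1. Only mark the partition as NonExistentPartition inside the KafkaController. |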
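To make the transition rules easier to check at a glance, here is a minimal standalone sketch that encodes them as a lookup table. The object name `PartitionStateSketch` and the helper `canTransition` are hypothetical and not part of Kafka; the allowed previous states are copied from the `assertValidPreviousStates` calls in `handleStateChange`.

```scala
object PartitionStateSketch {
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // Target state -> states it may be entered from (hypothetical table,
  // mirroring the assertValidPreviousStates arguments in handleStateChange).
  val validPreviousStates = Map[PartitionState, List[PartitionState]](
    NewPartition         -> List(NonExistentPartition),
    OnlinePartition      -> List(NewPartition, OnlinePartition, OfflinePartition),
    OfflinePartition     -> List(NewPartition, OnlinePartition, OfflinePartition),
    NonExistentPartition -> List(OfflinePartition)
  )

  // True when moving from `from` to `to` would pass the previous-state check.
  def canTransition(from: PartitionState, to: PartitionState): Boolean =
    validPreviousStates(to).contains(from)
}
```

For example, `canTransition(NonExistentPartition, OnlinePartition)` is false: a partition has to pass through NewPartition before it can go online.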

The function to focus on is therefore handleStateChange in PartitionStateMachine:
private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
                              leaderSelector: PartitionLeaderSelector,
                              callbacks: Callbacks) {
  val topicAndPartition = TopicAndPartition(topic, partition)
  if (!hasStarted.get)
    throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
      "the partition state machine has not started")
      .format(controllerId, controller.epoch, topicAndPartition, targetState))
  val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
  try {
    targetState match {
      case NewPartition =>
        // check the previous state
        assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
        // update partitionReplicaAssignment in controllerContext
        assignReplicasToPartitions(topic, partition)
        // record the new partition state
        partitionState.put(topicAndPartition, NewPartition)
        val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
          .format(controllerId, controller.epoch, topicAndPartition, currState, targetState,
            assignedReplicas))
      case OnlinePartition =>
        // check the previous state
        assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
        partitionState(topicAndPartition) match {
          case NewPartition => // NewPartition -> OnlinePartition
            /* 1. Using partitionReplicaAssignment, pick the first live replica as leader and all live replicas as the ISR
             * 2. Persist the leader and ISR to ZooKeeper
             * 3. Update partitionLeadershipInfo in controllerContext
             * 4. Build the LeaderAndIsrRequest for the brokers hosting these replicas and hand it to ControllerBrokerRequestBatch
             */
            initializeLeaderAndIsrForPartition(topicAndPartition)
          case OfflinePartition => // OfflinePartition -> OnlinePartition
            /* 1. Elect a new leader with the given leaderSelector; here it is usually OfflinePartitionLeaderSelector
             * 2. Persist the leader and ISR to ZooKeeper
             * 3. Update partitionLeadershipInfo in controllerContext
             * 4. Build the LeaderAndIsrRequest for the brokers hosting these replicas and hand it to ControllerBrokerRequestBatch
             */
            electLeaderForPartition(topic, partition, leaderSelector)
          case OnlinePartition => // OnlinePartition -> OnlinePartition
            /* 1. Elect a new leader with the given leaderSelector; here it is usually ReassignedPartitionLeaderSelector
             * 2. Persist the leader and ISR to ZooKeeper
             * 3. Update partitionLeadershipInfo in controllerContext
             * 4. Build the LeaderAndIsrRequest for the brokers hosting these replicas and hand it to ControllerBrokerRequestBatch
             */
            electLeaderForPartition(topic, partition, leaderSelector)
          case _ => // should never come here since illegal previous states are checked above
        }
        // record the new partition state
        partitionState.put(topicAndPartition, OnlinePartition)
        val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d"
          .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))
      case OfflinePartition =>
        // check the previous state
        assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
          .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
        // record the new partition state
        partitionState.put(topicAndPartition, OfflinePartition)
      case NonExistentPartition =>
        // check the previous state
        assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
          .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
        // record the new partition state
        partitionState.put(topicAndPartition, NonExistentPartition)
        // post: partition state is deleted from all brokers and zookeeper
    }
  } catch {
    case t: Throwable =>
      stateChangeLogger.error("Controller %d epoch %d initiated state change for partition %s from %s to %s failed"
        .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t)
  }
}
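The NewPartition -> OnlinePartition step boils down to a small selection rule: the first live assigned replica becomes the leader and all live assigned replicas form the initial ISR. The sketch below shows only that rule as a pure function. `InitialLeaderSketch`, its simplified `LeaderAndIsr` case class and `initialLeaderAndIsr` are hypothetical names; returning `None` when no replica is alive is an assumption of this sketch and says nothing about how the controller itself reports that failure.

```scala
object InitialLeaderSketch {
  // Simplified stand-in for Kafka's LeaderAndIsr (epoch and zkVersion omitted).
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  // First live assigned replica becomes leader; all live assigned replicas form the ISR.
  // Returns None when no assigned replica is on a live broker (assumption of this sketch).
  def initialLeaderAndIsr(assignedReplicas: Seq[Int], liveBrokerIds: Set[Int]): Option[LeaderAndIsr] = {
    val liveAssigned = assignedReplicas.filter(r => liveBrokerIds.contains(r)).toList
    liveAssigned.headOption.map(leader => LeaderAndIsr(leader, liveAssigned))
  }
}
```

With assigned replicas 3, 1, 2 and only brokers 1 and 2 alive, broker 1 becomes the leader and the ISR is 1, 2: the order of the assignment list, not the broker id, decides who leads.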

12.4 KafkaController PartitionLeaderSelector

When a partition changes state, in particular on the OfflinePartition -> OnlinePartition and OnlinePartition -> OnlinePartition transitions, a PartitionLeaderSelector is invoked to determine the leader and ISR. Five selectors are currently supported: NoOpLeaderSelector, OfflinePartitionLeaderSelector, ReassignedPartitionLeaderSelector, PreferredReplicaPartitionLeaderSelector, and ControlledShutdownLeaderSelector.

12.4.1 NoOpLeaderSelector

/**
 * Essentially does nothing. Returns the current leader and ISR, and the current
 * set of replicas assigned to a given topic/partition.
 */
class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {

  this.logIdent = "[NoOpLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
    (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
  }
}
It does essentially nothing: it simply returns currentLeaderAndIsr together with the set of replicas assigned to the given topic/partition.

12.4.2 OfflinePartitionLeaderSelector

class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig)
  extends PartitionLeaderSelector with Logging {
  this.logIdent = "[OfflinePartitionLeaderSelector]: "
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
      case Some(assignedReplicas) =>
        val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
        val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
        val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
        val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
        val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
          case true => // every broker in the ISR is offline, so the leader has to come from the assigned replicas (AR)
            // give up if unclean leader election is disabled for this topic
            if (!LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
              topicAndPartition.topic)).uncleanLeaderElectionEnable) {
              throw new NoReplicaOnlineException(("No broker in ISR for partition " +
                "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
            }
            debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
              .format(topicAndPartition, liveAssignedReplicas.mkString(",")))
            liveAssignedReplicas.isEmpty match {
              case true => // the brokers of all assigned replicas are offline too, so this topic/partition is down
                throw new NoReplicaOnlineException(("No replica for partition " +
                  "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                  " Assigned replicas are: [%s]".format(assignedReplicas))
              case false => // some assigned replicas are still on live brokers
                ControllerStats.uncleanLeaderElectionRate.mark()
                val newLeader = liveAssignedReplicas.head // take the first one as leader
                warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
                  .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
                new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
            }
          case false => // some brokers in the ISR are still alive
            val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
            val newLeader = liveReplicasInIsr.head // pick the first live replica that is in the ISR
            debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
              .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
            new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
        }
        info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
        (newLeaderAndIsr, liveAssignedReplicas)
      case None =>
        throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
    }
  }
}
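The decision tree in selectLeader is easier to follow once ZooKeeper and the controller context are stripped away. The following is a simplified sketch, not Kafka's implementation: `OfflineElectionSketch`, `elect` and the plain `uncleanElectionEnabled` flag are hypothetical, and errors are returned as `Left` values where the real selector throws NoReplicaOnlineException.

```scala
object OfflineElectionSketch {
  // Simplified stand-in for Kafka's LeaderAndIsr.
  final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

  def elect(assigned: Seq[Int], liveBrokers: Set[Int], current: LeaderAndIsr,
            uncleanElectionEnabled: Boolean): Either[String, LeaderAndIsr] = {
    val liveAssigned = assigned.filter(r => liveBrokers.contains(r))
    val liveIsr = current.isr.filter(r => liveBrokers.contains(r))
    val epoch = current.leaderEpoch + 1
    val version = current.zkVersion + 1
    if (liveIsr.nonEmpty) {
      // Clean election: first live assigned replica that is still in the ISR.
      liveAssigned.find(r => liveIsr.contains(r)) match {
        case Some(leader) => Right(LeaderAndIsr(leader, epoch, liveIsr, version))
        case None         => Left("live ISR member is not an assigned replica")
      }
    } else if (!uncleanElectionEnabled) {
      Left("no live broker in ISR and unclean election is disabled")
    } else {
      // Unclean election: any live assigned replica, ISR shrinks to the new leader only.
      liveAssigned.headOption match {
        case Some(leader) => Right(LeaderAndIsr(leader, epoch, List(leader), version))
        case None         => Left("no assigned replica is alive")
      }
    }
  }
}
```

Note how the unclean branch resets the ISR to the new leader alone, which is where the "potential data loss" warning in the original code comes from.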

12.4.3 ReassignedPartitionLeaderSelector

class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
  this.logIdent = "[ReassignedPartitionLeaderSelector]: "
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    // the replicas newly assigned to this partition by the reassignment
    val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
    // keep only the reassigned replicas whose broker is alive and which are currently in the ISR
    val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&
      currentLeaderAndIsr.isr.contains(r))
    val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
    newLeaderOpt match { // if such a replica exists, make the first one the leader
      case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
        currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
      case None => // otherwise the leader election for the reassignment fails
        reassignedInSyncReplicas.size match {
          case 0 =>
            throw new NoReplicaOnlineException("List of reassigned replicas for partition " +
              " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
          case _ =>
            throw new NoReplicaOnlineException("None of the reassigned replicas for partition " +
              "%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
        }
    }
  }
}
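The selection rule used during reassignment fits in one line: take the first new replica that is both on a live broker and already in the current ISR. A minimal sketch under that reading, with the hypothetical names `ReassignedElectionSketch` and `pickLeader`; `None` stands in for the NoReplicaOnlineException thrown by the real selector.

```scala
object ReassignedElectionSketch {
  // First reassigned replica that is alive and in sync, if any.
  def pickLeader(newReplicas: Seq[Int], liveBrokers: Set[Int], isr: List[Int]): Option[Int] =
    newReplicas.find(r => liveBrokers.contains(r) && isr.contains(r))
}
```

So a reassignment to brokers 4, 5, 6 can only move leadership once at least one of them has caught up and joined the ISR.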

12.4.4 PreferredReplicaPartitionLeaderSelector

class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
  with Logging {
  this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    // the preferred replica is the first one in the assigned replica list
    val preferredReplica = assignedReplicas.head
    // check if preferred replica is the current leader
    val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
    if (currentLeader == preferredReplica) { // already the leader, nothing to do
      throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s"
        .format(preferredReplica, topicAndPartition))
    } else {
      info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
        " Trigerring preferred replica leader election")
      // if the preferred replica is in the ISR and its broker is alive, make it the leader again;
      // this path is mainly used to rebalance leadership across brokers
      if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
        (new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr,
          currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
      } else {
        throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
          "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
      }
    }
  }
}
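The preferred-replica selector has three possible outcomes, which the original code expresses through two different exceptions and one return value. The sketch below models them as a small result type instead. `PreferredReplicaSketch`, `Outcome` and `decide` are hypothetical names, and the function assumes a non-empty assignment list, as the original does when it calls `assignedReplicas.head`.

```scala
object PreferredReplicaSketch {
  sealed trait Outcome
  case object AlreadyLeader extends Outcome                  // LeaderElectionNotNeededException in the original
  final case class MoveLeaderTo(replica: Int) extends Outcome
  case object NotEligible extends Outcome                    // StateChangeFailedException in the original

  // `assigned` must be non-empty; its head is the preferred replica.
  def decide(assigned: Seq[Int], currentLeader: Int, isr: List[Int], liveBrokers: Set[Int]): Outcome = {
    val preferred = assigned.head
    if (currentLeader == preferred) AlreadyLeader
    else if (liveBrokers.contains(preferred) && isr.contains(preferred)) MoveLeaderTo(preferred)
    else NotEligible
  }
}
```

For a partition assigned to brokers 1, 2, 3 and currently led by broker 2, leadership moves back to broker 1 only if broker 1 is alive and in the ISR.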

12.4.5 ControlledShutdownLeaderSelector

class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
  extends PartitionLeaderSelector
  with Logging {
  this.logIdent = "[ControlledShutdownLeaderSelector]: "
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
    val currentLeader = currentLeaderAndIsr.leader
    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
    // keep the assigned replicas whose broker is alive or in the middle of shutting down
    val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
    // the new ISR is the current ISR minus the brokers that are shutting down
    val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
    val newLeaderOpt = newIsr.headOption
    newLeaderOpt match {
      case Some(newLeader) => // a candidate exists in the new ISR, so make it the leader
        debug("Partition %s : current leader = %d, new leader = %d"
          .format(topicAndPartition, currentLeader, newLeader))
        (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
          liveAssignedReplicas)
      case None =>
        throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
          " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
    }
  }
}
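Controlled shutdown uses the simplest rule of the five: drop the shutting-down brokers from the ISR and promote the first replica that remains. A minimal sketch of just that step, with the hypothetical names `ControlledShutdownSketch` and `electOnShutdown`; `None` stands in for the StateChangeFailedException of the real selector.

```scala
object ControlledShutdownSketch {
  // Returns (new leader, new ISR), or None when every ISR member is shutting down.
  def electOnShutdown(isr: List[Int], shuttingDown: Set[Int]): Option[(Int, List[Int])] = {
    val newIsr = isr.filterNot(b => shuttingDown.contains(b))
    newIsr.headOption.map(leader => (leader, newIsr))
  }
}
```

If broker 1 leads a partition with ISR 1, 2, 3 and begins a controlled shutdown, broker 2 takes over with ISR 2, 3. If the ISR were only broker 1, no election would be possible.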

12.5 KafkaController ReplicaStateMachine

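ReplicaStateMachine implements the state transitions for the replicas of a topic's partitions. A replica can be in one of the following states:

| State | When a replica is in this state | Valid previous states |
|---|---|---|
| NewReplica | 1. The replica has just been assigned. It is not working yet and can only be a follower. | NonExistentReplica |
| OnlineReplica | 1. The replica has started working, either as leader or as follower. | NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible |
| OfflineReplica | 1. The replica is down, for example because the broker hosting it went offline. | NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible |
| ReplicaDeletionStarted | 1. Deletion of the replica has started. | OfflineReplica |
| ReplicaDeletionSuccessful | 1. The replica has responded successfully to the delete request. The KafkaController still keeps the replica's information in memory at this point. | ReplicaDeletionStarted |
| ReplicaDeletionIneligible | 1. Deletion of the replica has failed. | ReplicaDeletionStarted |
| NonExistentReplica | 1. The replica's information has been removed from the KafkaController's memory. | ReplicaDeletionSuccessful |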
The replica state transitions work as follows:

| Transition | What happens |
|---|---|
| NonExistentReplica -> NewReplica | 1. The KafkaController sends a LeaderAndIsr request with the current leader and ISR to the new replica, and an UpdateMetadata request for the partition to every live broker. |
| NewReplica -> OnlineReplica | 1. The KafkaController adds the new replica to the assigned replica list if needed. NewReplica is a short-lived state: the switch to OnlineReplica follows almost immediately and happens in onNewPartitionCreation. |
| OnlineReplica, OfflineReplica -> OnlineReplica | 1. The KafkaController sends a LeaderAndIsr request with the current leader and ISR to the replica, and an UpdateMetadata request for the partition to every live broker. |
| NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible -> OfflineReplica | 1. The KafkaController sends a StopReplicaRequest to the replica (without deletion). 2. The KafkaController removes this replica from the ISR and sends a LeaderAndIsr request (with the new ISR) to the leader replica, and an UpdateMetadata request for the partition to every live broker. |
| OfflineReplica -> ReplicaDeletionStarted | 1. The KafkaController sends a StopReplicaRequest to the replica (with deletion). |
| ReplicaDeletionStarted -> ReplicaDeletionSuccessful | 1. The KafkaController marks the state of the replica in the state machine. |
| ReplicaDeletionStarted -> ReplicaDeletionIneligible | 1. The KafkaController marks the state of the replica in the state machine. |
| ReplicaDeletionSuccessful -> NonExistentReplica | 1. The KafkaController removes the replica from the in-memory partition replica assignment cache. |
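Because the replica state machine has seven states, it helps to check a whole lifecycle rather than one step at a time. The sketch below encodes the allowed previous states as a table and validates a sequence of states against it. `ReplicaStateSketch` and `isValidPath` are hypothetical helpers; the allowed previous states are taken from the `assertValidPreviousStates` calls in ReplicaStateMachine's `handleStateChange`.

```scala
object ReplicaStateSketch {
  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState
  case object NonExistentReplica extends ReplicaState

  // Target state -> states it may be entered from (hypothetical lookup table).
  val validPreviousStates = Map[ReplicaState, List[ReplicaState]](
    NewReplica                -> List(NonExistentReplica),
    OnlineReplica             -> List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
    OfflineReplica            -> List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
    ReplicaDeletionStarted    -> List(OfflineReplica),
    ReplicaDeletionSuccessful -> List(ReplicaDeletionStarted),
    ReplicaDeletionIneligible -> List(ReplicaDeletionStarted),
    NonExistentReplica        -> List(ReplicaDeletionSuccessful)
  )

  // True when every consecutive pair in `path` is an allowed transition.
  def isValidPath(path: List[ReplicaState]): Boolean =
    path.zip(path.drop(1)).forall { case (from, to) => validPreviousStates(to).contains(from) }
}
```

The full deletion path OnlineReplica, OfflineReplica, ReplicaDeletionStarted, ReplicaDeletionSuccessful, NonExistentReplica passes the check, while jumping from OnlineReplica straight to ReplicaDeletionStarted does not: a replica must be taken offline before it can be deleted.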
The function to focus on is therefore handleStateChange in ReplicaStateMachine:

def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,
                      callbacks: Callbacks) {
  val topic = partitionAndReplica.topic
  val partition = partitionAndReplica.partition
  val replicaId = partitionAndReplica.replica
  val topicAndPartition = TopicAndPartition(topic, partition)
  if (!hasStarted.get)
    throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " +
      "to %s failed because replica state machine has not started")
      .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
  val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
  try {
    val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
    targetState match {
      case NewReplica => // reached through the controller's onNewPartitionCreation callback right after a client creates a topic
        // check the previous state
        assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
        val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
        leaderIsrAndControllerEpochOpt match {
          case Some(leaderIsrAndControllerEpoch) =>
            // a NewReplica cannot be the partition leader; only an online replica can lead
            if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
              throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
                .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
            // build the LeaderAndIsrRequest for the broker hosting this replica and hand it to ControllerBrokerRequestBatch
            brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
              topic, partition, leaderIsrAndControllerEpoch,
              replicaAssignment)
          case None => // new leader request will be sent to this replica when one gets elected
        }
        // set the state to NewReplica
        replicaState.put(partitionAndReplica, NewReplica)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
            targetState))
      case ReplicaDeletionStarted =>
        // check the previous state
        assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
        // set the state to ReplicaDeletionStarted
        replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
        // Build the StopReplicaRequest for the broker hosting this replica and hand it to ControllerBrokerRequestBatch.
        // When the response arrives, deleteTopicStopReplicaCallback in TopicDeletionManager is invoked: replicas that
        // were deleted successfully move to ReplicaDeletionSuccessful, those that failed move to ReplicaDeletionIneligible.
        brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
          callbacks.stopReplicaResponseCallback)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case ReplicaDeletionIneligible =>
        // check the previous state
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
        // set the state to ReplicaDeletionIneligible
        replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case ReplicaDeletionSuccessful =>
        // check the previous state
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
        // set the state to ReplicaDeletionSuccessful
        replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case NonExistentReplica =>
        // check the previous state
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
        val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
        // remove this replica from the partition's cached replica assignment
        controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
        // drop the replica's state entry
        replicaState.remove(partitionAndReplica)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case OnlineReplica =>
        // check the previous state
        assertValidPreviousStates(partitionAndReplica,
          List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
        replicaState(partitionAndReplica) match {
          case NewReplica => // very little to do here
            val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
            if(!currentAssignedReplicas.contains(replicaId)) // add the replica to the assignment only if it is missing
              controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
            stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
              .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                targetState))
          case _ => // the replica may have existed before, so send it the current leader and ISR
            controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
              case Some(leaderIsrAndControllerEpoch) =>
                brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                  replicaAssignment)
                // set the state to OnlineReplica (this looks redundant, since it is set again below)
                replicaState.put(partitionAndReplica, OnlineReplica)
                stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
              case None => // no leadership info cached for this partition, so nothing is sent
            }
        }
        // set the state to OnlineReplica
        replicaState.put(partitionAndReplica, OnlineReplica)
      case OfflineReplica =>
        // check the previous state
        assertValidPreviousStates(partitionAndReplica,
          List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
        // build the StopReplicaRequest (without deletion) for the broker hosting this replica and hand it to ControllerBrokerRequestBatch
        brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
        val leaderAndIsrIsEmpty: Boolean =
          controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
            case Some(currLeaderIsrAndControllerEpoch) =>
              // remove this replica from the ISR
              controller.removeReplicaFromIsr(topic, partition, replicaId) match {
                case Some(updatedLeaderIsrAndControllerEpoch) =>
                  // the ISR of this partition has shrunk, so the remaining replicas must be notified
                  val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
                  if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
                      topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                  }
                  // set the state to OfflineReplica
                  replicaState.put(partitionAndReplica, OfflineReplica)
                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
                  false
                case None =>
                  true
              }
            case None =>
              true
          }
        if (leaderAndIsrIsEmpty) // the leader and ISR information must exist
          throw new StateChangeFailedException(
            "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
              .format(replicaId, topicAndPartition))
    }
  }
  catch {
    case t: Throwable =>
      stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
        .format(controllerId, controller.epoch, replicaId, topic, partition, currState, targetState), t)
  }
}
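The ReplicaDeletionStarted branch only sends the StopReplicaRequest; the follow-up state is decided later, when the response callback runs. The sketch below illustrates that second half in isolation. Everything in it is hypothetical: `ReplicaDeletionSketch`, `classify`, and the representation of responses as a map from replica id to an integer error code, where 0 is assumed to mean "no error".

```scala
object ReplicaDeletionSketch {
  sealed trait DeletionState
  case object ReplicaDeletionSuccessful extends DeletionState
  case object ReplicaDeletionIneligible extends DeletionState

  // responses: replica id -> error code from the stop-replica response
  // (assumption of this sketch: 0 means the replica was deleted without error).
  def classify(responses: Map[Int, Int]): Map[Int, DeletionState] =
    responses.map { case (replicaId, errorCode) =>
      val next: DeletionState =
        if (errorCode == 0) ReplicaDeletionSuccessful else ReplicaDeletionIneligible
      replicaId -> next
    }
}
```

A replica that ends up in ReplicaDeletionIneligible is not lost: per the transition table it can still go back to OfflineReplica or OnlineReplica, from where deletion can be attempted again.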