Deep Dive into Kafka: PartitionLeaderSelector Source Code Analysis
2017-12-11 15:56
All posts are published first on my personal blog, http://blog.mozhu.org. You are welcome to visit!
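PartitionLeaderSelector elects the leader broker for a partition. The trait defines a single method, selectLeader, which takes a TopicAndPartition object and a LeaderAndIsr object.
TopicAndPartition identifies the partition whose leader is to be elected; the second argument is the partition's current leader and ISR record as stored in ZooKeeper. The method returns a tuple containing the elected leader and ISR, together with the set of replicas that need to receive the LeaderAndIsr request.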
    trait PartitionLeaderSelector {
      def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
    }
The main implementations of PartitionLeaderSelector are:
1. NoOpLeaderSelector
2. OfflinePartitionLeaderSelector
3. ReassignedPartitionLeaderSelector
4. PreferredReplicaPartitionLeaderSelector
5. ControlledShutdownLeaderSelector
NoOpLeaderSelector
NoOpLeaderSelector is a LeaderSelector that does nothing: it logs a warning and returns the current LeaderAndIsr and replica assignment unchanged.

    class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {

      this.logIdent = "[NoOpLeaderSelector]: "

      def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
        warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
        (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
      }
    }
ControlledShutdownLeaderSelector
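When the controller receives a controlled shutdown request for a broker, it triggers a new leader election for the affected partitions. The selector first finds the assigned replicas (assignedReplicas), then filters them down to those that are still alive or shutting down (liveAssignedReplicas). It removes the shutting-down brokers from the current ISR to form the new ISR, and elects the first replica in liveAssignedReplicas that is also in the new ISR as the partition leader.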
    class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {

      this.logIdent = "[ControlledShutdownLeaderSelector]: "

      def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
        val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
        val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
        val currentLeader = currentLeaderAndIsr.leader

        // Replicas assigned to the partition
        val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
        val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
        // Assigned replicas that are still alive (including brokers that are shutting down)
        val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))

        // Remove the shutting-down brokers from the current ISR
        val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
        // Pick the first live assigned replica that is in the new ISR as the partition leader
        liveAssignedReplicas.filter(newIsr.contains).headOption match {
          case Some(newLeader) =>
            // Found one: increment the leader epoch and the version of the corresponding ZooKeeper node by 1
            debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader))
            (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)
          case None =>
            // None found: throw StateChangeFailedException
            throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
              " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
        }
      }
    }
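To make the selection rule concrete, here is a minimal sketch of the same logic on plain collections. It is not Kafka code: the LeaderAndIsr case class is a hypothetical stand-in, the broker sets are passed in as parameters instead of being read from ControllerContext, and a failed election is modelled as None rather than a StateChangeFailedException.

```scala
object ControlledShutdownSketch {
  // Hypothetical stand-in for Kafka's LeaderAndIsr, for illustration only.
  final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

  // Returns the new leader/ISR state plus the replicas to notify,
  // or None when every ISR member is shutting down.
  def select(
      assignedReplicas: Seq[Int],
      liveOrShuttingDown: Set[Int],
      shuttingDown: Set[Int],
      current: LeaderAndIsr): Option[(LeaderAndIsr, Seq[Int])] = {
    // Assigned replicas that are alive or in the middle of shutting down
    val liveAssigned = assignedReplicas.filter(liveOrShuttingDown.contains)
    // Current ISR without the brokers that are shutting down
    val newIsr = current.isr.filterNot(shuttingDown.contains)
    // First live assigned replica that is still in the new ISR
    liveAssigned.find(newIsr.contains).map { newLeader =>
      (LeaderAndIsr(newLeader, current.leaderEpoch + 1, newIsr, current.zkVersion + 1), liveAssigned)
    }
  }
}
```

For example, with assigned replicas 1, 2, 3, an ISR of 1, 2, 3 and broker 1 shutting down, the sketch elects broker 2 and shrinks the ISR to 2, 3, while broker 1 stays in the list of replicas to notify.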
PreferredReplicaPartitionLeaderSelector
When the controller receives a preferred replica leader election command, it rebalances partition leadership by taking the first replica in the AR as the preferred leader.

    class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {

      this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "

      def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
        // Replicas assigned to the partition
        val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
        // The first assigned replica is the preferred replica
        val preferredReplica = assignedReplicas.head
        // check if preferred replica is the current leader
        // If the current leader is already the preferred replica, throw LeaderElectionNotNeededException
        val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
        if (currentLeader == preferredReplica) {
          throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s"
            .format(preferredReplica, topicAndPartition))
        } else {
          info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
            " Trigerring preferred replica leader election")
          // check if preferred replica is not the current leader and is alive and in the isr
          if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
            // The preferred replica is alive and in the ISR: make it the leader,
            // incrementing the leader epoch and the version of the corresponding ZooKeeper node by 1
            (new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr,
              currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
          } else {
            // The preferred replica is not alive or not in the ISR: throw StateChangeFailedException
            throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
              "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
          }
        }
      }
    }
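The three possible outcomes of this selector can be summarised in a small standalone sketch. The Outcome type and the elect function below are hypothetical names introduced for illustration; the real selector throws exceptions for the two non-electing cases, and, like the original, the sketch assumes the assigned replica list is non-empty.

```scala
object PreferredReplicaSketch {
  // Hypothetical result type; the real selector signals the first two cases with exceptions.
  sealed trait Outcome
  case object AlreadyLeader extends Outcome // LeaderElectionNotNeededException in Kafka
  case object NotEligible extends Outcome   // StateChangeFailedException in Kafka
  final case class Elected(leader: Int) extends Outcome

  // Assumes assignedReplicas is non-empty, as the original code does.
  def elect(assignedReplicas: Seq[Int], currentLeader: Int, isr: Seq[Int], liveBrokers: Set[Int]): Outcome = {
    // The preferred replica is simply the first entry of the assignment
    val preferred = assignedReplicas.head
    if (currentLeader == preferred) AlreadyLeader
    else if (liveBrokers.contains(preferred) && isr.contains(preferred)) Elected(preferred)
    else NotEligible
  }
}
```

Note that the preferred replica must be both alive and in the ISR; being alive but out of sync is not enough.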
ReassignedPartitionLeaderSelector
When the partitions of a topic are reassigned, a new leader election is triggered: the first of the reassigned replicas that is alive and in the current ISR is elected leader.

    class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {

      this.logIdent = "[ReassignedPartitionLeaderSelector]: "

      /**
       * The reassigned replicas are already in the ISR when selectLeader is called.
       */
      def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
        // The newly assigned replica set
        val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
        val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
        val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
        // Keep only the reassigned replicas that are alive and in the current ISR
        val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&
          currentLeaderAndIsr.isr.contains(r))
        // Pick the first of them as the leader
        val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
        newLeaderOpt match {
          // Return it as the leader, incrementing the leader epoch and the version of the corresponding ZooKeeper node by 1
          case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
            currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
          case None =>
            // No reassigned replica is alive and in sync: throw NoReplicaOnlineException, the election fails
            reassignedInSyncReplicas.size match {
              case 0 =>
                throw new NoReplicaOnlineException("List of reassigned replicas for partition " +
                  " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
              case _ =>
                throw new NoReplicaOnlineException("None of the reassigned replicas for partition " +
                  "%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
            }
        }
      }
    }
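The candidate filtering can be tried out in isolation with the sketch below. It is an illustration under stated assumptions, not Kafka's API: pickLeader is a hypothetical function, the inputs are plain collections, and the two failure cases are returned as Left messages instead of NoReplicaOnlineException.

```scala
object ReassignedSketch {
  // Picks the first reassigned replica that is both alive and in the current ISR.
  // Failures are returned as Left(reason) instead of being thrown.
  def pickLeader(newReplicas: Seq[Int], isr: Seq[Int], liveBrokers: Set[Int]): Either[String, Int] = {
    val candidates = newReplicas.filter(r => liveBrokers.contains(r) && isr.contains(r))
    candidates.headOption match {
      case Some(leader) => Right(leader)
      case None if newReplicas.isEmpty => Left("reassigned replica list is empty")
      case None => Left("no reassigned replica is alive and in sync")
    }
  }
}
```

The order of the reassigned replica list matters: with new replicas 4, 5, 6 and an ISR of 1, 5, 6, broker 4 is skipped because it has not caught up yet, and broker 5 is chosen.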
OfflinePartitionLeaderSelector
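Selects a new leader and a new ISR for an offline partition, as follows:
1. If at least one broker in the ISR is alive, the first live assigned replica that is in the ISR becomes the leader, and the live members of the ISR form the new ISR.
2. If no broker in the ISR is alive and unclean.leader.election.enable=false, a NoReplicaOnlineException is thrown.
3. If unclean.leader.election.enable=true, a broker outside the ISR may be elected: the first live replica in the AR becomes the leader and the only member of the new ISR. This can cause data loss and inconsistency.
4. If no replica in the AR is alive either, a NoReplicaOnlineException is thrown.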
    class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig) extends PartitionLeaderSelector with Logging {

      this.logIdent = "[OfflinePartitionLeaderSelector]: "

      def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
        controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
          case Some(assignedReplicas) =>
            val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
            val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
            val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
            val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
            val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
              case true =>
                // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
                // for unclean leader election.
                if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils,
                  ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
                  throw new NoReplicaOnlineException(("No broker in ISR for partition " +
                    "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                    " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
                }

                debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
                  .format(topicAndPartition, liveAssignedReplicas.mkString(",")))
                liveAssignedReplicas.isEmpty match {
                  case true =>
                    // No replica in the AR is alive either: throw NoReplicaOnlineException
                    throw new NoReplicaOnlineException(("No replica for partition " +
                      "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                      " Assigned replicas are: [%s]".format(assignedReplicas))
                  case false =>
                    // unclean.leader.election.enable=true: a broker outside the ISR may become leader,
                    // so pick the leader from the live replicas in the AR
                    ControllerStats.uncleanLeaderElectionRate.mark()
                    val newLeader = liveAssignedReplicas.head
                    warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
                      .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
                    new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
                }
              case false =>
                // At least one broker in the ISR is alive: elect it as leader; the live ISR members form the new ISR
                val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
                val newLeader = liveReplicasInIsr.head
                debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
                  .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
                new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
            }
            info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
            (newLeaderAndIsr, liveAssignedReplicas)
          case None =>
            throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
        }
      }
    }
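The four-step decision described for OfflinePartitionLeaderSelector can be condensed into the sketch below. It is a simplified model, not the Kafka implementation: elect is a hypothetical function, the unclean election setting is passed as a plain Boolean instead of being read from the topic configuration, and failures are returned as Left messages rather than thrown. One deliberate deviation is that the sketch returns a Left when the live ISR members are not among the assigned replicas, a case in which the original code would fail on an empty list.

```scala
object OfflinePartitionSketch {
  // Result on success: (new leader, new ISR, whether the election was unclean).
  def elect(
      assigned: Seq[Int],
      isr: Seq[Int],
      live: Set[Int],
      uncleanEnabled: Boolean): Either[String, (Int, List[Int], Boolean)] = {
    val liveAssigned = assigned.filter(live.contains)
    val liveIsr = isr.filter(live.contains)
    if (liveIsr.nonEmpty) {
      // Step 1: clean election from the live ISR, in assignment order
      liveAssigned.find(liveIsr.contains) match {
        case Some(leader) => Right((leader, liveIsr.toList, false))
        case None => Left("live ISR members are not among the assigned replicas")
      }
    } else if (!uncleanEnabled) {
      // Step 2: nothing in the ISR is alive and unclean election is disabled
      Left("no ISR broker is alive and unclean election is disabled")
    } else {
      liveAssigned.headOption match {
        // Step 3: unclean election, the new ISR contains only the new leader
        case Some(leader) => Right((leader, List(leader), true))
        // Step 4: no assigned replica is alive at all
        case None => Left("no assigned replica is alive")
      }
    }
  }
}
```

With assigned replicas 1, 2, 3, an ISR containing only broker 2, and brokers 1 and 3 alive, the result depends entirely on the unclean election setting: disabled gives a failure, enabled elects broker 1 with an ISR of just broker 1.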
[References]
https://yq.aliyun.com/articles/15283
http://blog.csdn.net/zhanglh046/article/details/72822066