Spark 1.6.0 core source code analysis (3): Master HA
2016-07-05 21:11
When the Master starts up, the onStart method of its RpcEndpoint (the netty-based RPC layer in Spark 1.6) is invoked first. In Master.scala:
override def onStart(): Unit = {
  logInfo("Starting Spark master at " + masterUrl)
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  webUi = new MasterWebUI(this, webUiPort)
  webUi.bind()
  masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
  // Start a periodic task that checks for timed-out workers. When a worker times out, its
  // state is set to DEAD and its in-memory bookkeeping is cleaned up. If the worker was
  // running executors, an ExecutorUpdated message is sent to the driver to signal that those
  // executors are no longer available. If the worker was running a driver that is configured
  // to be relaunchable, the driver is rescheduled onto an available worker; otherwise the
  // driver's in-memory state is removed. A worker is only truly deleted after it has timed
  // out many times; until then it is merely excluded from being picked for new work.
  checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      self.send(CheckForWorkerTimeOut)
    }
  }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
  if (restServerEnabled) {
    val port = conf.getInt("spark.master.rest.port", 6066)
    restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
  }
  restServerBoundPort = restServer.map(_.start())
  masterMetricsSystem.registerSource(masterSource)
  masterMetricsSystem.start()
  applicationMetricsSystem.start()
  // Attach the master and app metrics servlet handler to the web ui after the metrics systems are
  // started.
  masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  val serializer = new JavaSerializer(conf)
  // Master HA: pick a persistence engine and a leader election agent based on the recovery mode
  val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      logInfo("Persisting recovery state to ZooKeeper")
      val zkFactory = new ZooKeeperRecoveryModeFactory(conf, serializer)
      (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
    case "FILESYSTEM" =>
      val fsFactory = new FileSystemRecoveryModeFactory(conf, serializer)
      (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
    case "CUSTOM" =>
      val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
      val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
        .newInstance(conf, serializer)
        .asInstanceOf[StandaloneRecoveryModeFactory]
      (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
    case _ =>
      (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
  }
  persistenceEngine = persistenceEngine_
  leaderElectionAgent = leaderElectionAgent_
}
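The RECOVERY_MODE branch above is driven by ordinary SparkConf properties. As a minimal sketch of how the ZooKeeper mode gets selected (the ensemble hosts below are placeholders, not from this post):

import org.apache.spark.SparkConf

// Placeholder host names; the spark.deploy.* keys are the ones Master.onStart reads
val conf = new SparkConf()
  .set("spark.deploy.recoveryMode", "ZOOKEEPER")           // chooses ZooKeeperRecoveryModeFactory
  .set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181")  // ZK ensemble to connect to
  .set("spark.deploy.zookeeper.dir", "/spark")             // base znode; the latch lives under /spark/leader_election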
The persistenceEngine_ above encapsulates reading and writing the master's metadata in ZooKeeper, together with the serialization/deserialization interface.
leaderElectionAgent_ encapsulates the master election process; see the explanations in the code comments below.
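Before looking at the election agent, a rough sketch of the persistence contract helps. This is a simplified rendering of the PersistenceEngine abstraction, abbreviated from memory rather than quoted verbatim from the Spark source:

import scala.reflect.ClassTag

// Simplified sketch of the engine the master uses to persist and recover metadata
abstract class PersistenceEngine {
  def persist(name: String, obj: AnyRef): Unit   // write one object under a name
  def unpersist(name: String): Unit              // remove it
  def read[T: ClassTag](prefix: String): Seq[T]  // read back all objects whose name starts with prefix

  // Recovery reads apps, drivers and workers back in one shot
  final def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) =
    (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
}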
private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
    conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {
  // Election is decided through a znode under this path in ZooKeeper
  val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
  private var zk: CuratorFramework = _
  private var leaderLatch: LeaderLatch = _
  private var status = LeadershipStatus.NOT_LEADER
  // start() is invoked as soon as this object is constructed
  start()
  // Once leaderLatch.start() is called, this LeaderLatch negotiates with all other
  // LeaderLatches that use the same latch path, and one of them is picked as leader
  private def start() {
    logInfo("Starting ZooKeeper LeaderElection agent")
    zk = SparkCuratorUtil.newClient(conf)
    leaderLatch = new LeaderLatch(zk, WORKING_DIR)
    leaderLatch.addListener(this)
    leaderLatch.start()
  }
  override def stop() {
    leaderLatch.close()
    zk.close()
  }
  // isLeader() is called back when this master wins the election round and becomes active
  override def isLeader() {
    synchronized {
      // could have lost leadership by now.
      if (!leaderLatch.hasLeadership) {
        return
      }
      logInfo("We have gained leadership")
      updateLeadershipStatus(true)
    }
  }
  // notLeader() is called back when this master loses the election round and becomes standby
  override def notLeader() {
    synchronized {
      // could have gained leadership by now.
      if (leaderLatch.hasLeadership) {
        return
      }
      logInfo("We have lost leadership")
      updateLeadershipStatus(false)
    }
  }
  private def updateLeadershipStatus(isLeader: Boolean) {
    // previously standby, now elected as the active master
    if (isLeader && status == LeadershipStatus.NOT_LEADER) {
      status = LeadershipStatus.LEADER
      masterActor.electedLeader() // calls electedLeader() on the Master
    // previously active, now demoted to standby
    } else if (!isLeader && status == LeadershipStatus.LEADER) {
      status = LeadershipStatus.NOT_LEADER
      masterActor.revokedLeadership() // calls revokedLeadership() on the Master
    }
  }
  private object LeadershipStatus extends Enumeration {
    type LeadershipStatus = Value
    val LEADER, NOT_LEADER = Value
  }
}
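To watch this latch behavior in isolation, here is a minimal, self-contained sketch built on the same Curator recipe; the ZooKeeper address and the sleep duration are placeholders for demonstration only:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchListener}
import org.apache.curator.retry.ExponentialBackoffRetry

object LeaderLatchDemo {
  def main(args: Array[String]): Unit = {
    // Placeholder ZK address; any process starting a latch on the same path joins the election
    val zk = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3))
    zk.start()
    val latch = new LeaderLatch(zk, "/spark/leader_election")
    latch.addListener(new LeaderLatchListener {
      override def isLeader(): Unit = println("gained leadership")  // analogous to Master.electedLeader()
      override def notLeader(): Unit = println("lost leadership")   // analogous to Master.revokedLeadership()
    })
    latch.start()
    Thread.sleep(60000) // hold the latch for a minute, then release it
    latch.close()
    zk.close()
  }
}

Run two copies of this and kill the one that prints "gained leadership": the survivor's isLeader() fires, which is exactly the transition the agent above forwards to the Master as electedLeader().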
Let's continue with the corresponding logic in Master (the post quotes the Akka-era receiveWithLogging handler here; the flow is the same in 1.6's receive):
override def receiveWithLogging: PartialFunction[Any, Unit] = {
  case ElectedLeader => {
    // A former standby becoming active must first read the persisted metadata back from zk
    val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
    state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
      RecoveryState.ALIVE // nothing to recover, go straight to ALIVE
    } else {
      RecoveryState.RECOVERING // otherwise enter the recovering state
    }
    logInfo("I have been elected leader! New state: " + state)
    if (state == RecoveryState.RECOVERING) {
      beginRecovery(storedApps, storedDrivers, storedWorkers) // described below
      recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
        CompleteRecovery)
    }
  }
  case CompleteRecovery => completeRecovery()
  // A former leader demoted to standby needs no extra work; it simply exits
  case RevokedLeadership => {
    logError("Leadership has been revoked -- master shutting down.")
    System.exit(0)
  }
Begin recovery. In Master.scala:
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]) {
  for (app <- storedApps) {
    logInfo("Trying to recover app: " + app.id)
    try {
      registerApplication(app) // load the recovered app into memory
      app.state = ApplicationState.UNKNOWN // mark it UNKNOWN until its driver responds
      app.driver.send(MasterChanged(self, masterWebUiUrl)) // tell the driver the master has changed
    } catch {
      case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
    }
  }
  for (driver <- storedDrivers) {
    // Here we just read in the list of drivers. Any drivers associated with now-lost workers
    // will be re-launched when we detect that the worker is missing.
    drivers += driver // load the recovered driver into memory
  }
  for (worker <- storedWorkers) {
    logInfo("Trying to recover worker: " + worker.id)
    try {
      registerWorker(worker) // load the recovered worker info into memory
      worker.state = WorkerState.UNKNOWN // likewise UNKNOWN; the worker only counts as usable once it replies
      worker.endpoint.send(MasterChanged(self, masterWebUiUrl)) // tell the worker the master has changed
    } catch {
      case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
    }
  }
}
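The messages exchanged during this phase are plain deploy messages. Judging from the pattern matches shown later in this post, their shapes are roughly as follows (a sketch inferred from those matches, not copied verbatim from DeployMessages.scala):

// Sketches of the deploy messages exchanged during recovery
case class MasterChanged(masterUrl: String, masterWebUiUrl: String) // master -> drivers and workers
case class MasterChangeAcknowledged(appId: String)                  // driver -> master
case class WorkerSchedulerStateResponse(workerId: String,
    executors: List[ExecutorDescription], driverIds: Seq[String])   // worker -> master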
What happens on the driver side when the MasterChanged message arrives? In AppClient.scala:
Only the active master sends MasterChanged, so the masterUrl here is guaranteed to belong to the new active master.
case MasterChanged(masterUrl, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterUrl)
// on receiving this message, the driver updates its stored master info so that later messages go to the new master
changeMaster(masterUrl)
alreadyDisconnected = false
sender ! MasterChangeAcknowledged(appId) // reply to the master with MasterChangeAcknowledged
The master then receives this acknowledgement from the driver of every application. Here is how the master handles MasterChangeAcknowledged; the parameter is the appId:
case MasterChangeAcknowledged(appId) => {
idToApp.get(appId) match {
case Some(app) =>
logInfo("Application has been re-registered: " + appId)
app.state = ApplicationState.WAITING // on ack, move the app's state to WAITING
case None =>
logWarning("Master change ack from unknown app: " + appId)
}
if (canCompleteRecovery) { completeRecovery() } // short-circuit: if everything has already responded, finish recovery now instead of waiting out the WORKER_TIMEOUT interval
}
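canCompleteRecovery itself is just the check that nothing is still UNKNOWN. It looks roughly like this (reproduced from memory, so treat it as a sketch):

private def canCompleteRecovery =
  workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
  apps.count(_.state == ApplicationState.UNKNOWN) == 0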
What happens on the worker side when the MasterChanged message arrives? In Worker.scala:
case MasterChanged(masterUrl, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterUrl)
changeMaster(masterUrl, masterWebUiUrl) // same as on the driver side
// the master never talks to executors directly, so the worker must report its executors back to the master
val execs = executors.values.
map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
sender ! WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq)
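Each reported executor is boiled down to an ExecutorDescription, which is roughly the following shape (a simplified sketch from memory):

// Just enough executor state for the master to rebuild its bookkeeping
class ExecutorDescription(
    val appId: String,
    val execId: Int,
    val cores: Int,
    val state: ExecutorState.Value)
  extends Serializable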
Back in Master.scala, the handling logic:
case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
idToWorker.get(workerId) match {
case Some(worker) =>
logInfo("Worker has been re-registered: " + workerId)
worker.state = WorkerState.ALIVE // the worker has responded, so flip its state from UNKNOWN to ALIVE, meaning it is usable again
// fold the reported executor info back into the corresponding app and worker records
val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
for (exec <- validExecutors) {
val app = idToApp.get(exec.appId).get
val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
worker.addExecutor(execInfo)
execInfo.copyState(exec)
}
// update the master's driver records and mark them RUNNING
for (driverId <- driverIds) {
drivers.find(_.id == driverId).foreach { driver =>
driver.worker = Some(worker)
driver.state = DriverState.RUNNING
worker.drivers(driverId) = driver
}
}
case None =>
logWarning("Scheduler state from unknown worker: " + workerId)
}
if (canCompleteRecovery) { completeRecovery() } // same short-circuit as above
}
Once all of this has been processed, look at the master's completeRecovery. It is scheduled when beginRecovery is called and runs after a WORKER_TIMEOUT delay; by then all of the message round-trips above should normally have finished:
private def completeRecovery() {
// Ensure "only-once" recovery semantics using a short synchronization period.
synchronized {
if (state != RecoveryState.RECOVERING) { return }
state = RecoveryState.COMPLETING_RECOVERY // now in the completing-recovery phase
}
// Kill off any workers and apps that didn't respond to us.
// remove workers and apps that still have not responded after the WORKER_TIMEOUT interval
workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
// Reschedule drivers which were not claimed by any workers
// after all the message exchanges, drivers whose worker entry never came back are relaunched or removed
drivers.filter(_.worker.isEmpty).foreach { d =>
logWarning(s"Driver ${d.id} was not found after master recovery")
if (d.desc.supervise) {
logWarning(s"Re-launching ${d.id}")
relaunchDriver(d)
} else {
removeDriver(d.id, DriverState.ERROR, None)
logWarning(s"Did not re-launch ${d.id} because it was not supervised")
}
}
state = RecoveryState.ALIVE // recovery is now truly finished
schedule() // reschedule once, now that the whole failover flow is complete
logInfo("Recovery complete - resuming operations!")
}