您的位置:首页 > 其它

3.Master注册机制源码分析和状态改变机制源码分析

2017-06-22 18:50 447 查看
Master注册机制的原理图如下。说白了,注册就是将Application信息、Driver信息和所有Worker信息加入Master的缓存队列中。



1. Application的注册在Master.scala中其实只有一段代码:

/**
 * Handles an application registration request (the RegisterApplication message).
 *
 * NOTE(review): registerApplication returns without changing any state when an app is
 * already registered at the same driver address, yet this handler still persists the
 * app and replies with RegisteredApplication in that case.
 */
case RegisterApplication(description) => {
  // A STANDBY (non-active) master does nothing at all with the request.
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    // Build an ApplicationInfo from the received description; `sender` is passed in as
    // the application's driver ActorRef.
    val app = createApplication(description, sender)

    // Add the application to the master's in-memory bookkeeping.
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)

    // Record the application in the persistence engine.
    persistenceEngine.addApplication(app)

    // Reply to the sender (not to the master) that the application is registered.
    // NOTE(review): the sender is presumably the AppClient's ClientActor used by
    // SparkDeploySchedulerBackend — confirm against the caller.
    sender ! RegisteredApplication(app.id, masterUrl)

    // Start resource scheduling.
    schedule()
  }
}

[/code]

创建Application代码如下 :

/**
 * Builds an ApplicationInfo for a newly submitted application.
 *
 * A single wall-clock reading is used both as the start timestamp and as the date the
 * application ID is derived from, so the two always agree.
 *
 * @param desc   the description sent by the client
 * @param driver the actor recorded as the application's driver
 * @return the new ApplicationInfo, using the master's defaultCores
 */
def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
  val startMillis = System.currentTimeMillis()
  val submitDate = new Date(startMillis)
  val appId = newApplicationId(submitDate)
  new ApplicationInfo(startMillis, appId, desc, submitDate, driver, defaultCores)
}

[/code]

注册Application对象如下 :

/**
 * Adds an application to the master's in-memory bookkeeping.
 *
 * If an application is already registered from the same driver actor address, this only
 * logs and leaves all state untouched.
 *
 * Otherwise it:
 *  - registers the app's metrics source;
 *  - records the app in `apps`, `idToApp`, `actorToApp` and `addressToApp`;
 *  - appends the app to `waitingApps`, the queue of applications awaiting scheduling.
 *
 * @param app the application to register
 */
def registerApplication(app: ApplicationInfo): Unit = {
  val driverAddress = app.driver.path.address
  val alreadyRegistered = addressToApp.contains(driverAddress)

  if (alreadyRegistered) {
    logInfo("Attempted to re-register application at same address: " + driverAddress)
  } else {
    // Index the application in every lookup structure the master keeps.
    applicationMetricsSystem.registerSource(app.appSource)
    apps += app
    idToApp(app.id) = app
    actorToApp(app.driver) = app
    addressToApp(driverAddress) = app

    // Queue it so it is considered by the next scheduling pass.
    waitingApps += app
  }
}

[/code]

这其中关于ApplicationDescription的代码如下 :

/**
 * Describes an application as sent to the Master when it registers
 * (see the RegisterApplication handler).
 */
private[spark] class ApplicationDescription(
    val name: String, // name of the application
    val maxCores: Option[Int], // maximum number of CPU cores the application may use (optional)
    val memoryPerSlave: Int, // memory the application needs on each node (units not shown here; presumably MB — confirm)
    val command: Command, // command associated with the application
    var appUiUrl: String, // URL of the application's web UI; declared `var`, so it can be reassigned after construction
    val eventLogDir: Option[String] = None, // optional event log directory
    // short name of compression codec used when writing event logs, if any (e.g. lzf)
    val eventLogCodec: Option[String] = None)

[/code]

2.Worker的注册原理和Application相似 , 代码如下 :

/**
 * Handles a worker registration request (the RegisterWorker message).
 *
 * - A STANDBY master ignores the request.
 * - A duplicate worker ID is rejected with RegisterWorkerFailed.
 * - So is a worker whose registration is refused by registerWorker.
 * - Otherwise the worker is persisted, acknowledged with RegisteredWorker, and
 *   scheduling runs.
 */
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
{
  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
    workerHost, workerPort, cores, Utils.megabytesToString(memory)))

  // Check the master's recovery state: a STANDBY master does not register workers.
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else if (idToWorker.contains(id)) {
    // Reject a worker whose ID is already registered.
    sender ! RegisterWorkerFailed("Duplicate worker ID")
  } else {

    // Wrap the worker's details in a WorkerInfo; `sender` is passed in as the worker's
    // actor reference (presumably what registerWorker later reads as `worker.actor`).
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      sender, workerUiPort, publicAddress)

    // registerWorker returns false when it refuses the worker (e.g. another worker is
    // already registered at the same actor address).
    if (registerWorker(worker)) {
      // Record the worker in the persistence engine.
      persistenceEngine.addWorker(worker)
      // Acknowledge the registration to the worker (the sender), not to the master.
      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
      // Start resource scheduling.
      schedule()
    } else {
      // Registration refused: tell the worker (the sender) why.
      val workerAddress = worker.actor.path.address
      logWarning("Worker registration failed. Attempted to re-register worker at same " +
        "address: " + workerAddress)
      sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
        + workerAddress)
    }
  }
}

[/code]

注册worker的相关信息源码如下 :

/**
 * Registers a worker with the master.
 *
 * First it drops any DEAD workers recorded at the same host and port, since a restarted
 * worker may come back under a different ID.
 *
 * If a worker is already registered at the same actor address, registration proceeds
 * only when that old entry is in UNKNOWN state. That means the worker restarted during
 * recovery, so the old entry is removed via removeWorker.
 *
 * @param worker the worker to register
 * @return true if the worker was added to `workers`, `idToWorker` and `addressToWorker`;
 *         false if a worker in any other state already occupies its address
 */
def registerWorker(worker: WorkerInfo): Boolean = {
  // There may be one or more refs to dead workers on this same node (w/ different ID's):
  // collect them first, then remove them from `workers`.
  val staleOnSameNode = workers.filter { w =>
    w.host == worker.host && w.port == worker.port && w.state == WorkerState.DEAD
  }
  staleOnSameNode.foreach(stale => workers -= stale)

  val workerAddress = worker.actor.path.address
  val accepted = addressToWorker.get(workerAddress) match {
    case Some(oldWorker) if oldWorker.state == WorkerState.UNKNOWN =>
      // A worker registering from UNKNOWN implies that the worker was restarted during
      // recovery. The old worker must thus be dead, so remove it and accept the new one.
      removeWorker(oldWorker)
      true
    case Some(_) =>
      logInfo("Attempted to re-register worker at same address: " + workerAddress)
      false
    case None =>
      true
  }

  if (accepted) {
    // Index the new worker by itself, by ID and by actor address.
    workers += worker
    idToWorker(worker.id) = worker
    addressToWorker(workerAddress) = worker
  }
  accepted
}

[/code]

3. Driver的注册源码也与之相似,代码如下:

/**
 * Handles a driver submission request (the RequestSubmitDriver message).
 *
 * The sender always gets a SubmitDriverResponse:
 *  - a failure when the master is not ALIVE;
 *  - otherwise a success, sent after the driver has been recorded and a scheduling
 *    pass has run.
 */
case RequestSubmitDriver(description) => {
  // Only an ALIVE master accepts driver submissions.
  if (state != RecoveryState.ALIVE) {
    val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
    sender ! SubmitDriverResponse(false, None, msg)
  } else {
    logInfo("Driver submitted " + description.command.mainClass)

    // Create the driver record from its DriverDescription.
    val driver = createDriver(description)
    // Record the driver in the persistence engine.
    persistenceEngine.addDriver(driver)
    // Queue the driver for scheduling.
    waitingDrivers += driver
    // Track it among the master's known drivers.
    drivers.add(driver)
    // Run a scheduling pass.
    schedule()

    // TODO: It might be good to instead have the submission client poll the master to determine
    //   the current status of the driver. For now it's simply "fire and forget".

    sender ! SubmitDriverResponse(true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}")
  }
}

[/code]

关于DriverDescription.scala的部分代码如下 :

/**
 * Describes a driver submitted to the Master.
 *
 * @param jarUrl    URL of the jar containing the driver
 * @param mem       memory requested for the driver (units not shown here; presumably MB)
 * @param cores     number of CPU cores requested for the driver
 * @param supervise whether the driver should be supervised (presumably restarted on
 *                  failure — confirm against the worker/master code)
 * @param command   command associated with the driver; its main class is used in toString
 */
private[spark] class DriverDescription(
    val jarUrl: String,
    val mem: Int,
    val cores: Int,
    val supervise: Boolean,
    val command: Command)
  extends Serializable {

  /** Returns a new description, overriding only the fields that are passed in. */
  def copy(
      jarUrl: String = jarUrl,
      mem: Int = mem,
      cores: Int = cores,
      supervise: Boolean = supervise,
      command: Command = command): DriverDescription =
    new DriverDescription(
      jarUrl = jarUrl,
      mem = mem,
      cores = cores,
      supervise = supervise,
      command = command)

  override def toString: String = s"DriverDescription (${command.mainClass})"
}

[/code]

同时再看一下Driver状态改变的代码:

/**
 * Handles a DriverStateChanged report: what the master does when a driver's state changes.
 *
 * - Terminal states (ERROR, FINISHED, KILLED, FAILED) remove the driver via removeDriver.
 * - Any other state is unexpected here, so an Exception is thrown.
 */
case DriverStateChanged(driverId, newState, exception) => {
  val isTerminal = newState match {
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED => true
    case _ => false
  }

  if (isTerminal) {
    removeDriver(driverId, newState, exception)
  } else {
    throw new Exception(s"Received unexpected state update for driver $driverId: $newState")
  }
}

[/code]

重点查看Driver移除的代码 :

/**
 * Removes a driver from the active set and records it as completed.
 *
 * If a driver with `driverId` is found in `drivers`, it is:
 *  - moved into `completedDrivers`; once that buffer holds RETAINED_DRIVERS entries,
 *    the oldest ones at its front are trimmed first;
 *  - removed from the persistence engine;
 *  - stamped with its final state and exception;
 *  - detached from the worker it was assigned to, if any;
 *  - followed by a scheduling pass.
 *
 * An unknown ID is only logged.
 *
 * @param driverId   ID of the driver to remove
 * @param finalState state to record on the driver
 * @param exception  failure cause to record on the driver, if any
 */
def removeDriver(driverId: String, finalState: DriverState, exception: Option[Exception]): Unit = {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // Take the driver out of the active set.
      drivers -= driver
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        // Drop roughly the oldest 10% (at least one) to make room. Clamp to the current
        // size: with RETAINED_DRIVERS == 0 the buffer may be empty here, and older Scala
        // Buffer implementations throw when asked to trim more elements than they hold.
        val toRemove = math.min(math.max(RETAINED_DRIVERS / 10, 1), completedDrivers.size)
        completedDrivers.trimStart(toRemove)
      }
      // Appended at the end, so the front of the buffer always holds the oldest entries.
      completedDrivers += driver
      // Drop the driver from the persistence engine.
      persistenceEngine.removeDriver(driver)
      // Record how the driver ended.
      driver.state = finalState
      driver.exception = exception
      driver.worker.foreach(w => w.removeDriver(driver))
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}

[/code]

Executor状态改变的代码如下:

/**
 * Handles an ExecutorStateChanged report: what the master does when an executor's state
 * changes.
 *
 * It records the executor's new state and forwards the update to the application's
 * driver. Once the executor has finished, it is detached from its application and
 * worker.
 *
 * An abnormal exit increments the application's retry count. When the count reaches
 * ApplicationState.MAX_NUM_RETRY, the application is removed as FAILED, but only if
 * none of its remaining executors is RUNNING.
 */
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
  // Look up the application by ID, then the executor inside that application's executor map.
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    // Known executor.
    case Some(exec) => {
      // Safe lookup: execOption can only be Some when idToApp contains appId.
      val appInfo = idToApp(appId)
      // Record the executor's new state.
      exec.state = state
      // A RUNNING executor resets the application's retry counter.
      if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
      // Forward the update to the application's driver. `!` (Akka tell) is an
      // asynchronous fire-and-forget send, not a synchronous call.
      exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)

      // Cleanup only happens once the executor has finished.
      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")

        // Detach the executor from its application...
        appInfo.removeExecutor(exec)
        // ...and from the worker it ran on.
        exec.worker.removeExecutor(exec)

        // Only an exit status of exactly Some(0) counts as a normal exit; None or any
        // non-zero code is treated as abnormal.
        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        if (!normalExit) {

          // Compare the incremented retry count (presumably the return value of
          // incrementRetryCount) against the limit. The limit is defined in
          // ApplicationState and not shown here; TODO confirm its value.
          if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
            // Still under the limit: just reschedule.
            schedule()
          } else {

            // Limit reached: fail the whole application and remove it, but only if none
            // of its remaining executors is still RUNNING.
            val execs = appInfo.executors.values
            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
              logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                s"${appInfo.retryCount} times; removing it")

              // Remove the application that owned this executor.
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
      }
    }

    // Unknown application or executor: log it and ignore the report.
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
}

[/code]
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: