Master原理剖析与源码分析:Master状态改变处理机制原理剖析与源码分析
2017-10-13 20:28
666 查看
1、主备切换机制原理剖析与源码分析
2、注册机制原理剖析与源码分析
2、注册机制原理剖析与源码分析
3、状态改变处理机制源码分析
4、资源调度机制源码分析(schedule(),两种资源调度算法)// Master中Driver状态改变的的信息 private def removeDriver( driverId: String, finalState: DriverState, exception: Option[Exception]) { drivers.find(d => d.id == driverId) match { case Some(driver) => logInfo(s"Removing driver: $driverId") //将Driver从缓存中移除 drivers -= driver if (completedDrivers.size >= RETAINED_DRIVERS) { val toRemove = math.max(RETAINED_DRIVERS / 10, 1) completedDrivers.trimStart(toRemove) } //将Driver标记为已完成 completedDrivers += driver //使用持久化引擎去除Driver得持久化信息 persistenceEngine.removeDriver(driver) driver.state = finalState driver.exception = exception //将Driver所在的worker去除Driver。 driver.worker.foreach(w => w.removeDriver(driver)) schedule() case None => logWarning(s"Asked to remove unknown driver: $driverId") } }
Master中Executor状态改变 case ExecutorStateChanged(appId, execId, state, message, exitStatus) => { val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId)) execOption match { case Some(exec) => { val appInfo = idToApp(appId) val oldState = exec.state exec.state = state if (state == ExecutorState.RUNNING) { assert(oldState == ExecutorState.LAUNCHING, s"executor $execId state transfer from $oldState to RUNNING is illegal") appInfo.resetRetryCount() } //向Driver发送ExecutorUpdated信息 exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus)) //如果ExecutorState.isFinished if (ExecutorState.isFinished(state)) { // Remove this executor from the worker and app logInfo(s"Removing executor ${exec.fullId} because it is $state") // If an application has already finished, preserve its // state to display its information properly on the UI if (!appInfo.isFinished) { //从app的缓存中移除executor appInfo.removeExecutor(exec) } exec.worker.removeExecutor(exec) //判断,如果Executor退出试是非正常的 val normalExit = exitStatus == Some(0) // 判断application的重试次数是否达到了最大值 if (!normalExit) { if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) { //重新调度 schedule() } else { val execs = appInfo.executors.values if (!execs.exists(_.state == ExecutorState.RUNNING)) { logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " + s"${appInfo.retryCount} times; removing it") //Executor反复调度都是失败的话,那么Application任务也会失败,就要进行removeApplication 的操作。 removeApplication(appInfo, ApplicationState.FAILED) } } } } } case None => logWarning(s"Got status update for unknown executor $appId/$execId") }
相关文章推荐
- Spark源码分析之Master状态改变处理机制原理
- spark源码学习(二)---Master源码分析(2)-master内组件状态改变机制
- Master原理剖析与源码分析:资源调度机制源码分析(schedule(),两种资源调度算法)
- Spark2.2 Driver和Executor状态改变处理机制源码分析
- Master原理剖析与源码分析:主备切换机制原理剖析与源码分析
- Master原理剖析与源码分析:注册机制原理剖析与源码分析
- Spark内核源码深度剖析:Master主备切换机制原理剖析与源码分析
- Spark的Master分析3(Master状态改变机制分析)
- 3.Master注册机制源码分析和状态改变机制源码分析
- spark源码分析之master状态改变篇
- Spark源码分析之Master注册机制原理
- Master原理剖析和源码分析
- Spark源码之路(二):Master原理剖析与源码分析
- 0003.spark2.0源码分析(3)--master注册机制与状态管理
- Master主备切换机制原理及源码分析(笔记)
- android的消息处理机制(图+源码分析)——Looper,Handler,Message
- android的消息处理机制(图+源码分析)——Looper,Handler,Message
- android的消息处理机制(图+源码分析)——Looper,Handler,Message
- Android 消息处理机制1(从源码分析)
- 通过源码分析Android 的消息处理机制