【Spark2.0源码学习】-10.Task执行与回馈
2017-06-09 10:40
447 查看
通过上一节内容,DriverEndpoint最终生成多个可执行的TaskDescription对象,并向各个ExecutorEndpoint发送LaunchTask指令,本节内容将关注ExecutorEndpoint如何处理LaunchTask指令,处理完成后如何回馈给DriverEndpoint,以及整个job最终如何多次调度直至结束。
一、Task的执行流程
承接上一节内容,Executor接受LaunchTask指令后,开启一个新线程TaskRunner解析RDD,并调用RDD的compute方法,归并函数得到最终任务执行结果
ExecutorEndpoint接受到LaunchTask指令后,解码出TaskDescription,调用Executor的launchTask方法
Executor创建一个TaskRunner线程,并启动线程,同时将改线程添加到Executor的成员对象中,代码如下:
TaskRunner
首先向DriverEndpoint发送任务最新状态为RUNNING
从TaskDescription解析出Task,并调用Task的run方法
Task
创建TaskContext以及CallerContext(与HDFS交互的上下文对象)
执行Task的runTask方法
如果Task实例为ShuffleMapTask:解析出RDD以及ShuffleDependency信息,调用RDD的compute()方法将结果写Writer中(Writer这里不介绍,可以作为黑盒理解,比如写入一个文件中),返回MapStatus对象
如果Task实例为ResultTask:解析出RDD以及合并函数信息,调用函数将调用后的结果返回
TaskRunner将Task执行的结果序列化,再次向DriverEndpoint发送任务最新状态为FINISHED
二、Task的回馈流程
TaskRunner执行结束后,都将执行状态发送至DriverEndpoint,DriverEndpoint最终反馈指令CompletionEvent至DAGSchedulerEventProcessLoop中
DriverEndpoint接受到StatusUpdate消息后,调用TaskScheduler的statusUpdate(taskId, state, result)方法
TaskScheduler如果任务结果是完成,那么清除该任务处理中的状态,并调动TaskResultGetter相关方法,关键代码如下:
TaskResultGetter启动线程启动线程【task-result-getter】进行相关处理
通过解析或者远程获取得到Task的TaskResult对象
调用TaskSet的handleSuccessfulTask方法,TaskSet的handleSuccessfulTask方法直接调用TaskSetManager的handleSuccessfulTask方法
TaskSetManager
更新内部TaskInfo对象状态,并将该Task从运行中Task的集合删除,代码如下:
调用DAGScheduler的taskEnded方法,关键代码如下:
DAGScheduler向DAGSchedulerEventProcessLoop存入CompletionEvent指令,CompletionEvent对象定义如下
三、Task的迭代流程
DAGSchedulerEventProcessLoop中针对于CompletionEvent指令,调用DAGScheduler进行处理,DAGScheduler更新Stage与该Task的关系状态,如果Stage下Task都返回,则做下一层Stage的任务拆解与运算工作,直至Job被执行完毕
DAGSchedulerEventProcessLoop接收到CompletionEvent指令后,调用DAGScheduler的handleTaskCompletion方法
DAGScheduler根据Task的类型分别处理
如果Task为ShuffleMapTask
待回馈的Partitions减取当前partitionId
如果所有task都返回,则markStageAsFinished(shuffleStage),同时向MapOutputTrackerMaster注册MapOutputs信息,且markMapStageJobAsFinished
调用submitWaitingChildStages(shuffleStage)进行下层Stages的处理,从而迭代处理最终处理到ResultTask,job结束,关键代码如下:
如果Task为ResultTask
改job的partitions都已返回,则markStageAsFinished(resultStage),并cleanupStateForJobAndIndependentStages(job),关键代码如下
至此,用户编写的代码最终调用Spark分布式计算完毕。
一、Task的执行流程
承接上一节内容,Executor接受LaunchTask指令后,开启一个新线程TaskRunner解析RDD,并调用RDD的compute方法,归并函数得到最终任务执行结果
ExecutorEndpoint接受到LaunchTask指令后,解码出TaskDescription,调用Executor的launchTask方法
Executor创建一个TaskRunner线程,并启动线程,同时将改线程添加到Executor的成员对象中,代码如下:
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] runningTasks.put(taskDescription.taskId, taskRunner)
TaskRunner
首先向DriverEndpoint发送任务最新状态为RUNNING
从TaskDescription解析出Task,并调用Task的run方法
Task
创建TaskContext以及CallerContext(与HDFS交互的上下文对象)
执行Task的runTask方法
如果Task实例为ShuffleMapTask:解析出RDD以及ShuffleDependency信息,调用RDD的compute()方法将结果写Writer中(Writer这里不介绍,可以作为黑盒理解,比如写入一个文件中),返回MapStatus对象
如果Task实例为ResultTask:解析出RDD以及合并函数信息,调用函数将调用后的结果返回
TaskRunner将Task执行的结果序列化,再次向DriverEndpoint发送任务最新状态为FINISHED
二、Task的回馈流程
TaskRunner执行结束后,都将执行状态发送至DriverEndpoint,DriverEndpoint最终反馈指令CompletionEvent至DAGSchedulerEventProcessLoop中
DriverEndpoint接受到StatusUpdate消息后,调用TaskScheduler的statusUpdate(taskId, state, result)方法
TaskScheduler如果任务结果是完成,那么清除该任务处理中的状态,并调动TaskResultGetter相关方法,关键代码如下:
val taskSet = taskIdToTaskSetManager.get(tid) taskIdToTaskSetManager.remove(tid) taskIdToExecutorId.remove(tid).foreach { executorId => executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) } } taskSet.removeRunningTask(tid) if (state == TaskState.FINISHED) { taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) }
TaskResultGetter启动线程启动线程【task-result-getter】进行相关处理
通过解析或者远程获取得到Task的TaskResult对象
调用TaskSet的handleSuccessfulTask方法,TaskSet的handleSuccessfulTask方法直接调用TaskSetManager的handleSuccessfulTask方法
TaskSetManager
更新内部TaskInfo对象状态,并将该Task从运行中Task的集合删除,代码如下:
val info = taskInfos(tid) info.markFinished(TaskState.FINISHED, clock.getTimeMillis()) removeRunningTask(tid)
调用DAGScheduler的taskEnded方法,关键代码如下:
sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
DAGScheduler向DAGSchedulerEventProcessLoop存入CompletionEvent指令,CompletionEvent对象定义如下
private[scheduler] case class CompletionEvent( task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Seq[AccumulatorV2[_, _]], taskInfo: TaskInfo) extends DAGSchedulerEvent
三、Task的迭代流程
DAGSchedulerEventProcessLoop中针对于CompletionEvent指令,调用DAGScheduler进行处理,DAGScheduler更新Stage与该Task的关系状态,如果Stage下Task都返回,则做下一层Stage的任务拆解与运算工作,直至Job被执行完毕
DAGSchedulerEventProcessLoop接收到CompletionEvent指令后,调用DAGScheduler的handleTaskCompletion方法
DAGScheduler根据Task的类型分别处理
如果Task为ShuffleMapTask
待回馈的Partitions减取当前partitionId
如果所有task都返回,则markStageAsFinished(shuffleStage),同时向MapOutputTrackerMaster注册MapOutputs信息,且markMapStageJobAsFinished
调用submitWaitingChildStages(shuffleStage)进行下层Stages的处理,从而迭代处理最终处理到ResultTask,job结束,关键代码如下:
private def submitWaitingChildStages(parent: Stage) { ... val childStages = waitingStages.filter(_.parents.contains(parent)).toArray waitingStages --= childStages for (stage <- childStages.sortBy(_.firstJobId)) { submitStage(stage) } }
如果Task为ResultTask
改job的partitions都已返回,则markStageAsFinished(resultStage),并cleanupStateForJobAndIndependentStages(job),关键代码如下
for (stage <- stageIdToStage.get(stageId)) { if (runningStages.contains(stage)) { logDebug("Removing running stage %d".format(stageId)) runningStages -= stage } for ((k, v) <- shuffleIdToMapStage.find(_._2 == stage)) { shuffleIdToMapStage.remove(k) } if (waitingStages.contains(stage)) { logDebug("Removing stage %d from waiting set.".format(stageId)) waitingStages -= stage } if (failedStages.contains(stage)) { logDebug("Removing stage %d from failed set.".format(stageId)) failedStages -= stage } } // data structures based on StageId stageIdToStage -= stageId jobIdToStageIds -= job.jobId jobIdToActiveJob -= job.jobId activeJobs -= job
至此,用户编写的代码最终调用Spark分布式计算完毕。
相关文章推荐
- 【Spark2.0源码学习】-10.Task执行与回馈
- 【Spark2.0源码学习】-3.Endpoint模型介绍
- 【Spark2.0源码学习】-8.SparkContext与Application介绍
- 【Spark2.0源码学习】-9.Job提交与Task的拆分
- 【Spark2.0源码学习】-4.Master启动
- 【Spark2.0源码学习】-5.Worker启动
- 【Spark2.0源码学习】-7.Driver与DriverRunner
- 【Spark2.0源码学习】-6.Client启动
- 【Spark2.0源码学习】-3.Endpoint模型介绍
- 【Spark2.0源码学习】-9.Job提交与Task的拆分
- 【Spark2.0源码学习】-1.概述
- 【Spark2.0源码学习】-7.Driver与DriverRunner
- 【Spark2.0源码学习】-1.概述
- 【Spark2.0源码学习】-2.一切从脚本说起
- Spark2.0源码学习-Job提交与Task的拆分
- 【Spark2.0源码学习】-6.Client启动
- 【Spark2.0源码学习】-8.SparkContext与Application介绍
- 【Spark2.0源码学习】-9.Job提交与Task的拆分
- 【Spark2.0源码学习】-5.Worker启动
- 【Spark2.0源码学习】-2.一切从脚本说起