[Spark源代码剖析] DAGScheduler提交stage
2017-06-15 19:41
281 查看
转载请标明出处:http://blog.csdn.net/bigbigdata/article/details/47310657
DAGScheduler通过调用submitStage来提交stage。实现例如以下:
submitStage先调用
上面提到,若stageX存在未提交的父stages。则先提交父stages;那么,假设stageX没有未提交的父stage呢(比方。包括从HDFS读取数据生成HadoopRDD的那个stage是没有父stage的)?
这时会调用
这个函数的实现比較长。以下分段说明。
这么做(没有直接提交全部tasks)的原因是,stage中某个task运行失败其它运行成功的时候就须要找出这个失败的task相应要计算的partition而不是要计算全部partition
以上,介绍了提交stage和提交tasks的实现。本文若有纰漏,请批评指正。
DAGScheduler通过调用submitStage来提交stage。实现例如以下:
private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { //< 获取该stage未提交的父stages,并按stage id从小到大排序 val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing.isEmpty) { logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") //< 若无未提交的父stage, 则提交该stage相应的tasks submitMissingTasks(stage, jobId.get) } else { //< 若存在未提交的父stage, 依次提交全部父stage (若父stage也存在未提交的父stage, 则提交之, 依次类推); 并把该stage增加到等待stage队列中 for (parent <- missing) { submitStage(parent) } waitingStages += stage } } } else { abortStage(stage, "No active job for stage " + stage.id) } }
submitStage先调用
getMissingParentStages来获取參数stageX(这里为了区分,取名为stageX)是否有未提交的父stages,若有。则依次递归(按stage id从小到大排列。也就是stage是从后往前提交的)提交父stages,并将stageX增加到
waitingStages: HashSet[Stage]中。对于要依次提交的父stage。也是如此。
getMissingParentStages与DAGScheduler划分stage中介绍的
getParentStages有点像,但不同的是不再须要划分stage,并对每一个stage的状态做了推断,源代码及凝视例如以下:
//< 以參数stage为起点,向前遍历全部stage,推断stage是否为未提交,若使则增加missing中 private def getMissingParentStages(stage: Stage): List[Stage] = { //< 未提交的stage val missing = new HashSet[Stage] //< 存储已经被訪问到得RDD val visited = new HashSet[RDD[_]] val waitingForVisit = new Stack[RDD[_]] def visit(rdd: RDD[_]) { if (!visited(rdd)) { visited += rdd if (getCacheLocs(rdd).contains(Nil)) { for (dep <- rdd.dependencies) { dep match { //< 若为宽依赖。生成新的stage case shufDep: ShuffleDependency[_, _, _] => //< 这里调用getShuffleMapStage不像在getParentStages时须要划分stage。而是直接依据shufDep.shuffleId获取相应的ShuffleMapStage val mapStage = getShuffleMapStage(shufDep, stage.jobId) if (!mapStage.isAvailable) { // 若stage得状态为available则为未提交stage missing += mapStage } //< 若为窄依赖,那就属于同一个stage。并将依赖的RDD放入waitingForVisit中,以可以在以下的while中继续向上visit。直至遍历了整个DAG图 case narrowDep: NarrowDependency[_] => waitingForVisit.push(narrowDep.rdd) } } } } } waitingForVisit.push(stage.rdd) while (waitingForVisit.nonEmpty) { visit(waitingForVisit.pop()) } missing.toList }
上面提到,若stageX存在未提交的父stages。则先提交父stages;那么,假设stageX没有未提交的父stage呢(比方。包括从HDFS读取数据生成HadoopRDD的那个stage是没有父stage的)?
这时会调用
submitMissingTasks(stage, jobId.get),參数就是stageX及其相应的jobId.get。这个函数便是我们时常在其它文章或书籍中看到的将stage与taskSet相应起来,然后DAGScheduler将taskSet提交给TaskScheduler去运行的实施者。
这个函数的实现比較长。以下分段说明。
Step1: 得到RDD中须要计算的partition
对于Shuffle类型的stage,须要推断stage中是否缓存了该结果;对于Result类型的Final Stage。则推断计算Job中该partition是否已经计算完毕。这么做(没有直接提交全部tasks)的原因是,stage中某个task运行失败其它运行成功的时候就须要找出这个失败的task相应要计算的partition而不是要计算全部partition
private def submitMissingTasks(stage: Stage, jobId: Int) { stage.pendingTasks.clear() //< 首先得到RDD中须要计算的partition //< 对于Shuffle类型的stage,须要推断stage中是否缓存了该结果; //< 对于Result类型的Final Stage,则推断计算Job中该partition是否已经计算完毕 //< 这么做的原因是。stage中某个task运行失败其它运行成功地时候就须要找出这个失败的task相应要计算的partition而不是要计算全部partition val partitionsToCompute: Seq[Int] = { stage match { case stage: ShuffleMapStage => (0 until stage.numPartitions).filter(id => stage.outputLocs(id).isEmpty) case stage: ResultStage => val job = stage.resultOfJob.get (0 until job.numPartitions).filter(id => !job.finished(id)) } }
Step2: 序列化task的binary
Executor可以通过广播变量得到它。每一个task运行的时候首先会反序列化var taskBinary: Broadcast[Array[Byte]] = null try { // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep). // For ResultTask, serialize and broadcast (rdd, func). val taskBinaryBytes: Array[Byte] = stage match { case stage: ShuffleMapStage => //< 对于ShuffleMapTask,将rdd及其依赖关系序列化。在Executor运行task之前会反序列化 closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array() //< 对于ResultTask,对rdd及要在每一个partition上运行的func case stage: ResultStage => closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array() } //< 将序列化好的信息广播给全部的executor taskBinary = sc.broadcast(taskBinaryBytes) } catch { // In the case of a failure during serialization, abort the stage. case e: NotSerializableException => abortStage(stage, "Task not serializable: " + e.toString) runningStages -= stage // Abort execution return case NonFatal(e) => abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}") runningStages -= stage return }
Step3: 为每一个须要计算的partiton生成一个task
ShuffleMapStage相应的task全是ShuffleMapTask; ResultStage相应的全是ResultTask。task继承Serializable,要确保task是可序列化的。val tasks: Seq[Task[_]] = stage match { case stage: ShuffleMapStage => partitionsToCompute.map { id => val locs = getPreferredLocs(stage.rdd, id) //< RDD相应的partition val part = stage.rdd.partitions(id) new ShuffleMapTask(stage.id, taskBinary, part, locs) } case stage: ResultStage => val job = stage.resultOfJob.get //< id为输出分区索引,表示reducerID partitionsToCompute.map { id => val p: Int = job.partitions(id) val part = stage.rdd.partitions(p) val locs = getPreferredLocs(stage.rdd, p) new ResultTask(stage.id, taskBinary, part, locs, id) } }
Step4: 提交tasks
先用tasks来初始化一个TaskSet对象。再调用TaskScheduler.submitTasks提交stage.pendingTasks ++= tasks logDebug("New pending tasks: " + stage.pendingTasks) //< 提交TaskSet至TaskScheduler taskScheduler.submitTasks( new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)) //< 记录stage提交task的时间 stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) } else {
以上,介绍了提交stage和提交tasks的实现。本文若有纰漏,请批评指正。
相关文章推荐
- [Spark源代码剖析] DAGScheduler划分stage
- [Spark源码剖析] DAGScheduler提交stage
- [Spark源码剖析] DAGScheduler划分stage
- [spark] DAGScheduler 提交stage源码解析
- spark源码分析之DAGScheduler提交作业(job)过程、stage阶段说明
- Spark2.2 DAGScheduler源码分析[stage划分算法源码剖析]
- Spark技术内幕:Stage划分及提交源代码分析
- spark学习-DAGScheduler的stage划分算法
- spark源码之Job执行(1)stage划分与提交
- Spark内核源码深度剖析:基于Yarn的两种提交模式深度剖析
- Spark技术内幕:Stage划分及提交源码分析
- Spark源码走读(三) —— Stage的划分和提交
- Spark学习之8:Stage提交到Task执行
- Spark1.3从创建到提交:9)Stage的划分和提交源码分析
- spark源码之Job执行(1)stage划分与提交
- Spark源码剖析——SparkContext的初始化(六)_创建和启动DAGScheduler
- spark源码之Job执行(1)stage划分与提交
- spark源码之Job执行(1)stage划分与提交
- Spark源码分析之四:Stage提交
- spark源码之Job执行(1)stage划分与提交