spark-0.8.0源码剖析-stage的建立--宽依赖和窄依赖
2013-12-07 22:21
537 查看
1. 在 DAGScheduler 的 submitStage 方法中调用此逻辑。每个 stage 又对应一个 TaskSet，用于任务的分配、调度与执行。
/**
 * Submits the given stage, first recursively submitting any missing
 * parent stages. A stage whose parents are all available is handed to
 * submitMissingTasks and recorded in `running`; otherwise its parents
 * are submitted first and the stage is parked in `waiting`.
 */
private def submitStage(stage: Stage) {
  logDebug("submitStage(" + stage + ")")
  // Skip stages that are already waiting, already executing, or failed.
  val untracked = !waiting(stage) && !running(stage) && !failed(stage)
  if (untracked) {
    val missing = getMissingParentStages(stage).sortBy(_.id)
    logDebug("missing: " + missing)
    if (missing.isEmpty) {
      // Every parent stage's output is available -- run this stage now.
      logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
      submitMissingTasks(stage)
      running += stage
    } else {
      // Submit parents first (in ascending id order), then wait on them.
      missing.foreach(submitStage)
      waiting += stage
    }
  }
}
/**
 * Walks the RDD lineage of `stage` and collects the parent shuffle-map
 * stages whose output is not yet available. Narrow dependencies are
 * traversed in place (they stay within this stage's lineage walk),
 * while each shuffle dependency marks a stage boundary.
 */
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  def explore(current: RDD[_]) {
    if (!visited(current)) {
      visited += current
      // Descend only when at least one partition has no cached
      // locations (a Nil entry); a fully cached RDD cuts the walk here.
      if (getCacheLocs(current).contains(Nil)) {
        current.dependencies.foreach {
          case shufDep: ShuffleDependency[_,_] =>
            // Shuffle boundary: the parent map stage must have produced
            // its output before this stage can run.
            val mapStage = getShuffleMapStage(shufDep, stage.jobId)
            if (!mapStage.isAvailable) {
              missing += mapStage
            }
          case narrowDep: NarrowDependency[_] =>
            // Narrow dependency: same stage, keep walking the lineage.
            explore(narrowDep.rdd)
        }
      }
    }
  }
  explore(stage.rdd)
  missing.toList
}
/**
 * Submits `stage`, recursively submitting any parents whose shuffle
 * output is missing. With no missing parents the stage is submitted
 * immediately and added to `running`; otherwise it joins `waiting`
 * until the parents complete.
 */
private def submitStage(stage: Stage) {
  logDebug("submitStage(" + stage + ")")
  // Only act on stages not already tracked as waiting/running/failed.
  if (!(waiting(stage) || running(stage) || failed(stage))) {
    val missingParents = getMissingParentStages(stage).sortBy(_.id)
    logDebug("missing: " + missingParents)
    if (missingParents.isEmpty) {
      // No outstanding parents: submit tasks and mark as running.
      logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
      submitMissingTasks(stage)
      running += stage
    } else {
      // Recurse into each missing parent, then defer this stage.
      missingParents.foreach(p => submitStage(p))
      waiting += stage
    }
  }
}
/**
 * Returns the parent stages of `stage` that are not yet available, by
 * depth-first traversal of the RDD dependency graph. A ShuffleDependency
 * introduces a parent map stage (collected if unavailable); a
 * NarrowDependency is followed without creating a new stage.
 */
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missingStages = new HashSet[Stage]
  val seen = new HashSet[RDD[_]]
  def walk(r: RDD[_]) {
    if (!seen(r)) {
      seen += r
      // Continue only if some partition has an empty cache-location
      // entry (Nil); otherwise the cached RDD terminates this branch.
      if (getCacheLocs(r).contains(Nil)) {
        r.dependencies.foreach { dep =>
          dep match {
            case shufDep: ShuffleDependency[_,_] =>
              // Stage boundary: record the map stage if its output is
              // not yet available.
              val mapStage = getShuffleMapStage(shufDep, stage.jobId)
              if (!mapStage.isAvailable) {
                missingStages += mapStage
              }
            case narrowDep: NarrowDependency[_] =>
              // Same-stage dependency: recurse into the parent RDD.
              walk(narrowDep.rdd)
          }
        }
      }
    }
  }
  walk(stage.rdd)
  missingStages.toList
}
相关文章推荐
- [Spark源码剖析] DAGScheduler划分stage
- Spark源码剖析(八):stage划分原理与源码剖析
- spark-0.8.0源码剖析--主流程
- spark-0.8.0源码剖析--standalone模式集群并行和单机并行
- spark-0.8.0源码剖析storage
- Spark2.2 DAGScheduler源码分析[stage划分算法源码剖析]
- [Spark源码剖析] DAGScheduler提交stage
- spark-0.8.0源码剖析-分区Partitioner
- Spark内核源码深度剖析:宽依赖与窄依赖深度剖析
- Spark2.0.X源码深度剖析之 TaskScheduler之Task划分 —— 国内全网最新最全最具深度!!!
- [Spark内核] 第34课:Stage划分和Task最佳位置算法源码彻底解密
- Spark源码解读之RDD依赖Dependency
- spark源码之Job执行(1)stage划分与提交
- Spark2.2 任务调度机制schedule()源码剖析
- Spark源码分析之DAGScheduler以及stage的划分
- Spark源码剖析——SparkContext的初始化(八)_初始化管理器BlockManager
- (升级版)Spark从入门到精通(Scala编程、案例实战、高级特性、Spark内核源码剖析、Hadoop高端)
- 精通Spark:Spark内核剖析、源码解读、性能优化和商业案例实战
- Spark2.2 job触发流程原理剖析与源码分析
- Spark技术内幕:Stage划分及提交源码分析