您的位置:首页 > 其它

spark-0.8.0源码剖析-stage的建立--宽依赖和窄依赖

2013-12-07 22:21 537 查看
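1. 在 DAGScheduler 的 submitStage 中调用 getMissingParentStages 方法,找出当前 stage 尚不可用的父 stage:沿窄依赖(NarrowDependency)在同一 stage 内向上回溯父 RDD,遇到宽依赖(ShuffleDependency)则取得对应的 shuffle map stage 作为父 stage。每个 stage 又对应一个 TaskSet,进行任务分配与调度执行。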

  private def submitStage(stage: Stage) {

    logDebug("submitStage(" + stage + ")")

    if (!waiting(stage) && !running(stage) && !failed(stage)) {

      val missing = getMissingParentStages(stage).sortBy(_.id)

      logDebug("missing: " + missing)

      if (missing == Nil) {

        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")

        submitMissingTasks(stage)

        running += stage

      } else {

        for (parent <- missing) {

          submitStage(parent)

        }

        waiting += stage

      }

    }

  }

 private def getMissingParentStages(stage: Stage): List[Stage] = {

    val missing = new HashSet[Stage]

    val visited = new HashSet[RDD[_]]

    def visit(rdd: RDD[_]) {

      if (!visited(rdd)) {

        visited += rdd

        if (getCacheLocs(rdd).contains(Nil)) {

          for (dep <- rdd.dependencies) {

            dep match {

              case shufDep: ShuffleDependency[_,_] =>

                val mapStage = getShuffleMapStage(shufDep, stage.jobId)

                if (!mapStage.isAvailable) {

                  missing += mapStage

                }

              case narrowDep: NarrowDependency[_] =>

                visit(narrowDep.rdd)

            }

          }

        }

      }

    }

    visit(stage.rdd)

    missing.toList

  }
内容来自用户分享和网络整理,不保证内容的准确性;如有侵权内容,可联系管理员处理。点击这里给我发消息
标签: