您的位置:首页 > 大数据

Spark Stage 的划分

2016-07-15 17:23 309 查看
     想了解Shuffle 的处理流程,首先要了解Spark是如何划分Stage的。下面,让我们看看 Spark 是如何根据RDD 的依赖关系来划分Stage。
     首先 我们必须要理解 Spark 中RDD的依赖关系.
     
     1.Rdd的依赖关系:
          Rdd的依赖有两种:
               1.宽依赖(Wide
Dependency)
               2.窄依赖(Narrow Dependency)
         以下图说明RDD的窄依赖和宽依赖
        


窄依赖
窄依赖指父RDD的每一个分区最多被一个子RDD的分区所用,表现为

一个父RDD的分区对应于一个子RDD的分区
两个父RDD的分区对应于一个子RDD 的分区。

如上面的map,filter,union属于第一类窄依赖,而join with inputs co-partitioned(对输入进行协同划分的join操作)则为第二类窄依赖

如果有多个父RDD的分区对应于同一个子RDD的分区不能称之为窄依赖?

宽窄依赖与容错性
Spark基于lineage的容错性是值,如果一个RDD出错,那么可以从它的所有父RDD重新计算所得,如果一个RDD仅有一个父RDD(即窄依赖),那么这种重新计算的代价会非常小。

Spark基于Checkpoint(物化)的容错机制何解?在上图中,宽依赖得到的结果(经历过Shuffle过程)是很昂贵的,因此,Spark将此结果物化到磁盘上了,以备后面使用

宽依赖
宽依赖指子RDD的每个分区都要依赖于父RDD的所有分区,这是shuffle类操作,上图中的groupByKey和对输入未协同划分的join操作就是宽依赖。

窄依赖细说
窄依赖对优化很有利。逻辑上,每个RDD的算子都是一个fork/join(此join非上文的join算子,而是指同步多个并行任务的barrier): 把计算fork到每个分区,算完后join,然后fork/join下一个RDD的算子。如果直接翻译到物理实现,是很不经济的:一是每一个RDD(即使 是中间结果)都需要物化到内存或存储中,费时费空间;二是join作为全局的barrier,是很昂贵的,会被最慢的那个节点拖死。如果子RDD的分区到
父RDD的分区是窄依赖,就可以实施经典的fusion优化,把两个fork/join合为一个;如果连续的变换算子序列都是窄依赖,就可以把很多个 fork/join并为一个,不但减少了大量的全局barrier,而且无需物化很多中间结果RDD,这将极大地提升性能。Spark把这个叫做流水线(pipeline)优化。
Spark流水线优化:



宽依赖细说
变换算子序列一碰上shuffle类操作,宽依赖就发生了,流水线优化终止。在具体实现 中,DAGScheduler从当前算子往前回溯依赖图,一碰到宽依赖,就生成一个stage来容纳已遍历的算子序列。在这个stage里,可以安全地实施流水线优化。然后,又从那个宽依赖开始继续回溯,生成下一个stage。

Spark中关于Dependency的源代码

/**
 * :: DeveloperApi ::
 * Base class for dependencies.
 */
@DeveloperApi
abstract class Dependency[T] extends Serializable {

  def rdd: RDD[T]

}

/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
  * 这里是说,窄依赖是指子RDD的每个Partition只依赖于父RDD很少部分的的分区,文档明显说的不对!窄依赖起码需要,父RDD的每个Partition只被一个子RDD的Partition依赖
 */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
 
def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd

}

/**
  * ShuffleDependency指的是,子RDD的partition部分依赖于父RDD的每个Partition 部分依赖被称为 ShuffleDependency。
  * 其实 ShuffleDependency 跟 MapReduce 中 shuffle 的数据依赖相同
  * (mapper 将其 output 进行 partition,然后每个 reducer 会将所有 mapper 输出中属于自己的 partition 通过 HTTP fetch 得到)。
  */
/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
 *                   the default serializer, as specified by `spark.serializer` config option, will
 *                   be used.
 * @param keyOrdering key ordering for RDD's shuffles
 * @param aggregator map/reduce-side aggregator for RDD's shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
 */
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](

    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)

  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName

  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName

  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =

    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(

    shuffleId, _rdd.partitions.size, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))

}

//下面两种 都是 在依赖的具体实现
/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = List(partitionId)

}

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)

  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {

    if (partitionId >= outStart && partitionId < outStart + length) {

      List(partitionId - outStart + inStart)

    } else {

      Nil
    }

  }

}

在了解了 Rdd 的依赖关系后,在来看 Spark是 如何根据依赖关系来划分Stage的:
当一个Job提交执行(通过RDD 的action 操作)的时候,最终导致了DAGScheduler中的submitjob的执行,其核心是通过发送一个case class JobSubmitted对象给eventProcessLoop(消息循环器),其中JobSubmitted源码如下: 包含了当前作业的信息。

/**
 * Submit an action job to the scheduler.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @return a JobWaiter object that can be used to block until the job finishes executing
 *         or can be used to cancel the job.
 *
 * @throws IllegalArgumentException when partitions ids are illegal
 */
def submitJob[T, U](

    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {

  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length

  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>

    throw new IllegalArgumentException(

      "Attempting to access a non-existent partition: " + p + ". " +

        "Total number of partitions: " + maxPartitions)

  }

  val jobId = nextJobId.getAndIncrement()

  if (partitions.size == 0) {

    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)

  }

  assert(partitions.size > 0)

  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]

  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)

  eventProcessLoop.post(JobSubmitted(

    jobId, rdd, func2, partitions.toArray, callSite, waiter,

    SerializationUtils.clone(properties)))
  waiter

}

Note:返回JobWaiter用来检测 提交任务的完成情况

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {

  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>

    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>

    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  case StageCancelled(stageId) =>

    dagScheduler.handleStageCancellation(stageId)

  case JobCancelled(jobId) =>

    dagScheduler.handleJobCancellation(jobId)

  case JobGroupCancelled(groupId) =>

    dagScheduler.handleJobGroupCancelled(groupId)

  case AllJobsCancelled =>

    dagScheduler.doCancelAllJobs()

  case ExecutorAdded(execId, host) =>

    dagScheduler.handleExecutorAdded(execId, host)

  case ExecutorLost(execId) =>

    dagScheduler.handleExecutorLost(execId, fetchFailed = false)

  case BeginEvent(task, taskInfo) =>

    dagScheduler.handleBeginEvent(task, taskInfo)

  case GettingResultEvent(taskInfo) =>

    dagScheduler.handleGetTaskResult(taskInfo)

  case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>

    dagScheduler.handleTaskCompletion(completion)

  case TaskSetFailed(taskSet, reason, exception) =>

    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

  case ResubmitFailedStages =>

    dagScheduler.resubmitFailedStages()

}

eventProcessLoop 会不断的检查有没有消息要处理,所以当我们提交一个JobSubmitted 消息的时候 ,eventProcessLoop会就会调用

dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

开始处理。

下面是 handleJobSubmitted 的具体实现:

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {

  var finalStage: ResultStage = null
  try {

    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)

  } catch {

    case e: Exception =>

      logWarning("Creating new stage failed due to exception - job: " + jobId, e)

      listener.jobFailed(e)

      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)

  clearCacheLocs()

  logInfo("Got job %s (%s) with %d output partitions".format(

    job.jobId, callSite.shortForm, partitions.length))

  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")

  logInfo("Parents of final stage: " + finalStage.parents)

  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()

  jobIdToActiveJob(jobId) = job

  activeJobs += job

  finalStage.setActiveJob(job)

  val stageIds = jobIdToStageIds(jobId).toArray

  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))

  listenerBus.post(

    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))

  submitStage(finalStage)

  submitWaitingStages()

}

我们重点关注  finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)  的实现,关于stage 的划分的逻辑 就包含在里面

/**
 * Create a ResultStage associated with the provided jobId.
 */
private def newResultStage(

    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {

  val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
  val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)

  stageIdToStage(id) = stage

  updateJobIdStageIdMaps(jobId, stage)

  stage

}

val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId) 根据最后一个RDD (也就是 执行action 的RDD)从后面向前计算来计算 Stage

/**
 * Helper function to eliminate some code re-use when creating new stages.
 */
private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {

  val parentStages = getParentStages(rdd, firstJobId)

  val id = nextStageId.getAndIncrement()

  (parentStages, id)

}

/**
 * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
 * the provided firstJobId.
 */
private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {

  val parents = new HashSet[Stage] // 存储parent stage
  val visited = new HashSet[RDD[_]] //存储已经被访问过的RDD
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting // 存储需要被处理的RDD。Stack中得RDD都需要被处理。
  val waitingForVisit = new Stack[RDD[_]]

  def visit(r: RDD[_]) {

    if (!visited(r)) {

      visited += r

      // Kind of ugly: need to register RDDs with the cache here since

      // we can't do it in its constructor because # of partitions is unknown

      for (dep <- r.dependencies) {

        dep match {

          case shufDep: ShuffleDependency[_, _, _] =>

            parents += getShuffleMapStage(shufDep, firstJobId) // 在ShuffleDependency时需要生成新的stage

          case _ =>

            waitingForVisit.push(dep.rdd)

        }

      }

    }

  }
  waitingForVisit.push(rdd) // 输入的rdd作为第一个需要处理的RDD。然后从该rdd开始,顺序访问其parent rdd
  while (waitingForVisit.nonEmpty) { //只要stack不为空,则一直处理。
    visit(waitingForVisit.pop()) //每次visit如果遇到了ShuffleDependency,那么就会形成一个Stage,否则这些RDD属于同一个Stage
  }

  parents.toList

}



下面我们以上面的图为例,来详细叙述Stage的划分
第一次循环:
1.RDD G传进来,将RDD G压栈.

waitingForVisit.push(rdd) // 输入的rdd作为第一个需要处理的RDD。然后从该rdd开始,顺序访问其parent rdd

2.此时的栈不空,将栈里面的RDD G弹出,作为参数传入visit函数内。

while (waitingForVisit.nonEmpty) { //只要stack不为空,则一直处理。
visit(waitingForVisit.pop()) //每次visit如果遇到了ShuffleDependency,那么就会形成一个Stage,否则这些RDD属于同一个Stage
}


3.RDD G没有被访问过,所以执行if中的代码

val waitingForVisit = new Stack[RDD[_]]
def visit(r: RDD[_]) {

  if (!visited(r)) {

4.对RDD G进行处理,加入visited中。 

visited += r

5.以此处理RDD G依赖的parent RDD

// Kind of ugly: need to register RDDs with the cache here since
// we can't do it in its constructor because # of partitions is unknown
for (dep <- r.dependencies) {

  dep match {

    case shufDep: ShuffleDependency[_, _, _] =>

      parents += getShuffleMapStage(shufDep, firstJobId) // 在ShuffleDependency时需要生成新的stage
    case _ =>

      waitingForVisit.push(dep.rdd)

  }

}

上述过程就执行了一次循环了。

RDD G依赖两个RDD:
和RDD B的依赖关系是窄依赖,因此,并不会产生新的RDD,只是将RDD B 压入Stack栈中。
和RDD F的依赖关系是宽依赖,因此,RDD G 和RDD F会被划分成两个Stage,Shuffle依赖的关系信息保存在parents中,并且,RDD F所在的Stage 2是RDD G所在的Stage 3的parent Stage。



第二次循环:

与上述一样,RDD B所依赖的parent RDD是RDD A 之间是宽依赖关系,因此要创建一个新的Stage为Stage 1.而RDD F的parent RDD都是窄依赖,所以不产生新的Stage,均为Stage 2.

综上:

parents来记录Stage的创建,而不产生新的Stage的处理就是压入到Stack栈中。

详解getShuffleMapStage
getShuffleMapStage获得ShuffleDependency所依赖的Stage,如果没有,则创建新的Stage.

/**
 * Get or create a shuffle map stage for the given shuffle dependency's map side.
 */
private def getShuffleMapStage(

    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {

  //private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage]
  shuffleToMapStage.get(shuffleDep.shuffleId) match {

    case Some(stage) => stage   //如果已创建,则直接返回。
    case None =>

      // We are going to register ancestor shuffle dependencies
      //如果有Stage则直接使用,否则,根据Stage创建新的Parent Stage
      getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>

        shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)

      }

      // Then register current shuffleDep
      val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)

      shuffleToMapStage(shuffleDep.shuffleId) = stage

      stage

  }

}

newOrUsedStage也是生成Stage的,不过如果Stage已经存在,则直接使用。

/**
 * Create a shuffle map Stage for the given RDD.  The stage will also be associated with the
 * provided firstJobId.  If a stage for the shuffleId existed previously so that the shuffleId is
 * present in the MapOutputTracker, then the number and location of available outputs are
 * recovered from the MapOutputTracker
 */
private def newOrUsedShuffleStage(

    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {

  val rdd = shuffleDep.rdd

  val numTasks = rdd.partitions.length

  val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)

  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {

    //Stage已经被计算过,从MapOutputTracker中获取计算结果
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)

    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)

    (0 until locs.length).foreach { i =>

      if (locs(i) ne null) {

        // locs(i) will be null if missing
        stage.addOutputLoc(i, locs(i))

      }

    }

  } else {

    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")

    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)

  }

  stage

}

前面Stage被划分好了,现在就要提交了
HandleJobSubmitted 生成finalStage后,就会为该Job生成一个ActiveJob,来当前Job的一些信息。
 


val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)

HandleJobSubmitted调用submitStage来提交Stage.

/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {

  val jobId = activeJobForStage(stage)

  if (jobId.isDefined) {

    logDebug("submitStage(" + stage + ")")

    //如果当前Stage不再等待其parent stage的返回,不是正在运行,且没有提示失败,那么就尝试提交它。
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {

      val missing = getMissingParentStages(stage).sortBy(_.id)

      logDebug("missing: " + missing)

      if (missing.isEmpty) {

        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")

        // submitMissingTasks就是没有父Stage了
        submitMissingTasks(stage, jobId.get)

      } else {

        //如果parent stage未完成,则递归提交它。
        for (parent <- missing) {

          submitStage(parent)

        }

        waitingStages += stage

      }

    }

  } else {

    abortStage(stage, "No active job for stage " + stage.id, None)

  }

}

补充说明:所谓的missing就是说要进行当前的计算了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark 大数据