How Spark Divides Stages
2016-07-15 17:23
To understand how Spark handles a shuffle, you first need to understand how Spark divides a job into stages. Below we look at how Spark uses the dependency relationships between RDDs to carve out stages.
First, we must understand RDD dependencies in Spark.
1. RDD dependencies
An RDD dependency comes in one of two kinds:
1. wide dependency (Wide Dependency)
2. narrow dependency (Narrow Dependency)
The figure below illustrates narrow and wide dependencies between RDDs.
Narrow dependency
A narrow dependency means each partition of the parent RDD is used by at most one partition of the child RDD. It shows up in two forms:
one partition of a parent RDD feeding one partition of the child RDD;
partitions from two parent RDDs feeding one partition of the child RDD.
In the figure, map, filter, and union belong to the first form, while a join with co-partitioned inputs belongs to the second.
What if several parent partitions map to the same child partition? As long as every parent partition is consumed by at most one child partition, the dependency is still narrow; it only becomes wide once a single parent partition is needed by multiple child partitions.
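You can check which kind of dependency an operation produces by inspecting the RDD's dependencies field. A minimal sketch, assuming a local run (the class names printed vary slightly across Spark versions):
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("dep-demo").setMaster("local[2]"))
val pairs = sc.parallelize(1 to 10).map(i => (i % 3, i))
// mapValues preserves partitioning, so the dependency is narrow (OneToOneDependency)
println(pairs.mapValues(_ + 1).dependencies)
// groupByKey repartitions by key, so the dependency is wide (ShuffleDependency)
println(pairs.groupByKey().dependencies)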
Narrow vs. wide dependencies and fault tolerance
Spark's lineage-based fault tolerance means that when a partition of an RDD is lost, it can be recomputed from its parent RDDs. If the RDD depends narrowly on its parents, only the corresponding parent partitions have to be recomputed, so recovery is cheap.
What about Spark's checkpoint-based (materialization) fault tolerance? In the figure above, the result of a wide dependency (produced through a shuffle) is expensive to recompute, so Spark materializes it to disk for later reuse.
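Note that checkpointing is something user code requests explicitly (shuffle map output, by contrast, is written to local disk by the shuffle machinery itself). A minimal sketch of the checkpoint API, reusing sc and pairs from the previous sketch, with a placeholder directory:
// Materialize an expensive result so recovery does not replay the whole lineage
sc.setCheckpointDir("/tmp/spark-checkpoints") // placeholder path; use reliable storage such as HDFS in practice
val summed = pairs.reduceByKey(_ + _)
summed.checkpoint()            // only marks the RDD; data is written on the next action
summed.count()                 // runs the job and materializes the checkpoint
println(summed.isCheckpointed) // true once the checkpoint has been written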
Wide dependency
A wide dependency means every partition of the child RDD depends on all partitions of the parent RDD; these are the shuffle-class operations. In the figure, groupByKey and a join whose inputs are not co-partitioned are wide dependencies.
Narrow dependencies in detail
Narrow dependencies are what make optimization possible. Logically, every RDD operator is a fork/join (this join is not the join operator above, but the barrier that synchronizes a set of parallel tasks): the computation is forked onto every partition, joined when all partitions finish, and then the next operator is forked and joined in turn. Translating this literally into a physical plan would be very wasteful: first, every RDD (even intermediate results) would have to be materialized to memory or storage, costing both time and space; second, the join is a global barrier, which is expensive and is held hostage by the slowest node. When the child RDD's partitions depend narrowly on the parent RDD's partitions, the classic fusion optimization applies and two fork/joins can be merged into one; when a whole sequence of transformations is narrow, many fork/joins collapse into a single one. This removes a large number of global barriers and avoids materializing many intermediate RDDs, which improves performance dramatically. Spark calls this pipeline optimization.
Spark pipeline optimization:
Wide dependencies in detail
As soon as a sequence of transformations hits a shuffle-class operation, a wide dependency appears and pipelining stops. In the implementation, DAGScheduler walks the dependency graph backwards from the current operator; whenever it hits a wide dependency, it emits a stage to hold the operator sequence traversed so far, and pipelining can be applied safely inside that stage. It then continues walking backwards from that wide dependency to produce the next stage.
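Both effects are visible from toDebugString, whose indentation marks the shuffle (stage) boundaries. A rough sketch reusing sc from above (the exact output format differs between Spark versions, and the file path is just an example):
val lines = sc.textFile("README.md")
val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
println(wordCounts.toDebugString)
// (2) ShuffledRDD[4] at reduceByKey ...       <- the stage after the shuffle
//  +-(2) MapPartitionsRDD[3] at map ...       <- textFile/flatMap/map pipelined into one stage
//     |  MapPartitionsRDD[2] at flatMap ...
//     |  README.md MapPartitionsRDD[1] at textFile ...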
The Spark source for Dependency
/**
* :: DeveloperApi ::
* Base class for dependencies.
*/
@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}
/**
* :: DeveloperApi ::
* Base class for dependencies where each partition of the child RDD depends on a small number
* of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
* The comment above says a narrow dependency means each partition of the child RDD depends on only a
* small number of the parent RDD's partitions. That wording is not quite right: at a minimum, a narrow
* dependency requires that each partition of the parent RDD is depended on by at most one partition
* of the child RDD.
*/
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}
/**
 * A ShuffleDependency means each partition of the child RDD depends on a part of every partition
 * of the parent RDD; this partial dependency is what is called a ShuffleDependency.
 * It is essentially the same data dependency as the shuffle in MapReduce
 * (each mapper partitions its output, and each reducer fetches the partition belonging to it
 * from every mapper's output over HTTP).
 */
/**
* :: DeveloperApi ::
* Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
* the RDD is transient since we don't need it on the executor side.
*
* @param _rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
* the default serializer, as specified by `spark.serializer` config option, will
* be used.
* @param keyOrdering key ordering for RDD's shuffles
* @param aggregator map/reduce-side aggregator for RDD's shuffle
* @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
*/
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
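As a quick sketch, you can pull the ShuffleDependency off a shuffled RDD and read these fields directly (reusing the pairs RDD from the earlier sketch; ShuffleDependency is a DeveloperApi class, so treat this as inspection code only):
import org.apache.spark.ShuffleDependency

val shufDep = pairs.groupByKey().dependencies.head
  .asInstanceOf[ShuffleDependency[Int, Int, _]]
println(shufDep.shuffleId)   // the id handed out by newShuffleId()
println(shufDep.partitioner) // the Partitioner that splits the map-side output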
// The two classes below are the concrete narrow-dependency implementations.
/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between partitions of the parent and child RDDs.
*/
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
* @param rdd the parent RDD
* @param inStart the start of the range in the parent RDD
* @param outStart the start of the range in the child RDD
* @param length the length of the range
*/
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}
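union is the canonical place RangeDependency appears: the child's partitions are the parents' partitions laid end to end, with no shuffle. A small sketch, again reusing sc:
val left = sc.parallelize(1 to 4, 2)  // 2 partitions
val right = sc.parallelize(5 to 9, 3) // 3 partitions
val both = left.union(right)          // 5 partitions, still a narrow dependency
println(both.partitions.length)       // 5
println(both.dependencies)            // two RangeDependency instances, one per parent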
With RDD dependencies understood, let's look at how Spark uses them to divide stages.
When a job is submitted for execution (via an RDD action), it eventually triggers submitJob in DAGScheduler. At its core, submitJob posts a JobSubmitted case-class object, carrying the current job's information, to eventProcessLoop (the message loop). Here is submitJob:
/**
* Submit an action job to the scheduler.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like first()
* @param callSite where in the user program this job was called
* @param resultHandler callback to pass each result to
* @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
*
* @return a JobWaiter object that can be used to block until the job finishes executing
* or can be used to cancel the job.
*
* @throws IllegalArgumentException when partitions ids are illegal
*/
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }
  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }
  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
Note: the returned JobWaiter is used to track whether the submitted job has completed.
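submitJob itself is scheduler-internal; from user code, every action funnels into SparkContext.runJob, which posts the job and then blocks on this JobWaiter for you. A sketch:
val data = sc.parallelize(1 to 100, 4)
// runJob applies the function to each partition and returns one result per partition
val partialSums: Array[Int] = sc.runJob(data, (it: Iterator[Int]) => it.sum)
println(partialSums.toSeq) // 4 partial sums, one per partition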
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)
  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)
  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)
  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()
  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)
  case ExecutorLost(execId) =>
    dagScheduler.handleExecutorLost(execId, fetchFailed = false)
  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)
  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)
  case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
    dagScheduler.handleTaskCompletion(completion)
  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
eventProcessLoop keeps checking for messages to process, so when a JobSubmitted message is posted, eventProcessLoop calls
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
to start handling it.
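Conceptually, eventProcessLoop is just a single consumer thread draining a blocking queue. Here is a stripped-down model of that pattern, a sketch for intuition rather than Spark's actual EventLoop code:
import java.util.concurrent.LinkedBlockingDeque

// One producer-facing post() plus one daemon thread that drains the queue
// and dispatches each event to onReceive -- the shape of DAGScheduler's loop.
abstract class ToyEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = while (true) onReceive(queue.take())
  }
  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  protected def onReceive(event: E): Unit
}
In Spark, doOnReceive plays the role of onReceive here.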
Here is the implementation of handleJobSubmitted:
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))
  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
  submitWaitingStages()
}
We focus on finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite): the stage-division logic lives inside it.
/**
* Create a ResultStage associated with the provided jobId.
*/
private def newResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
  val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId) computes the stages by walking backwards from the last RDD, i.e. the one the action was called on.
/**
* Helper function to eliminate some code re-use when creating new stages.
*/
private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
  val parentStages = getParentStages(rdd, firstJobId)
  val id = nextStageId.getAndIncrement()
  (parentStages, id)
}
/**
* Get or create the list of parent stages for a given RDD. The new Stages will be created with
* the provided firstJobId.
*/
private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  val parents = new HashSet[Stage]   // accumulates the parent stages
  val visited = new HashSet[RDD[_]]  // RDDs that have already been visited
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting. Every RDD on the stack still needs to be processed.
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      // Kind of ugly: need to register RDDs with the cache here since
      // we can't do it in its constructor because # of partitions is unknown
      for (dep <- r.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>
            parents += getShuffleMapStage(shufDep, firstJobId) // a ShuffleDependency requires a new stage
          case _ =>
            waitingForVisit.push(dep.rdd)
        }
      }
    }
  }
  waitingForVisit.push(rdd) // the input rdd is processed first; from it we walk to its parent RDDs
  while (waitingForVisit.nonEmpty) { // keep going as long as the stack is non-empty
    visit(waitingForVisit.pop()) // each visit that hits a ShuffleDependency closes off a stage; otherwise the RDDs stay in the same stage
  }
  parents.toList
}
Let's use the figure above to walk through the stage division in detail.
First iteration:
1. RDD G is passed in and pushed onto the stack:
waitingForVisit.push(rdd) // the input rdd is processed first; from it we walk to its parent RDDs
2. The stack is non-empty, so RDD G is popped and passed to the visit function:
while (waitingForVisit.nonEmpty) {
  visit(waitingForVisit.pop())
}
3. RDD G has not been visited yet, so the body of the if runs:
def visit(r: RDD[_]) {
  if (!visited(r)) {
4. RDD G is marked as processed by adding it to visited:
visited += r
5. Each of RDD G's parent dependencies is handled in turn:
for (dep <- r.dependencies) {
  dep match {
    case shufDep: ShuffleDependency[_, _, _] =>
      parents += getShuffleMapStage(shufDep, firstJobId) // a ShuffleDependency requires a new stage
    case _ =>
      waitingForVisit.push(dep.rdd)
  }
}
That completes one pass through the loop.
RDD G depends on two RDDs:
Its dependency on RDD B is narrow, so no new stage is created; RDD B is simply pushed onto the stack.
Its dependency on RDD F is wide, so RDD G and RDD F are split into two stages. The shuffle dependency is recorded in parents, and RDD F's stage, Stage 2, becomes the parent stage of RDD G's Stage 3.
Second iteration:
In the same way, RDD B's parent RDD is RDD A and the dependency between them is wide, so a new stage, Stage 1, is created. RDD F's parent RDDs are all narrow dependencies, so no new stage is created; they all belong to Stage 2.
In summary:
parents records each stage as it is created, while RDDs that do not start a new stage are simply pushed onto the stack.
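To make the traversal concrete, here is a toy re-implementation of the same stack-based walk over a hand-built dependency graph (plain Scala, independent of Spark's classes; the names are made up for illustration):
import scala.collection.mutable.{HashSet, Stack}

// A node lists its parents, each edge flagged true when it is a shuffle (wide) dependency.
case class ToyRdd(name: String, deps: List[(ToyRdd, Boolean)] = Nil)

def parentShuffleRoots(finalRdd: ToyRdd): List[String] = {
  val parents = new HashSet[ToyRdd] // map-side roots of the parent stages
  val visited = new HashSet[ToyRdd]
  val waitingForVisit = new Stack[ToyRdd]
  waitingForVisit.push(finalRdd)
  while (waitingForVisit.nonEmpty) {
    val r = waitingForVisit.pop()
    if (!visited(r)) {
      visited += r
      for ((parent, isShuffle) <- r.deps) {
        if (isShuffle) parents += parent  // stage boundary: record it and stop walking
        else waitingForVisit.push(parent) // narrow edge: stays inside the current stage
      }
    }
  }
  parents.toList.map(_.name)
}

// The DAG from the walkthrough: A =shuffle=> B, C -> D, D and E -> F (narrow),
// B -> G (narrow), F =shuffle=> G.
val a = ToyRdd("A")
val b = ToyRdd("B", List((a, true)))
val c = ToyRdd("C")
val d = ToyRdd("D", List((c, false)))
val e = ToyRdd("E")
val f = ToyRdd("F", List((d, false), (e, false)))
val g = ToyRdd("G", List((b, false), (f, true)))
println(parentShuffleRoots(g)) // F and A (order may vary): the map sides of Stage 2 and Stage 1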
getShuffleMapStage in detail
getShuffleMapStage returns the stage that a ShuffleDependency depends on, creating a new stage if none exists.
/**
 * Get or create a shuffle map stage for the given shuffle dependency's map side.
 */
private def getShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  // private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage]
  shuffleToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) => stage // already created: return it directly
    case None =>
      // We are going to register ancestor shuffle dependencies
      // Reuse an existing stage where possible; otherwise create the missing parent stages
      getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
      }
      // Then register current shuffleDep
      val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
      shuffleToMapStage(shuffleDep.shuffleId) = stage
      stage
  }
}
newOrUsedShuffleStage also produces a stage, but when the shuffle has already run, it reuses the existing map outputs instead of recomputing them.
/**
 * Create a shuffle map Stage for the given RDD. The stage will also be associated with the
 * provided firstJobId. If a stage for the shuffleId existed previously so that the shuffleId is
 * present in the MapOutputTracker, then the number and location of available outputs are
 * recovered from the MapOutputTracker
 */
private def newOrUsedShuffleStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // The stage has been computed before: recover its results from the MapOutputTracker
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    (0 until locs.length).foreach { i =>
      if (locs(i) ne null) {
        // locs(i) will be null if missing
        stage.addOutputLoc(i, locs(i))
      }
    }
  } else {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
With the stages divided, it is time to submit them.
After handleJobSubmitted builds finalStage, it creates an ActiveJob for the job to record its state:
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
handleJobSubmitted then calls submitStage to submit the stage.
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    // If this stage is not waiting on a parent, not running, and not marked failed, try to submit it.
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // No unfinished parent stages left: run this stage's tasks
        submitMissingTasks(stage, jobId.get)
      } else {
        // Some parent stages are unfinished: submit them recursively first
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
A note on "missing": a missing parent stage is simply one whose results are not yet available, so it has to be computed before the current stage can run.
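Applied to the running example, and assuming nothing has been computed or cached yet, the recursion plays out roughly like this:
submitStage(Stage 3)                 // the final ResultStage (B, G)
  missing = List(Stage 1, Stage 2)   // neither shuffle has map output registered yet
  submitStage(Stage 1)               // no missing parents -> submitMissingTasks(Stage 1)
  submitStage(Stage 2)               // no missing parents -> submitMissingTasks(Stage 2)
  waitingStages += Stage 3           // resubmitted once its parents report completion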