Lesson 34: Stage Division and the Task Preferred-Location Algorithm, Fully Decoded from the Source
2017-05-29 07:51
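When Spark schedules a job, two algorithms come into play during job submission. The first divides a Job into Stages. The second computes each Task's preferred location. Stage division is the core of DAGScheduler's work and determines how a job runs on the cluster, while a Task's preferred location is a matter of data locality. Spark operators are chained together, and before any computation happens the chain is first divided into Stages; only after that does the computation itself run. Distributed big-data systems try to maximize data locality, which means running each computation where its input data already resides (ideally already in memory) so that the data does not have to be moved across the network.
A Spark Application can trigger many Jobs through different Actions, so one Application may contain many Jobs. Each Job consists of one or more Stages. Later Stages depend on earlier ones: a Stage runs only after all the Stages it depends on have finished.
Stages are divided at wide (shuffle) dependencies. Operators such as reduceByKey and groupByKey produce wide dependencies. The exception is input that is already partitioned by the same partitioner, in which case no shuffle is needed.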
We start from RDD's collect() method. collect is an Action, so it triggers a job.
The source of collect in RDD.scala calls runJob:
```scala
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
```
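The following is a minimal, Spark-free sketch of what collect relies on. It assumes an RDD can be modeled as an in-memory sequence of partitions. ToyRDD, CollectSketch and toyRunJob are hypothetical names, not Spark APIs. The per-partition function turns each partition's iterator into an array, and the per-partition arrays are then concatenated in partition order.

```scala
import scala.reflect.ClassTag

// Hypothetical toy RDD: each partition is just an in-memory sequence.
final case class ToyRDD[T](partitions: Seq[Seq[T]])

object CollectSketch {
  // Runs `func` once per partition and returns the results in partition order,
  // mirroring the shape of SparkContext.runJob(rdd, func).
  def toyRunJob[T, U: ClassTag](rdd: ToyRDD[T], func: Iterator[T] => U): Array[U] =
    rdd.partitions.map(p => func(p.iterator)).toArray

  // Mirrors RDD.collect(): one array per partition, then Array.concat.
  def collect[T: ClassTag](rdd: ToyRDD[T]): Array[T] = {
    val results: Array[Array[T]] = toyRunJob(rdd, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
}
```

Empty partitions simply contribute empty arrays, so the concatenated result preserves partition order.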
Entering the runJob method in SparkContext.scala:
```scala
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}
```
It calls another runJob overload:
```scala
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
```
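Here is another runJob overload, which takes a per-partition function and a resultHandler. The collect path itself goes through a different overload, runJob(rdd, func: (TaskContext, Iterator[T]) => U, partitions). That overload allocates a results array and passes a resultHandler that stores each partition's result at its index.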
```scala
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    processPartition: Iterator[T] => U,
    resultHandler: (Int, U) => Unit)
{
  val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)
  runJob[T, U](rdd, processFunc, 0 until rdd.partitions.length, resultHandler)
}
```
Every overload finally ends up in this runJob, which hands the job to DAGScheduler:
```scala
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
```
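The overload chain can be sketched without Spark. This sketch assumes partitions are plain in-memory sequences, and FakeTaskContext is a hypothetical stand-in for TaskContext that carries only a partition id. The sketch shows the two adaptations the overloads perform:

- A context-free Iterator[T] => U is wrapped into a (context, iterator) function.
- The array-returning overload allocates a results array and passes a resultHandler that stores each result at its index.

```scala
import scala.reflect.ClassTag

// Hypothetical stand-in for Spark's TaskContext; only the partition id is modeled.
final case class FakeTaskContext(partitionId: Int)

object RunJobOverloads {
  // Innermost overload: runs func on the requested partitions and hands each
  // result to resultHandler together with its index in `partitions`.
  def runJob[T, U](
      data: IndexedSeq[Seq[T]],
      func: (FakeTaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit =
    partitions.zipWithIndex.foreach { case (p, index) =>
      resultHandler(index, func(FakeTaskContext(p), data(p).iterator))
    }

  // Collects results into an array, like the Array[U]-returning overload.
  def runJob[T, U: ClassTag](
      data: IndexedSeq[Seq[T]],
      func: (FakeTaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](data, func, partitions, (index: Int, res: U) => results(index) = res)
    results
  }

  // Adapts a context-free function and defaults to all partitions.
  def runJob[T, U: ClassTag](data: IndexedSeq[Seq[T]], func: Iterator[T] => U): Array[U] =
    runJob[T, U](data, (_: FakeTaskContext, it: Iterator[T]) => func(it), data.indices)
}
```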
Entering runJob in DAGScheduler.scala:
```scala
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
  // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
  // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
  // safe to pass in null here. For more detail, see SPARK-13747.
  val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
  waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}
```
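runJob blocks on waiter.completionFuture. As a rough illustration of how such a waiter can work, here is ToyJobWaiter, a simplified and hypothetical class (not Spark's JobWaiter; its details are assumptions). It forwards each task result to resultHandler, counts finished tasks, and completes a Promise once all tasks have succeeded.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}

final class ToyJobWaiter[U](val jobId: Int, totalTasks: Int, resultHandler: (Int, U) => Unit) {
  private val finishedTasks = new AtomicInteger(0)
  private val promise = Promise[Unit]()

  // A job with zero tasks is complete as soon as it is created.
  if (totalTasks == 0) promise.success(())

  def completionFuture: Future[Unit] = promise.future

  // Called once per successful task; the last one completes the future.
  def taskSucceeded(index: Int, result: Any): Unit = {
    synchronized { resultHandler(index, result.asInstanceOf[U]) }
    if (finishedTasks.incrementAndGet() == totalTasks) promise.trySuccess(())
  }

  // Fails the whole job; later calls are ignored because the promise is already completed.
  def jobFailed(exception: Exception): Unit = promise.tryFailure(exception)
}
```

Completing a Future instead of returning results directly lets submitJob return immediately while the caller (here, runJob) decides how to wait.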
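DAGScheduler.runJob hands the work to submitJob. It then blocks on the returned waiter until the job finishes, and logs whether the job succeeded or failed. Entering DAGScheduler's submitJob method: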
```scala
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
```
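submitJob first reads rdd.partitions.length and checks that every requested partition actually exists. Its key line is eventProcessLoop.post(JobSubmitted(...)), which posts a JobSubmitted event.
JobSubmitted is a case class rather than a case object. An application runs many jobs, and each job needs its own JobSubmitted instance carrying its own data. A case object is a single, globally unique instance with no parameters, so it cannot carry per-job data; that is why a case class is used. JobSubmitted's member finalRDD is the last RDD of the job.
In short, an Action (for example collect) triggers SparkContext.runJob, which eventually leads to DAGScheduler.submitJob. The core of submitJob is sending a JobSubmitted case class instance to eventProcessLoop. The JobSubmitted source: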
```scala
private[scheduler] case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
  extends DAGSchedulerEvent
```
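JobSubmitted is private[scheduler], so user code cannot use it directly. It wraps the following: the jobId, the last RDD (finalRDD), the function func to apply to the RDD's partitions, the partitions to compute, the job listener, the properties, and so on.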
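To make the case class versus case object distinction concrete, here is a small sketch with hypothetical, trimmed-down event types (ToyJobSubmitted, ToyAllJobsCancelled). The parameterized event gets a fresh instance per job. A parameterless event, such as AllJobsCancelled in the real doOnReceive, can be a single shared case object. Pattern matching handles both kinds uniformly.

```scala
// Hypothetical, trimmed-down events (not Spark's DAGSchedulerEvent hierarchy).
sealed trait ToySchedulerEvent
// One instance per job, carrying that job's data.
final case class ToyJobSubmitted(jobId: Int, partitions: Array[Int]) extends ToySchedulerEvent
// No parameters, so a single shared instance is enough.
case object ToyAllJobsCancelled extends ToySchedulerEvent

object EventKinds {
  def describe(event: ToySchedulerEvent): String = event match {
    case ToyJobSubmitted(jobId, partitions) => s"job $jobId with ${partitions.length} partitions"
    case ToyAllJobsCancelled                => "cancel all jobs"
  }
}
```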
The key line of submitJob, eventProcessLoop.post(JobSubmitted(...)), puts the JobSubmitted event into eventProcessLoop. post simply enqueues a message for the event-loop thread to handle. eventProcessLoop is defined as:
```scala
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
```
Looking at DAGSchedulerEventProcessLoop, we see that it extends EventLoop:
```scala
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {
  // ...
}
```
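EventLoop starts a thread, eventThread, and sets it to run as a daemon. Its run method loops, taking events from eventQueue and calling onReceive(event) for each one. post just puts an element into eventQueue. The EventLoop source: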
```scala
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }
  // ……
  def post(event: E): Unit = {
    eventQueue.put(event)
  }
  // ……
}
```
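Here is a stripped-down version of the same pattern. It assumes only the JDK's LinkedBlockingDeque and a daemon thread. ToyEventLoop is a hypothetical name, and error handling is reduced to an overridable onError. post only enqueues, and the single event thread is the only place onReceive runs.

```scala
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

abstract class ToyEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  // The only thread that ever calls onReceive.
  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped.get) {
          val event = eventQueue.take() // blocks until an event is posted
          try onReceive(event) catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case _: InterruptedException => // stop() interrupts the blocking take()
      }
  }

  def start(): Unit = eventThread.start()

  def stop(): Unit = if (stopped.compareAndSet(false, true)) {
    eventThread.interrupt()
    eventThread.join()
  }

  // Non-blocking for the caller: the queue is unbounded.
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit = ()
}
```

Because only the event thread calls onReceive, a handler written this way needs no locking of its own.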
eventProcessLoop is an instance of DAGSchedulerEventProcessLoop. That class extends EventLoop and implements onReceive, and onReceive in turn calls doOnReceive.
doOnReceive starts processing once it receives a message:
```scala
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)

  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)

  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

  case ExecutorLost(execId, reason) =>
    val filesLost = reason match {
      case SlaveLost(_, true) => true
      case _ => false
    }
    dagScheduler.handleExecutorLost(execId, filesLost)

  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

  case completion: CompletionEvent =>
    dagScheduler.handleTaskCompletion(completion)

  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
```
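To sum up: EventLoop starts a thread that keeps looping over a queue. post puts a message into the queue. Because the thread is continuously looping, it picks the message up and calls back onReceive(event), and onReceive in turn calls doOnReceive.
Why use asynchronous communication through a separate thread? For example, why does DAGScheduler post a message instead of calling doOnReceive directly? Here DAGScheduler sends messages to itself. A message may come from DAGScheduler itself or from another component. Either way, one thread handles all of them, so every message goes through the same processing logic. This keeps the logic consistent and makes the design very extensible. It also means the scheduler's bookkeeping is updated from a single thread.
To restate: eventProcessLoop is a concrete instance of DAGSchedulerEventProcessLoop, a subclass of EventLoop that implements EventLoop's onReceive method, and onReceive calls back into doOnReceive.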
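The routing step can be sketched with a sealed trait and a pattern match. This is a hypothetical, single-threaded model. ToyDagScheduler and the *Evt types are illustrative, and drain() stands in for the event thread. An event posted from outside and an event the scheduler posts to itself go through the same queue and the same doOnReceive.

```scala
import scala.collection.mutable

// Hypothetical, simplified events; the real DAGSchedulerEvent hierarchy is larger.
sealed trait ToyDagEvent
final case class JobSubmittedEvt(jobId: Int) extends ToyDagEvent
final case class JobCancelledEvt(jobId: Int) extends ToyDagEvent
case object ResubmitFailedStagesEvt extends ToyDagEvent

final class ToyDagScheduler {
  private val queue = mutable.Queue.empty[ToyDagEvent]
  val handled = mutable.ArrayBuffer.empty[String]

  def post(event: ToyDagEvent): Unit = queue.enqueue(event)

  // Drains the queue on the calling thread (a real EventLoop uses its own thread).
  def drain(): Unit = while (queue.nonEmpty) doOnReceive(queue.dequeue())

  // Single routing point for every event, whoever posted it.
  private def doOnReceive(event: ToyDagEvent): Unit = event match {
    case JobSubmittedEvt(jobId)  => handleJobSubmitted(jobId)
    case JobCancelledEvt(jobId)  => handled += s"cancelled $jobId"
    case ResubmitFailedStagesEvt => handled += "resubmit"
  }

  private def handleJobSubmitted(jobId: Int): Unit = {
    handled += s"submitted $jobId"
    post(ResubmitFailedStagesEvt) // illustrative follow-up message the scheduler sends to itself
  }
}
```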
In doOnReceive, pattern matching routes the event to case JobSubmitted, which calls dagScheduler.handleJobSubmitted:
```scala
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  // ...
}
```
DAGScheduler's handleJobSubmitted source:
```scala
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
}
```
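Where Stages begin: each call to runJob produces one job. finalStage is a ResultStage. The last Stage of a job is always a ResultStage, and all the Stages before it are ShuffleMapStages.
handleJobSubmitted first creates finalStage, and creating finalStage also builds the dependency chain of its parent Stages.
finalStage is created by createResultStage, which receives the last RDD (finalRDD), the function func, the partitions, the jobId, the callSite and so on. Creation may throw an exception, for example when a job runs on a HadoopRDD whose underlying HDFS files have been deleted. In that case the exception is caught and the job is reported as failed.
The createResultStage source: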
```scala
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
```
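createResultStage creates the ResultStage for our job; the job ID (jobId) is passed in as the fourth parameter.
getOrCreateParentStages, called from createResultStage, gets or creates the list of parent Stages of a given RDD. New Stages are created with the provided firstJobId. The getOrCreateParentStages source: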
```scala
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
```
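getOrCreateParentStages calls getShuffleDependencies(rdd), which returns the shuffle dependencies that are immediate parents of the given RDD. It does not return dependencies of more distant ancestors. For example, suppose C has a shuffle dependency on B, which in turn has a shuffle dependency on A (A <-- B <-- C). Calling getShuffleDependencies on RDD C returns only the B <-- C dependency. The method is private[scheduler] so that unit tests can call it. The getShuffleDependencies source: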
```scala
private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents
}
```
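The traversal in getShuffleDependencies can be reproduced on a toy lineage graph. In this sketch, Rdd, NarrowDep and ShuffleDep are hypothetical stand-ins for Spark's classes. As in the original, an explicit stack is used (here an immutable list), narrow edges are followed, and shuffle edges are recorded but not crossed. The test reproduces the A <-- B <-- C example from the text.

```scala
import scala.collection.mutable

object ShuffleDepsSketch {
  // Hypothetical toy lineage: each Rdd lists its dependencies on parent Rdds.
  final class Rdd(val name: String, val deps: Seq[Dep])
  sealed trait Dep { def rdd: Rdd }
  final case class NarrowDep(rdd: Rdd) extends Dep
  final case class ShuffleDep(rdd: Rdd) extends Dep

  // Walk narrow edges with an explicit stack; record shuffle edges without
  // crossing them, so only the nearest shuffle dependencies are returned.
  def getShuffleDependencies(rdd: Rdd): Set[ShuffleDep] = {
    val parents = mutable.LinkedHashSet.empty[ShuffleDep]
    val visited = mutable.HashSet.empty[Rdd]
    var waitingForVisit = List(rdd) // immutable list used as a stack
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.head
      waitingForVisit = waitingForVisit.tail
      if (visited.add(toVisit)) {
        toVisit.deps.foreach {
          case shuffleDep: ShuffleDep => parents += shuffleDep
          case narrow: NarrowDep      => waitingForVisit = narrow.rdd :: waitingForVisit
        }
      }
    }
    parents.toSet
  }
}
```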
In getOrCreateParentStages, mapping over getShuffleDependencies(rdd) calls getOrCreateShuffleMapStage for each dependency. If a shuffle map stage for the shuffleId already exists in shuffleIdToMapStage, that stage is returned. Otherwise, the method creates the shuffle map stage, in addition to any missing ancestor shuffle map stages.
The getOrCreateShuffleMapStage source:
```scala
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage

    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}
```
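In getOrCreateShuffleMapStage:
- If the match on shuffleIdToMapStage.get(shuffleId) finds a Stage, that stage is returned. shuffleIdToMapStage maps a shuffle dependency ID to the ShuffleMapStage that will generate the data for that dependency. It only contains stages that belong to currently running jobs. When the jobs that need a shuffle stage complete, the mapping is removed, and from then on the only record of the shuffle data is kept in MapOutputTracker.
- If no Stage is found, getMissingAncestorShuffleDependencies is called. createShuffleMapStage then creates stages for all missing ancestor shuffle dependencies, and finally for the given shuffle dependency itself.

getMissingAncestorShuffleDependencies finds the ancestor shuffle dependencies that are not yet registered in shuffleIdToMapStage. Its source: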
```scala
private def getMissingAncestorShuffleDependencies(
    rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
  val ancestors = new Stack[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      getShuffleDependencies(toVisit).foreach { shuffleDep =>
        if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
          ancestors.push(shuffleDep)
          waitingForVisit.push(shuffleDep.rdd)
        } // Otherwise, the dependency and its ancestors have already been registered.
      }
    }
  }
  ancestors
}
```
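A toy model can also illustrate getMissingAncestorShuffleDependencies. Here a plain Set of registered shuffle ids stands in for shuffleIdToMapStage, which is an assumption of the sketch. The result is a list whose head is the most recently pushed ancestor (in a simple chain, the farthest one), like the top of a stack. A registered dependency stops the walk, so its own ancestors are not revisited.

```scala
import scala.collection.mutable

object MissingAncestorsSketch {
  // Hypothetical toy lineage types.
  final class Rdd(val name: String, val deps: Seq[Dep])
  sealed trait Dep { def rdd: Rdd }
  final case class NarrowDep(rdd: Rdd) extends Dep
  final case class ShuffleDep(shuffleId: Int, rdd: Rdd) extends Dep

  // Direct shuffle dependencies of `rdd` (same walk as getShuffleDependencies).
  def getShuffleDependencies(rdd: Rdd): List[ShuffleDep] = {
    val parents = mutable.LinkedHashSet.empty[ShuffleDep]
    val visited = mutable.HashSet.empty[Rdd]
    var waitingForVisit = List(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.head
      waitingForVisit = waitingForVisit.tail
      if (visited.add(toVisit)) toVisit.deps.foreach {
        case s: ShuffleDep => parents += s
        case n: NarrowDep  => waitingForVisit = n.rdd :: waitingForVisit
      }
    }
    parents.toList
  }

  // Ancestor shuffle dependencies whose shuffleId is not registered yet.
  def getMissingAncestorShuffleDependencies(rdd: Rdd, registered: Set[Int]): List[ShuffleDep] = {
    var ancestors = List.empty[ShuffleDep] // head plays the role of the stack top
    val visited = mutable.HashSet.empty[Rdd]
    var waitingForVisit = List(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.head
      waitingForVisit = waitingForVisit.tail
      if (visited.add(toVisit)) {
        getShuffleDependencies(toVisit).foreach { dep =>
          if (!registered.contains(dep.shuffleId)) {
            ancestors = dep :: ancestors
            waitingForVisit = dep.rdd :: waitingForVisit
          } // otherwise the dependency and its ancestors are already registered
        }
      }
    }
    ancestors
  }
}
```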
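createShuffleMapStage creates a ShuffleMapStage that generates the given shuffle dependency's partitions. A previously run stage may already have produced the same shuffle data. In that case, this method copies the output locations that are still available from that earlier shuffle, so the data is not regenerated unnecessarily.
The createShuffleMapStage source: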
```scala
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)

  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)

  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // A previously run stage generated partitions for this shuffle, so for each output
    // that's still available, copy information about that output location to the new stage
    // (so we don't unnecessarily re-compute that data).
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    (0 until locs.length).foreach { i =>
      if (locs(i) ne null) {
        // locs(i) will be null if missing
        stage.addOutputLoc(i, locs(i))
      }
    }
  } else {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
```
Back in handleJobSubmitted: after finalStage is created, it is submitted:
```scala
private[scheduler] def handleJobSubmitted(jobId: Int,
    // ......
  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  // ......
  submitStage(finalStage)
}
```
submitStage submits a stage, but first recursively submits any missing parent stages. The submitStage source:
```scala
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
```
It calls getMissingParentStages, whose source is:
```scala
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        for (dep <- rdd.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList
}
```
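Let us now walk through these steps with the Spark DAG stage-division diagram:
The data between RDD A and RDD B, and between RDD F and RDD G, must go through a Shuffle. RDD A and RDD F are therefore the split points: RDD A separates Stage1 from Stage3, and RDD F separates Stage2 from Stage3. There is no Shuffle between RDD B and RDD G, so G's dependency on B is narrow, and RDD B and RDD G belong to the same Stage3.

No Shuffle is needed from RDD C to RDD D and on to RDD F, or from RDD E to RDD F. F's dependencies on D and E, and D's dependency on C, are all narrow, so RDD C, RDD D, RDD E and RDD F belong to the same Stage2.

Stage1 and Stage2 are independent of each other and can run concurrently. Stage3 depends on the results of both, so it runs last.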
Figure 8-1 Stage division of a DAG
![](http://img.blog.csdn.net/20170529112455296?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvZHVhbl96aGlodWE=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
- createResultStage: creates the ResultStage for the job ID (jobId). It calls getOrCreateParentStages to create all parent stages, which returns parents: List[Stage]. parents is then passed to the ResultStage constructor to instantiate it.
  In the diagram: createResultStage is called on RDD G. getOrCreateParentStages returns the parent stages, Stage1 and Stage2, as a List[Stage], and then G's own Stage3 is created.
- getOrCreateParentStages: gets or creates the list of parent Stages of the given RDD; new Stages are created with the provided firstJobId.
  In the diagram: getOrCreateParentStages on RDD G calls getShuffleDependencies. This returns the set of G's direct wide dependencies, HashSet(ShuffleDependency(RDD F), ShuffleDependency(RDD A)), that is, the wide dependencies on RDD F and RDD A. It then iterates over the set and calls getOrCreateShuffleMapStage on ShuffleDependency(RDD F) and on ShuffleDependency(RDD A).
- getOrCreateShuffleMapStage on ShuffleDependency(RDD A): the match on shuffleDep.shuffleId finds no stage, so getMissingAncestorShuffleDependencies is called and returns an empty result. createShuffleMapStage is then called on ShuffleDependency(RDD A). RDD A has no parent Stage, so Stage1 is created.
- getOrCreateShuffleMapStage on ShuffleDependency(RDD F): the match on shuffleDep.shuffleId again finds no stage, and getMissingAncestorShuffleDependencies returns an empty result. createShuffleMapStage is then called on ShuffleDependency(RDD F). Upstream of RDD F (C to D to F, and E to F) there is no Shuffle, and without a wide dependency no Stage is produced. RDD F therefore has no parent Stage, and Stage2 is created.
- Finally, List(Stage1, Stage2) is used as the parent stages of Stage3, which is created as a ResultStage.
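Putting the pieces together, the following sketch rebuilds the stages of Figure 8-1 on a toy graph. It is a simplification under these assumptions:

- The types are hypothetical.
- Stage ids start at 0, as Spark's nextStageId counter does.
- Plain recursion through getOrCreateParentStages replaces Spark's explicit missing-ancestor pass, which exists to avoid deep recursion.

rddsInStage collects the RDDs reachable from a stage's last RDD through narrow dependencies only.

```scala
import scala.collection.mutable

object StageSplitSketch {
  // Hypothetical toy lineage: an RDD node and its dependencies on parent RDDs.
  final class Rdd(val name: String, val deps: Seq[Dep])
  sealed trait Dep { def rdd: Rdd }
  final case class NarrowDep(rdd: Rdd) extends Dep
  final case class ShuffleDep(shuffleId: Int, rdd: Rdd) extends Dep

  // Hypothetical stage record: `rdd` is the stage's last RDD.
  final case class Stage(id: Int, rdd: Rdd, parents: List[Stage], isResult: Boolean)

  // Direct shuffle dependencies reachable through narrow edges only.
  def directShuffleDeps(rdd: Rdd): List[ShuffleDep] = {
    val found = mutable.LinkedHashSet.empty[ShuffleDep]
    val visited = mutable.HashSet.empty[Rdd]
    var stack = List(rdd)
    while (stack.nonEmpty) {
      val r = stack.head
      stack = stack.tail
      if (visited.add(r)) r.deps.foreach {
        case s: ShuffleDep => found += s
        case n: NarrowDep  => stack = n.rdd :: stack
      }
    }
    found.toList
  }

  final class Splitter {
    private var nextStageId = 0
    private val shuffleIdToMapStage = mutable.HashMap.empty[Int, Stage]

    private def newStageId(): Int = { val id = nextStageId; nextStageId += 1; id }

    private def getOrCreateParentStages(rdd: Rdd): List[Stage] =
      directShuffleDeps(rdd).map(getOrCreateShuffleMapStage)

    // Reuses the map stage registered for this shuffleId, or creates it (and,
    // recursively, its own parents) on first use.
    private def getOrCreateShuffleMapStage(dep: ShuffleDep): Stage =
      shuffleIdToMapStage.get(dep.shuffleId) match {
        case Some(stage) => stage
        case None =>
          val parents = getOrCreateParentStages(dep.rdd)
          val stage = Stage(newStageId(), dep.rdd, parents, isResult = false)
          shuffleIdToMapStage(dep.shuffleId) = stage
          stage
      }

    // Parents first, then the final ResultStage, as in createResultStage.
    def createResultStage(rdd: Rdd): Stage = {
      val parents = getOrCreateParentStages(rdd)
      Stage(newStageId(), rdd, parents, isResult = true)
    }
  }

  // Names of the RDDs in a stage: its last RDD plus everything reachable via narrow deps.
  def rddsInStage(stage: Stage): Set[String] = {
    val seen = mutable.LinkedHashSet.empty[Rdd]
    var stack = List(stage.rdd)
    while (stack.nonEmpty) {
      val r = stack.head
      stack = stack.tail
      if (seen.add(r)) r.deps.foreach {
        case NarrowDep(parent) => stack = parent :: stack
        case _: ShuffleDep     => () // a shuffle edge is a stage boundary
      }
    }
    seen.map(_.name).toSet
  }
}
```

The assertions check the grouping described for the figure: B and G end up in the result stage, A alone in one parent stage, and C, D, E and F together in the other.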
Back in handleJobSubmitted in DAGScheduler.scala, finalStage is first built by createResultStage.
The handleJobSubmitted source:
```scala
private[scheduler] def handleJobSubmitted(jobId: Int,
    // …….
  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  // …….
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  // …….
  logInfo("Missing parents: " + getMissingParentStages(finalStage))
  // ……
  submitStage(finalStage)
}
```
ActiveJob in handleJobSubmitted is a plain data structure holding some information about the current Job:
```scala
private[spark] class ActiveJob(
    val jobId: Int,
    val finalStage: Stage,
    val callSite: CallSite,
    val listener: JobListener,
    val properties: Properties) {
  // ...
}
```
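handleJobSubmitted logs getMissingParentStages(finalStage). getMissingParentStages walks back from finalStage and returns the parent Stages whose output is not yet available. The getOrCreateShuffleMapStage call inside it returns an existing ShuffleMapStage, or creates one if none exists yet.
submitStage is the important step in handleJobSubmitted. Its source: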
```scala
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
```
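submitStage first obtains the job ID from activeJobForStage. If jobId is defined, it computes the missing parent Stages (getMissingParentStages) and sorts them by id in ascending order. If there are no missing parents, it calls submitMissingTasks, and DAGScheduler hands the work over to the TaskScheduler. If there are missing parents, it calls submitStage(parent) recursively on each one and adds the current stage to waitingStages. This backtracks from the end of the lineage toward its start. Later Stages depend on earlier ones, so a Stage runs only after the Stages it depends on have finished. Because submitStage keeps recursing through the parent of each parent, the leftmost parent Stages are the first to be submitted and computed.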
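The recursion can be sketched with a minimal, hypothetical Stage type whose parents are given directly. In real Spark, getMissingParentStages discovers parents by walking RDD dependencies, and availability comes from map output status. stageFinished is likewise a simplified stand-in for how DAGScheduler submits waiting stages once their parents complete; that part is an assumption of this model.

```scala
import scala.collection.mutable

object SubmitStageSketch {
  // Hypothetical minimal stage: parents are given directly, and isAvailable
  // stands in for "all map outputs of this stage exist".
  final class Stage(val id: Int, val parents: List[Stage]) {
    var isAvailable = false
  }

  final class Submitter {
    val waitingStages = mutable.LinkedHashSet.empty[Stage]
    val runningStages = mutable.LinkedHashSet.empty[Stage]
    val submittedOrder = mutable.ArrayBuffer.empty[Int] // ids handed to "submitMissingTasks"

    private def missingParents(stage: Stage): List[Stage] =
      stage.parents.filterNot(_.isAvailable).sortBy(_.id)

    // Same shape as submitStage: run the stage if nothing is missing, otherwise
    // submit each missing parent first and park this stage in waitingStages.
    def submitStage(stage: Stage): Unit =
      if (!waitingStages(stage) && !runningStages(stage)) {
        val missing = missingParents(stage)
        if (missing.isEmpty) {
          runningStages += stage
          submittedOrder += stage.id
        } else {
          missing.foreach(submitStage)
          waitingStages += stage
        }
      }

    // Simplified completion: mark the stage available and resubmit waiting
    // stages whose parents are now all available.
    def stageFinished(stage: Stage): Unit = {
      stage.isAvailable = true
      runningStages -= stage
      val ready = waitingStages.filter(s => missingParents(s).isEmpty).toList
      waitingStages --= ready
      ready.foreach(submitStage)
    }
  }
}
```

Using the figure's shape, submitting Stage3 first runs Stage1 and Stage2. Stage3 runs only after both have finished.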
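Implementation of the task locality algorithm:
Next, let us look at the source of submitMissingTasks, focusing on the Stage-related logic and on task locality.
The current stage is added to runningStages, and then the code distinguishes between the two kinds of stage, ShuffleMapStage and ResultStage.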
```scala
private def submitMissingTasks(stage: Stage, jobId: Int) {
  // ……
  runningStages += stage
  // ……
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }
  // ……
}
```
submitMissingTasks obtains the task locality with the following code:
```scala
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
    case s: ResultStage =>
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
} catch {
  // …… (elided in this excerpt)
}
```
partitionsToCompute holds the ids of the partitions that need to be computed:
```scala
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
```
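For a ShuffleMapStage, the relevant code is partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap, where id is a partition id. The anonymous function builds a tuple: the first element is the partition id, and the second is the result of getPreferredLocs(stage.rdd, id). toMap then turns the tuples into a Map[Int, Seq[TaskLocation]], keyed by partition id with that partition's preferred TaskLocations as the value. For a ResultStage, id indexes into s.partitions, and the actual RDD partition p = s.partitions(id) is passed to getPreferredLocs.
The locality algorithm for a single partition is implemented in getPreferredLocs: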
```scala
private[spark]
def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
  getPreferredLocsInternal(rdd, partition, new HashSet)
}
```
getPreferredLocsInternal is the recursive implementation of getPreferredLocs. It is thread-safe because it only accesses DAGScheduler state through the thread-safe method getCacheLocs().
The getPreferredLocsInternal source:
```scala
private def getPreferredLocsInternal(
    rdd: RDD[_],
    partition: Int,
    visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
  // If the partition has already been visited, no need to re-visit.
  // This avoids exponential path exploration. SPARK-695
  if (!visited.add((rdd, partition))) {
    // Nil has already been returned for previously visited partitions.
    return Nil
  }
  // If the partition is cached, return the cache locations
  val cached = getCacheLocs(rdd)(partition)
  if (cached.nonEmpty) {
    return cached
  }
  // If the RDD has some placement preferences (as is the case for input RDDs), get those
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (rddPrefs.nonEmpty) {
    return rddPrefs.map(TaskLocation(_))
  }

  // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
  // that has any placement preferences. Ideally we would choose based on transfer sizes,
  // but this will do for now.
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
        if (locs != Nil) {
          return locs
        }
      }

    case _ =>
  }

  Nil
}
```
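In getPreferredLocsInternal:
It first tries to add the current (RDD, partition) pair to visited, which is a HashSet. If the pair is already present, the partition has been visited before. The method then returns Nil instead of exploring the partition again, which avoids exponential path exploration (SPARK-695).
Next it checks whether the partition is cached, meaning its blocks are stored by BlockManagers. getCacheLocs(rdd)(partition) returns the cached locations for that rdd and partition, and if any are found they are returned.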
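The lookup order of getPreferredLocsInternal can be imitated on a toy model. In this sketch, Rdd, preferredLocs and taskIdToLocations are hypothetical. Hosts are plain strings instead of TaskLocation, and cached locations are passed in as a map. Lookup proceeds in three steps, and a visited set prevents re-exploring the same (RDD, partition) pair:

1. Cached locations are used first.
2. Otherwise, the RDD's own preferences are used.
3. Otherwise, the lookup falls back to the first narrow parent partition that has any preference.

```scala
import scala.collection.mutable

object PreferredLocsSketch {
  // Hypothetical toy RDD: per-partition preferred hosts (like input splits) and
  // narrow dependencies given as (parent, childPartition => parentPartitions).
  final class Rdd(
      val name: String,
      val prefs: Int => Seq[String] = _ => Nil,
      val narrowDeps: Seq[(Rdd, Int => Seq[Int])] = Nil)

  // Lookup order of getPreferredLocsInternal: cached locations, then the RDD's
  // own preferences, then the first narrow parent partition with any preference.
  def preferredLocs(
      rdd: Rdd,
      partition: Int,
      cacheLocs: Map[(Rdd, Int), Seq[String]],
      visited: mutable.HashSet[(Rdd, Int)] = mutable.HashSet.empty[(Rdd, Int)]): Seq[String] =
    if (!visited.add((rdd, partition))) Nil // already explored: avoids exponential paths
    else {
      val cached = cacheLocs.getOrElse((rdd, partition), Nil)
      lazy val rddPrefs = rdd.prefs(partition)
      if (cached.nonEmpty) cached
      else if (rddPrefs.nonEmpty) rddPrefs
      else
        rdd.narrowDeps.iterator
          .flatMap { case (parent, getParents) =>
            getParents(partition).iterator.map(p => preferredLocs(parent, p, cacheLocs, visited))
          }
          .find(_.nonEmpty) // lazy: stops at the first parent partition with a preference
          .getOrElse(Nil)
    }

  // Like taskIdToLocations in submitMissingTasks: partition id -> preferred hosts.
  def taskIdToLocations(
      rdd: Rdd,
      partitionsToCompute: Seq[Int],
      cacheLocs: Map[(Rdd, Int), Seq[String]]): Map[Int, Seq[String]] =
    partitionsToCompute.map(id => (id, preferredLocs(rdd, id, cacheLocs))).toMap
}
```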
The getCacheLocs source:
```scala
private[scheduler]
def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
  // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
  if (!cacheLocs.contains(rdd.id)) {
    // Note: if the storage level is NONE, we don't need to get locations from block manager.
    val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
      IndexedSeq.fill(rdd.partitions.length)(Nil)
    } else {
      val blockIds =
        rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
      blockManagerMaster.getLocations(blockIds).map { bms =>
        bms.map(bm => TaskLocation(bm.host, bm.executorId))
      }
    }
    cacheLocs(rdd.id) = locs
  }
  cacheLocs(rdd.id)
}
```
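cacheLocs, used by getCacheLocs, is a HashMap holding the cached locations of each RDD's partitions. The key is the RDD's ID. The value is an array indexed by partition number, and each element of the array is the set of locations where that partition of the RDD is cached.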
```scala
private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
```
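The memoization in getCacheLocs can be sketched as follows. CacheLocsSketch is hypothetical. lookupBlockLocations stands in for blockManagerMaster.getLocations, and a persisted flag stands in for the StorageLevel.NONE check. The point is that the expensive lookup runs at most once per RDD, until clearCacheLocs() (which handleJobSubmitted calls) empties the map.

```scala
import scala.collection.mutable

final class CacheLocsSketch(lookupBlockLocations: (Int, Int) => Seq[Seq[String]]) {
  // rddId -> per-partition cached hosts, like DAGScheduler.cacheLocs.
  private val cacheLocs = new mutable.HashMap[Int, IndexedSeq[Seq[String]]]
  var lookups = 0 // how many times the (expensive) location lookup ran

  def getCacheLocs(rddId: Int, numPartitions: Int, persisted: Boolean): IndexedSeq[Seq[String]] =
    cacheLocs.synchronized {
      if (!cacheLocs.contains(rddId)) {
        val locs: IndexedSeq[Seq[String]] =
          if (!persisted) IndexedSeq.fill(numPartitions)(Nil) // nothing cached: skip the lookup
          else { lookups += 1; lookupBlockLocations(rddId, numPartitions).toIndexedSeq }
        cacheLocs(rddId) = locs
      }
      cacheLocs(rddId)
    }

  def clearCacheLocs(): Unit = cacheLocs.synchronized { cacheLocs.clear() }
}
```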
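In short, getPreferredLocsInternal first checks DAGScheduler's in-memory structure (cacheLocs) for locality information about the current partition, and returns it directly if present. If there is none, it calls rdd.preferredLocations, which ends up in the RDD's getPreferredLocations. If that also yields nothing, it recurses into the first narrow dependency whose parent partition has a placement preference.
If you write a custom RDD, you should implement getPreferredLocations; preferred locations are one of the five defining properties of an RDD. Suppose you want Spark to run over HBase, or over a database Spark does not yet support directly. You then need a custom RDD, and the key to keeping task computation data-local is implementing getPreferredLocations. The principle is to move the code, not the data. Take HBase as an example: for Spark to process HBase data efficiently, Spark should run on the cluster where HBase runs. HBase is a fast data-retrieval engine, and wherever the data is, Spark should run there too. Spark's ability to support many data sources hinges on getPreferredLocations. Without it, the data has to be fetched from the database or HBase over the network, which is much slower.
The source of preferredLocations in RDD.scala follows. It uses the checkpoint RDD's locations when the RDD has been checkpointed, and otherwise calls getPreferredLocations: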
```scala
final def preferredLocations(split: Partition): Seq[String] = {
  checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
    getPreferredLocations(split)
  }
}
```
And this is RDD's default getPreferredLocations, which returns Nil (no preference):

```scala
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
```
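The relationship between the final preferredLocations and the overridable getPreferredLocations can be mirrored in a few lines. LocalityRDD, RegionAwareRDD, PlainRDD and ToyPartition are hypothetical miniatures, not Spark classes. The region-to-host mapping is an assumed example of what an HBase-style source might report. In a real custom RDD you would override getPreferredLocations the same way, on a subclass of org.apache.spark.rdd.RDD.

```scala
// Hypothetical miniature of RDD's locality hooks (not Spark's real classes).
final case class ToyPartition(index: Int)

abstract class LocalityRDD {
  var checkpointRDD: Option[LocalityRDD] = None

  // Final entry point: a checkpoint's locations win, otherwise ask the subclass.
  final def preferredLocations(split: ToyPartition): Seq[String] =
    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
      getPreferredLocations(split)
    }

  // Default: no preference.
  protected def getPreferredLocations(split: ToyPartition): Seq[String] = Nil
}

// A custom source reports the host serving each partition (assumed mapping),
// so the scheduler can place tasks next to the data.
final class RegionAwareRDD(regionHosts: IndexedSeq[String]) extends LocalityRDD {
  override protected def getPreferredLocations(split: ToyPartition): Seq[String] =
    Seq(regionHosts(split.index))
}

final class PlainRDD extends LocalityRDD
```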
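The locality preferences are thus known before tasks run, because the metadata is already available when the RDD is built. Note: the code in this section is based on the Spark 2.1 source.
When computing data locality, DAGScheduler cleverly reuses the information from the RDD's own getPreferredLocations, because getPreferredLocations states the locality of each Partition. A Partition may have been persisted or checkpointed. Even then, the persisted or checkpointed data is usually located consistently with the locality given by getPreferredLocations. This greatly simplifies the task locality algorithm and helps make it efficient.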
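My diligent study has brought me no benefit other than discovering more and more of my own ignorance.
(Descartes)
The secret of learning a great many things is not to try to learn too much at once.
(Locke)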
Spark作业调度的时候,Job提交过程中Stage 划分的算法以及Task最佳位置的算法。Stage的划分是DAGScheduler工作的核心,涉及作业在集群中怎么运行,Task最佳位置数据本地性的内容。Spark 算子的构建是链式的,涉及到怎么进行计算,首先是划分Stage,Stage划分以后才是计算的本身;分布式大数据系统追求最大化的数据本地性,数据本地性是指数据进行计算的时候,数据就在内存中,甚至不用计算就直接获得结果。
Spark Application中可以因为不同的Action触发众多的Job,也就是说一个Application中可以有很多的Job,每个Job是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage,也就是说只有前面依赖的Stage计算完毕后,后面的Stage才会运行;
Stage划分就是根据宽依赖,什么时候产生宽依赖呢?例如reducByKey、groupByKey等等;
我们从RDD的collect()方法开始,collect算子是一个Action,会触发job的运行:
RDD.scala的collect方法源码,调用了runJob方法:
1. def collect(): Array[T] = withScope {
2. val results = sc.runJob(this, (iter:Iterator[T]) => iter.toArray)
3. Array.concat(results: _*)
4. }
进入SparkContext.scala的runJob方法:
1. def runJob[T, U: ClassTag](rdd: RDD[T], func:Iterator[T] => U): Array[U] = {
2. runJob(rdd, func, 0 untilrdd.partitions.length)
3. }
继续重载runJob方法:
1. def runJob[T, U: ClassTag](
2. rdd: RDD[T],
3. func: Iterator[T] => U,
4. partitions: Seq[Int]): Array[U] = {
5. val cleanedFunc = clean(func)
6. runJob(rdd, (ctx: TaskContext, it:Iterator[T]) => cleanedFunc(it), partitions)
7. }
继续重载runJob方法:
1. defrunJob[T, U: ClassTag](
2. rdd: RDD[T],
3. processPartition: Iterator[T] => U,
4. resultHandler: (Int, U) => Unit)
5. {
6. val processFunc = (context: TaskContext,iter: Iterator[T]) => processPartition(iter)
7. runJob[T, U](rdd, processFunc, 0 untilrdd.partitions.length, resultHandler)
8. }
继续重载runJob方法:
1. def runJob[T, U: ClassTag](
2. rdd: RDD[T],
3. func: (TaskContext, Iterator[T]) => U,
4. partitions: Seq[Int],
5. resultHandler: (Int, U) => Unit): Unit= {
6. if (stopped.get()) {
7. throw newIllegalStateException("SparkContext has been shutdown")
8. }
9. val callSite = getCallSite
10. val cleanedFunc = clean(func)
11. logInfo("Starting job: " +callSite.shortForm)
12. if(conf.getBoolean("spark.logLineage", false)) {
13. logInfo("RDD's recursivedependencies:\n" + rdd.toDebugString)
14. }
15. dagScheduler.runJob(rdd, cleanedFunc,partitions, callSite, resultHandler, localProperties.get)
16. progressBar.foreach(_.finishAll())
17. rdd.doCheckpoint()
18. }
进入DAGScheduler.scala的runJob方法:
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
  // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
  // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
  // safe to pass in null here. For more detail, see SPARK-13747.
  val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
  waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}
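DAGScheduler.runJob hands the work to submitJob and then blocks on the returned waiter until the job finishes; depending on whether the job succeeded or failed, it logs the corresponding message together with the elapsed time. Here is DAGScheduler's submitJob method: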
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
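runJob blocks on the JobWaiter that submitJob returns until every task of the job has reported back. The following is a minimal Spark-free sketch of that idea, assuming a Promise-based waiter; SimpleJobWaiter and its methods are hypothetical names loosely modeled on the constructor arguments in the listing, not Spark's actual class.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}

// Hypothetical, simplified stand-in for Spark's JobWaiter: it counts finished
// tasks and completes a future once every partition has reported its result.
class SimpleJobWaiter[U](totalTasks: Int, resultHandler: (Int, U) => Unit) {
  private val finishedTasks = new AtomicInteger(0)
  // A job with zero tasks is complete immediately (cf. the early return in submitJob).
  private val jobPromise: Promise[Unit] =
    if (totalTasks == 0) Promise.successful(()) else Promise[Unit]()

  def completionFuture: Future[Unit] = jobPromise.future

  def taskSucceeded(index: Int, result: U): Unit = {
    // Serialize calls into the result handler, which need not be thread-safe.
    synchronized { resultHandler(index, result) }
    if (finishedTasks.incrementAndGet() == totalTasks) jobPromise.trySuccess(())
  }

  def jobFailed(e: Exception): Unit = jobPromise.tryFailure(e)
}

// Usage: like collect(), store each partition's array at its partition index.
val results = new Array[Array[Int]](3)
val waiter = new SimpleJobWaiter[Array[Int]](3, (i, r) => results(i) = r)
waiter.taskSucceeded(0, Array(1, 2))
waiter.taskSucceeded(2, Array(5))
println(waiter.completionFuture.isCompleted) // false: partition 1 is still missing
waiter.taskSucceeded(1, Array(3, 4))
println(waiter.completionFuture.isCompleted) // true
println(Array.concat(results: _*).mkString(",")) // 1,2,3,4,5
```

Results may arrive in any partition order; storing them by index is what lets collect() concatenate them in partition order afterwards.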
submitJob first reads rdd.partitions.length and checks that every requested partition actually exists. Its key line is eventProcessLoop.post(JobSubmitted(...)).
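JobSubmitted is a case class rather than a case object because an application can run many jobs, and each job needs its own JobSubmitted instance carrying its own data. A case object has exactly one instance, much like a globally unique variable, so every use of it would look the same; here distinct instances are needed, hence a case class. The JobSubmitted field finalRDD is the last RDD of the job.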
In short, an action such as collect triggers SparkContext.runJob, which eventually leads to DAGScheduler.submitJob, whose core step is sending a JobSubmitted case class instance to eventProcessLoop. JobSubmitted is defined as follows:
private[scheduler] case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
  extends DAGSchedulerEvent
JobSubmitted is private[scheduler], so user code cannot use it directly. It packages the jobId, the final RDD (finalRDD), the function func to apply to the RDD's partitions, the partitions to compute, the job listener, the properties, and so on.
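The case class versus case object distinction can be checked in a few lines of plain Scala. The event names below are hypothetical and only mimic the shape of DAGSchedulerEvent.

```scala
// Hypothetical miniature of the DAGSchedulerEvent hierarchy.
sealed trait SchedulerEvent
// An event without data: one shared singleton is enough (cf. AllJobsCancelled).
case object AllCancelled extends SchedulerEvent
// A per-job event carries its own data, so every submission is a new instance.
final case class Submitted(jobId: Int, partitions: Seq[Int]) extends SchedulerEvent

val e1 = Submitted(0, Seq(0, 1))
val e2 = Submitted(1, Seq(0, 1, 2))
println(e1 == e2)                      // false: the field values differ
println(e1 == Submitted(0, Seq(0, 1))) // true: case classes compare by value

// Posting the case object twice still refers to one and the same object.
val events: Seq[SchedulerEvent] = Seq(e1, e2, AllCancelled, AllCancelled)
println(events.distinct.size) // 3
```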
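The key line in submitJob, eventProcessLoop.post(JobSubmitted(...)), puts the JobSubmitted event into eventProcessLoop. post does not handle the event itself; it hands a message over to another thread (the event-loop thread). eventProcessLoop is declared as: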
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
Looking at DAGSchedulerEventProcessLoop, it extends EventLoop:
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {
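EventLoop starts a thread, eventThread, configured as a daemon thread; its run method keeps taking events off the queue and calls onReceive(event) for each one. post simply puts an element into the event queue with eventQueue.put. The EventLoop source: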
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }
  ……
  def post(event: E): Unit = {
    eventQueue.put(event)
  }
eventProcessLoop is an instance of DAGSchedulerEventProcessLoop, which extends EventLoop and implements onReceive; onReceive in turn calls doOnReceive.
doOnReceive receives the message and handles it:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)

  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)

  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

  case ExecutorLost(execId, reason) =>
    val filesLost = reason match {
      case SlaveLost(_, true) => true
      case _ => false
    }
    dagScheduler.handleExecutorLost(execId, filesLost)

  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

  case completion: CompletionEvent =>
    dagScheduler.handleTaskCompletion(completion)

  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
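To summarize: EventLoop starts a thread that loops over a queue. post only places a message into the queue; because the thread keeps looping over that queue, it picks the message up and calls back onReceive(event), and onReceive in turn calls doOnReceive.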
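The mechanism just summarized can be reduced to a few dozen lines. The following is a simplified sketch in the spirit of Spark's EventLoop, not its actual code: it keeps the blocking queue, the daemon thread and the onReceive/onError hooks, while MiniEventLoop, RecordingLoop and this particular stop() logic are hypothetical.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// Simplified, hypothetical analogue of Spark's EventLoop[E].
abstract class MiniEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true) // do not keep the JVM alive
    override def run(): Unit =
      try {
        while (!stopped.get) {
          val event = eventQueue.take() // blocks until post() supplies an event
          try onReceive(event) catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case _: InterruptedException => // stop() interrupts a blocked take()
      }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = if (stopped.compareAndSet(false, true)) {
    eventThread.interrupt()
    eventThread.join()
  }
  // post() only enqueues; the caller returns immediately.
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

// Hypothetical subclass: records events in the order the loop thread handled them.
class RecordingLoop(expected: Int) extends MiniEventLoop[String]("demo-event-loop") {
  val seen = new ConcurrentLinkedQueue[String]()
  val done = new CountDownLatch(expected)
  override protected def onReceive(event: String): Unit = {
    if (event == "boom") throw new IllegalStateException(event)
    seen.add(event); done.countDown()
  }
  override protected def onError(e: Throwable): Unit = {
    seen.add("error: " + e.getMessage); done.countDown()
  }
}

val loop = new RecordingLoop(3)
loop.start()
Seq("JobSubmitted", "boom", "JobCancelled").foreach(loop.post)
val finished = loop.done.await(5, TimeUnit.SECONDS)
loop.stop()
println(loop.seen) // JobSubmitted, error: boom, JobCancelled (in posting order)
```

Because a single thread drains the queue, the three events are handled strictly one after another in posting order, and a failing handler (the "boom" event) is routed to onError without killing the loop.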
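On asynchronous communication between threads: why open a separate thread at all? For example, why does DAGScheduler post a message instead of calling doOnReceive directly, and why does it need a message loop? Here DAGScheduler sends messages to itself, but other components send it messages too (for example, task-completion and executor events coming from the TaskScheduler side). Whether a message comes from DAGScheduler itself or from elsewhere, a single thread handles it with the same logic, which makes the design very extensible: one message loop processes every message uniformly, keeps the business logic consistent, and, because only that thread touches the scheduler's state, the handlers never run concurrently with each other.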
As described above, onReceive in DAGSchedulerEventProcessLoop calls back doOnReceive, where pattern matching routes the JobSubmitted case to dagScheduler.handleJobSubmitted:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  ……
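doOnReceive is an ordinary pattern match over a sealed event type. A hypothetical miniature (event and handler names are illustrative, not Spark's) shows the routing; because the trait is sealed, the compiler can warn if a new event type is added without a matching case.

```scala
// Hypothetical miniature of DAGSchedulerEvent and doOnReceive.
sealed trait Event
final case class JobSubmittedEvt(jobId: Int, numPartitions: Int) extends Event
final case class JobCancelledEvt(jobId: Int) extends Event
case object ResubmitFailed extends Event

class MiniScheduler {
  val log = scala.collection.mutable.ArrayBuffer.empty[String]
  def handleJobSubmitted(jobId: Int, n: Int): Unit = log += s"submit job $jobId ($n partitions)"
  def handleJobCancellation(jobId: Int): Unit = log += s"cancel job $jobId"
  def resubmitFailedStages(): Unit = log += "resubmit failed stages"

  // Route each event to its handler; `Event` is sealed, so a missing case
  // produces a non-exhaustive match warning at compile time.
  def doOnReceive(event: Event): Unit = event match {
    case JobSubmittedEvt(jobId, n) => handleJobSubmitted(jobId, n)
    case JobCancelledEvt(jobId)    => handleJobCancellation(jobId)
    case ResubmitFailed            => resubmitFailedStages()
  }
}

val sched = new MiniScheduler
Seq(JobSubmittedEvt(0, 4), ResubmitFailed, JobCancelledEvt(0)).foreach(sched.doOnReceive)
sched.log.foreach(println)
```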
DAGScheduler's handleJobSubmitted:
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
}
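Where stages come from: every call to runJob produces one job. finalStage is a ResultStage: the last stage of a job is a ResultStage, and the stages before it are ShuffleMapStages.
handleJobSubmitted first creates finalStage, and creating finalStage also builds the dependency chain of its parent stages.
finalStage is created by createResultStage, which receives the final RDD (finalRDD), the function func, the partitions, the jobId, the callSite and so on. Stage creation can throw, for example when the HDFS files underlying a HadoopRDD have been deleted; the exception is then caught, the job is failed through listener.jobFailed, and handleJobSubmitted returns.
createResultStage: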
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
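createResultStage creates the ResultStage for our job; the job ID (jobId) is passed in as its fourth parameter.
createResultStage calls getOrCreateParentStages, which gets or creates the list of parent stages of a given RDD; any new stages are created with the provided firstJobId. getOrCreateParentStages: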
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
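getOrCreateParentStages calls getShuffleDependencies(rdd), which returns the shuffle dependencies that are immediate parents of the given RDD; it does not return more distant ancestors. For example, if C has a shuffle dependency on B, which in turn has a shuffle dependency on A (A <-- B <-- C), calling getShuffleDependencies on RDD C returns only the B <-- C dependency. The method is private[scheduler], i.e. visible within the scheduler package, so that unit tests can call it. getShuffleDependencies: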
private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents
}
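To see why only the nearest shuffle boundaries are returned, the traversal can be replayed on a toy graph. Node, Narrow and Shuffle are hypothetical stand-ins for RDD, NarrowDependency and ShuffleDependency; the loop itself (explicit stack, visited set, stop at shuffle edges) follows the listing above.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for RDD, NarrowDependency and ShuffleDependency.
final class Node(val name: String, val deps: Seq[Dep] = Nil) {
  override def toString: String = name
}
sealed trait Dep { def parent: Node }
final case class Narrow(parent: Node) extends Dep
final case class Shuffle(parent: Node, shuffleId: Int) extends Dep

// Same traversal as getShuffleDependencies: follow narrow edges with an explicit
// stack and a visited set, but stop at (and collect) every shuffle edge.
def shuffleDeps(rdd: Node): Set[Shuffle] = {
  val parents = mutable.HashSet.empty[Shuffle]
  val visited = mutable.HashSet.empty[Node]
  val waitingForVisit = mutable.Stack(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (visited.add(toVisit)) {
      toVisit.deps.foreach {
        case s: Shuffle => parents += s
        case dep        => waitingForVisit.push(dep.parent)
      }
    }
  }
  parents.toSet
}

// A <-- B <-- C (shuffle edges 0 and 1), plus a narrow map from C to D.
val a = new Node("A")
val b = new Node("B", Seq(Shuffle(a, 0)))
val c = new Node("C", Seq(Shuffle(b, 1)))
val d = new Node("D", Seq(Narrow(c)))
println(shuffleDeps(c).map(_.shuffleId)) // only the B <-- C edge: Set(1)
println(shuffleDeps(d).map(_.shuffleId)) // the narrow edge is crossed: still Set(1)
```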
In getOrCreateParentStages, getShuffleDependencies(rdd).map calls getOrCreateShuffleMapStage for each shuffle dependency. If shuffleIdToMapStage already holds a shuffle map stage for the dependency's shuffleId, that stage is returned. Otherwise the method creates the shuffle map stage, along with any missing ancestor shuffle map stages.
getOrCreateShuffleMapStage:
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage

    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}
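In getOrCreateShuffleMapStage:
- If the pattern match on shuffleIdToMapStage.get(shuffleId) finds a stage, that stage is returned. shuffleIdToMapStage maps a shuffle dependency ID to the ShuffleMapStage that will generate the data for that dependency. It only includes stages of currently running jobs: when the jobs that require the shuffle stage complete, the mapping is removed, and the only record of the shuffle data is then kept in the MapOutputTracker.
- If no stage is found, getMissingAncestorShuffleDependencies is called, createShuffleMapStage creates the stages for all missing ancestor shuffle dependencies, and finally a stage is created for the given shuffle dependency itself.
getMissingAncestorShuffleDependencies finds the ancestor shuffle dependencies that have not yet been registered in shuffleIdToMapStage: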
private def getMissingAncestorShuffleDependencies(
    rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
  val ancestors = new Stack[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      getShuffleDependencies(toVisit).foreach { shuffleDep =>
        if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
          ancestors.push(shuffleDep)
          waitingForVisit.push(shuffleDep.rdd)
        } // Otherwise, the dependency and its ancestors have already been registered.
      }
    }
  }
  ancestors
}
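Together, getOrCreateShuffleMapStage and getMissingAncestorShuffleDependencies memoize map stages by shuffleId and create any unregistered ancestors before the requested stage. The following Spark-free sketch models this on a chain A <-- B <-- C; every class and method here is a hypothetical stand-in, and job ids, the MapOutputTracker and stage attempts are left out.

```scala
import scala.collection.mutable

// Hypothetical stand-ins: nodes with narrow or shuffle parents, and map stages.
final class Node(val name: String, val deps: Seq[Dep] = Nil) {
  override def toString: String = name
}
sealed trait Dep { def parent: Node }
final case class Narrow(parent: Node) extends Dep
final case class Shuffle(parent: Node, shuffleId: Int) extends Dep
final case class MapStage(id: Int, shuffleId: Int, parents: List[MapStage])

class StageRegistry {
  private var nextStageId = 0
  val shuffleIdToMapStage = mutable.HashMap.empty[Int, MapStage]
  val creationOrder = mutable.ArrayBuffer.empty[Int] // shuffleIds, in creation order

  // Immediate shuffle parents (as in getShuffleDependencies).
  def shuffleDeps(rdd: Node): List[Shuffle] = {
    val found = mutable.LinkedHashSet.empty[Shuffle]
    val visited = mutable.HashSet.empty[Node]
    var stack = List(rdd)
    while (stack.nonEmpty) {
      val n = stack.head; stack = stack.tail
      if (visited.add(n)) n.deps.foreach {
        case s: Shuffle => found += s
        case dep        => stack = dep.parent :: stack
      }
    }
    found.toList
  }

  // As in getMissingAncestorShuffleDependencies: the deepest unregistered ancestor ends up first.
  def missingAncestors(rdd: Node): List[Shuffle] = {
    var ancestors = List.empty[Shuffle]
    val visited = mutable.HashSet.empty[Node]
    var stack = List(rdd)
    while (stack.nonEmpty) {
      val n = stack.head; stack = stack.tail
      if (visited.add(n)) shuffleDeps(n).foreach { dep =>
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          ancestors = dep :: ancestors
          stack = dep.parent :: stack
        }
      }
    }
    ancestors
  }

  def createShuffleMapStage(dep: Shuffle): MapStage = {
    val parents = shuffleDeps(dep.parent).map(getOrCreateShuffleMapStage)
    val stage = MapStage(nextStageId, dep.shuffleId, parents)
    nextStageId += 1
    shuffleIdToMapStage(dep.shuffleId) = stage
    creationOrder += dep.shuffleId
    stage
  }

  def getOrCreateShuffleMapStage(dep: Shuffle): MapStage =
    shuffleIdToMapStage.get(dep.shuffleId) match {
      case Some(stage) => stage
      case None =>
        missingAncestors(dep.parent).foreach { anc =>
          if (!shuffleIdToMapStage.contains(anc.shuffleId)) createShuffleMapStage(anc)
        }
        createShuffleMapStage(dep)
    }
}

// A <-s0- B <-s1- C, and a downstream shuffle s2 that reads C.
val a = new Node("A")
val b = new Node("B", Seq(Shuffle(a, 0)))
val c = new Node("C", Seq(Shuffle(b, 1)))
val reg = new StageRegistry
val s2 = reg.getOrCreateShuffleMapStage(Shuffle(c, 2))
println(reg.creationOrder) // ancestors first: shuffle 0, then 1, then 2
```

A second call with the same shuffleId returns the registered stage instead of building a new one, which is why stages shared by several jobs are reused.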
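createShuffleMapStage creates a ShuffleMapStage that generates the partitions of the given shuffle dependency. If a previously run stage generated the same shuffle data, the function copies the output locations that are still available from that earlier shuffle, so the data is not regenerated unnecessarily.
createShuffleMapStage: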
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)

  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)

  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // A previously run stage generated partitions for this shuffle, so for each output
    // that's still available, copy information about that output location to the new stage
    // (so we don't unnecessarily re-compute that data).
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    (0 until locs.length).foreach { i =>
      if (locs(i) ne null) {
        // locs(i) will be null if missing
        stage.addOutputLoc(i, locs(i))
      }
    }
  } else {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
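The reuse branch amounts to copying every non-null entry of the earlier map output into the new stage, so that only the missing partitions have to be recomputed. A Spark-free sketch under that assumption; ToyShuffleMapStage and the host strings are hypothetical, and a null entry marks a missing output as in the listing.

```scala
// Hypothetical stand-in for ShuffleMapStage's output bookkeeping.
class ToyShuffleMapStage(val numPartitions: Int) {
  private val outputLocs = Array.fill(numPartitions)(List.empty[String])
  def addOutputLoc(partition: Int, host: String): Unit =
    outputLocs(partition) = host :: outputLocs(partition)
  def missingPartitions: Seq[Int] = (0 until numPartitions).filter(p => outputLocs(p).isEmpty)
  def isAvailable: Boolean = missingPartitions.isEmpty
}

// Map output left behind by an earlier job for the same shuffleId;
// null marks an output that is missing, like locs(i) in createShuffleMapStage.
val previousStatuses: Array[String] = Array("host1", null, "host3", null)

val stage = new ToyShuffleMapStage(previousStatuses.length)
previousStatuses.indices.foreach { i =>
  if (previousStatuses(i) ne null) stage.addOutputLoc(i, previousStatuses(i))
}
println(stage.missingPartitions) // only partitions 1 and 3 must be recomputed
println(stage.isAvailable)       // false
```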
Back in handleJobSubmitted: once finalStage has been created, it is submitted:
private[scheduler] def handleJobSubmitted(jobId: Int,
    ......
  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  ......
  submitStage(finalStage)
}
submitStage submits a stage, but first recursively submits any missing parent stages. submitStage:
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
submitStage calls getMissingParentStages:
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        for (dep <- rdd.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList
}
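Let us now walk through the process in detail using the diagram of how a Spark DAG is divided into stages:
The data between RDD A and RDD B, and between RDD F and RDD G, must go through a shuffle, so RDD A is the split point between Stage1 and Stage3, and RDD F the split point between Stage2 and Stage3. There is no shuffle between RDD B and RDD G, so G's dependency on B is narrow, and RDD B and RDD G fall into the same stage, Stage3. Likewise, no shuffle is needed from RDD C through RDD D to RDD F, or from RDD E to RDD F: F's dependencies on D and E and D's dependency on C are all narrow, so RDD C, RDD D, RDD E and RDD F fall into the same stage, Stage2. Stage1 and Stage2 are independent of each other and can run concurrently, while Stage3 depends on the results of both and therefore runs last.
Figure 8-1 Dividing a DAG into stages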
- createResultStage: creates the ResultStage for the job ID (jobId). It calls getOrCreateParentStages to create all parent stages and passes the returned parents: List[Stage] to the ResultStage it instantiates.
In Figure 8-1: createResultStage is called on RDD G; getOrCreateParentStages returns all of its parent stages, Stage1 and Stage2, and then G's own stage, Stage3, is created.
- getOrCreateParentStages: gets or creates the list of parent stages of a given RDD; new stages are created with the provided firstJobId.
In Figure 8-1: getOrCreateParentStages on RDD G calls getShuffleDependencies to obtain the set of G's immediate shuffle dependencies, HashSet(ShuffleDependency(RDD F), ShuffleDependency(RDD A)), that is, the shuffle dependencies on RDD F and on RDD A (the latter reached through the narrow dependency on RDD B). It then iterates over the set and calls getOrCreateShuffleMapStage on each of them.
- getOrCreateShuffleMapStage on ShuffleDependency(RDD A): no stage is registered for its shuffleId yet, so the None branch of the pattern match calls getMissingAncestorShuffleDependencies, which returns an empty result. createShuffleMapStage is then called on ShuffleDependency(RDD A); RDD A has no parent stages, so Stage1 is created.
- getOrCreateShuffleMapStage on ShuffleDependency(RDD F): likewise, getMissingAncestorShuffleDependencies returns an empty result and createShuffleMapStage is called on ShuffleDependency(RDD F). Upstream of RDD F (RDD C to RDD D to RDD F, and RDD E to RDD F) there is no shuffle, and without a shuffle dependency no new stage is produced, so RDD F has no parent stages and Stage2 is created.
- Finally, Stage3 is created with List(Stage1, Stage2) as its parent stages; Stage3 is a ResultStage.
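The walk-through of Figure 8-1 can be replayed on a toy graph. The sketch below is a hypothetical, Spark-free model: it rebuilds the DAG as the text describes it (A to B and F to G are shuffle edges, every other edge is narrow) and pipelines RDDs into a stage by following narrow edges until a shuffle edge is met. Unlike Spark, it neither assigns stage ids nor memoizes stages by shuffleId, which is fine for this tree-shaped DAG.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for RDDs and their dependencies.
final class Node(val name: String, val deps: Seq[Dep] = Nil) {
  override def toString: String = name
}
sealed trait Dep { def parent: Node }
final case class Narrow(parent: Node) extends Dep
final case class Shuffle(parent: Node) extends Dep

final case class ToyStage(rdds: Set[String], parents: List[ToyStage])

// Pipeline every RDD reachable through narrow edges into one stage; each shuffle
// edge met on the way bounds the stage and becomes a parent stage.
def buildStage(last: Node): ToyStage = {
  val members = mutable.LinkedHashSet.empty[Node]
  val boundaries = mutable.LinkedHashSet.empty[Shuffle]
  var stack = List(last)
  while (stack.nonEmpty) {
    val n = stack.head
    stack = stack.tail
    if (members.add(n)) n.deps.foreach {
      case s: Shuffle => boundaries += s
      case dep        => stack = dep.parent :: stack
    }
  }
  ToyStage(members.map(_.name).toSet, boundaries.toList.map(s => buildStage(s.parent)))
}

// Figure 8-1 as described in the text.
val a = new Node("A")
val b = new Node("B", Seq(Shuffle(a)))
val c = new Node("C")
val d = new Node("D", Seq(Narrow(c)))
val e = new Node("E")
val f = new Node("F", Seq(Narrow(d), Narrow(e)))
val g = new Node("G", Seq(Narrow(b), Shuffle(f)))

val stage3 = buildStage(g)
println(stage3.rdds)                         // B and G: the ResultStage
stage3.parents.foreach(p => println(p.rdds)) // one stage with C, D, E, F and one with A
```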
Back in DAGScheduler.scala's handleJobSubmitted: finalStage is first built by createResultStage.
handleJobSubmitted (excerpt):
private[scheduler] def handleJobSubmitted(jobId: Int,
    …….
  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  …….
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  …….
  logInfo("Missing parents: " + getMissingParentStages(finalStage))
  ……
  submitStage(finalStage)
}
ActiveJob, created in handleJobSubmitted, is a plain data structure that holds information about the current job:
private[spark] class ActiveJob(
    val jobId: Int,
    val finalStage: Stage,
    val callSite: CallSite,
    val listener: JobListener,
    val properties: Properties) {
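handleJobSubmitted also logs getMissingParentStages(finalStage). getMissingParentStages walks back from finalStage's RDD to its parent stages, getting existing ones or creating them through getOrCreateShuffleMapStage if they do not exist yet, and returns those whose output is not yet available.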
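submitStage first obtains the job ID through activeJobForStage. If jobId is defined, it gets the missing parent stages (getMissingParentStages) and sorts them by ascending id. If there are no missing parents, it calls submitMissingTasks, through which DAGScheduler hands the processing over to the TaskScheduler. If there are missing parents, it calls submitStage(parent) recursively on each of them, walking backwards through the DAG, and puts the current stage into waitingStages. A later stage depends on the earlier ones, so it runs only after the stages it depends on have finished. Because submitStage keeps recursing, it reaches the parent of the parent and so on, and computation starts from the leftmost (most upstream) parent stages. If no active job is found for the stage, the stage is aborted.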
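The recursion can be traced on a small stage graph. This is a hypothetical, simplified model: parents are given directly instead of being discovered through getMissingParentStages, failedStages and job lookup are left out, and stageFinished is an invented stand-in for Spark's completion handling that simply retries all waiting stages.

```scala
import scala.collection.mutable

// Hypothetical stage model: a stage becomes "available" once its output exists.
final class ToyStage(val id: Int, val parents: List[ToyStage]) {
  var isAvailable = false
  override def toString: String = s"Stage$id"
}

class ToyScheduler {
  val waitingStages = mutable.LinkedHashSet.empty[ToyStage]
  val runningStages = mutable.LinkedHashSet.empty[ToyStage]
  val submitted = mutable.ArrayBuffer.empty[ToyStage] // stands in for submitMissingTasks

  private def missingParents(stage: ToyStage): List[ToyStage] =
    stage.parents.filterNot(_.isAvailable).sortBy(_.id)

  // Same shape as submitStage: submit missing parents first (recursively),
  // and park the stage in waitingStages until they are done.
  def submitStage(stage: ToyStage): Unit =
    if (!waitingStages(stage) && !runningStages(stage)) {
      val missing = missingParents(stage)
      if (missing.isEmpty) {
        runningStages += stage
        submitted += stage
      } else {
        missing.foreach(submitStage)
        waitingStages += stage
      }
    }

  // Hypothetical, simplified completion path: mark the stage as available and
  // retry every waiting stage (Spark's real completion handling does more).
  def stageFinished(stage: ToyStage): Unit = {
    stage.isAvailable = true
    runningStages -= stage
    val retry = waitingStages.toList
    waitingStages --= retry
    retry.foreach(submitStage)
  }
}

val s0 = new ToyStage(0, Nil)
val s1 = new ToyStage(1, List(s0))
val s2 = new ToyStage(2, Nil)
val s3 = new ToyStage(3, List(s1, s2)) // the final ResultStage

val sched = new ToyScheduler
sched.submitStage(s3)
println(sched.submitted)     // Stage0 and Stage2 can start right away
println(sched.waitingStages) // Stage1 and Stage3 wait for their parents
Seq(s0, s2, s1).foreach(sched.stageFinished)
println(sched.submitted)     // Stage0, Stage2, Stage1, Stage3
```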
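Task locality algorithm implementation:
Next, let us look at the source of submitMissingTasks, focusing on the stage logic itself and on task locality.
submitMissingTasks adds the current stage to runningStages and then handles two cases, ShuffleMapStage and ResultStage: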
private def submitMissingTasks(stage: Stage, jobId: Int) {
  ……
  runningStages += stage
  ……
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }
submitMissingTasks then obtains the task locality with the following code:
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
    case s: ResultStage =>
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
……
partitionsToCompute holds the ids of the partitions that still have to be computed:
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
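For a ShuffleMapStage, in partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap, id is a partition id. The anonymous function builds a tuple whose first element is the partition id and whose second element is the location obtained by passing the rdd and the id to getPreferredLocs. toMap then turns the tuples into a Map[Int, Seq[TaskLocation]], whose key is the partition id and whose value is that partition's TaskLocations. For a ResultStage, id indexes into the stage's partitions array, so s.partitions(id) is looked up first to get the actual RDD partition.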
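The map/toMap idiom is easy to try out on its own. In this sketch the partition ids, host names and the lookup function are all hypothetical; for the ResultStage case, each id is first translated through the stage's partition array, mirroring s.partitions(id) in the listing.

```scala
// Hypothetical preferred-location lookup: RDD partition index -> hosts.
val preferredHosts: Map[Int, Seq[String]] =
  Map(0 -> Seq("host1"), 1 -> Seq("host2", "host3"), 7 -> Seq("host9"))
def getPreferredLocs(partition: Int): Seq[String] = preferredHosts.getOrElse(partition, Nil)

// ShuffleMapStage: the ids to compute are RDD partition ids themselves.
val partitionsToCompute = Seq(0, 1, 2)
val shuffleLocs: Map[Int, Seq[String]] =
  partitionsToCompute.map { id => (id, getPreferredLocs(id)) }.toMap

// ResultStage: ids index into the job's partitions, e.g. a job on partitions 5 and 7 only.
val resultStagePartitions = Array(5, 7)
val resultLocs: Map[Int, Seq[String]] =
  Seq(0, 1).map { id => (id, getPreferredLocs(resultStagePartitions(id))) }.toMap

println(shuffleLocs) // partition 2 has no preference (empty Seq)
println(resultLocs)  // key 1 maps to the locations of RDD partition 7
```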
The data locality algorithm for a single partition is implemented in getPreferredLocs:
private[spark]
def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
  getPreferredLocsInternal(rdd, partition, new HashSet)
}
getPreferredLocsInternal is the recursive implementation of getPreferredLocs. It is thread-safe because it only accesses DAGScheduler state through thread-safe methods (getCacheLocs()).
getPreferredLocsInternal:
private def getPreferredLocsInternal(
    rdd: RDD[_],
    partition: Int,
    visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
  // If the partition has already been visited, no need to re-visit.
  // This avoids exponential path exploration.  SPARK-695
  if (!visited.add((rdd, partition))) {
    // Nil has already been returned for previously visited partitions.
    return Nil
  }
  // If the partition is cached, return the cache locations
  val cached = getCacheLocs(rdd)(partition)
  if (cached.nonEmpty) {
    return cached
  }
  // If the RDD has some placement preferences (as is the case for input RDDs), get those
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (rddPrefs.nonEmpty) {
    return rddPrefs.map(TaskLocation(_))
  }

  // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
  // that has any placement preferences. Ideally we would choose based on transfer sizes,
  // but this will do for now.
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
        if (locs != Nil) {
          return locs
        }
      }

    case _ =>
  }

  Nil
}
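In getPreferredLocsInternal:
First, the (rdd, partition) pair is added to visited, a HashSet. If the pair is already present, this partition has already been reached through another path, so Nil is returned instead of exploring it again; this avoids exponential path exploration (SPARK-695).
Next, if the partition is cached, meaning its data has been persisted in the BlockManagers, getCacheLocs(rdd)(partition) is called with the rdd and the partition to obtain the cached locations, which are returned if any exist.
getCacheLocs: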
private[scheduler]
def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
  // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
  if (!cacheLocs.contains(rdd.id)) {
    // Note: if the storage level is NONE, we don't need to get locations from block manager.
    val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
      IndexedSeq.fill(rdd.partitions.length)(Nil)
    } else {
      val blockIds =
        rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
      blockManagerMaster.getLocations(blockIds).map { bms =>
        bms.map(bm => TaskLocation(bm.host, bm.executorId))
      }
    }
    cacheLocs(rdd.id) = locs
  }
  cacheLocs(rdd.id)
}
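cacheLocs, used by getCacheLocs, is a HashMap holding the locations where each RDD's partitions are cached. Its keys are RDD ids and its values are arrays indexed by partition number; each array element is the set of locations where that RDD partition is cached.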
private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
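getCacheLocs is a synchronized memo over cacheLocs: the first lookup for an RDD fills a per-partition location array (all Nil when nothing is persisted), and later lookups reuse it. In this hypothetical sketch, a counter stands in for the BlockManagerMaster round trip and a persisted flag stands in for a storage level other than NONE; all names are illustrative.

```scala
import scala.collection.mutable

// Hypothetical stand-in for DAGScheduler's cacheLocs / getCacheLocs pair.
class CacheLocTracker(blockLocations: (Int, Int) => Seq[String]) {
  private val cacheLocs = new mutable.HashMap[Int, IndexedSeq[Seq[String]]]
  var lookups = 0 // how many times we "asked the BlockManagerMaster"

  def getCacheLocs(rddId: Int, numPartitions: Int, persisted: Boolean): IndexedSeq[Seq[String]] =
    cacheLocs.synchronized {
      if (!cacheLocs.contains(rddId)) {
        val locs: IndexedSeq[Seq[String]] =
          if (!persisted) IndexedSeq.fill(numPartitions)(Nil) // like StorageLevel.NONE
          else {
            lookups += 1
            (0 until numPartitions).map(p => blockLocations(rddId, p))
          }
        cacheLocs(rddId) = locs
      }
      cacheLocs(rddId)
    }

  def clear(): Unit = cacheLocs.synchronized(cacheLocs.clear()) // cf. clearCacheLocs()
}

val tracker = new CacheLocTracker((rdd, p) => if (p == 0) Seq(s"host-for-rdd$rdd") else Nil)
val first = tracker.getCacheLocs(rddId = 3, numPartitions = 2, persisted = true)
val again = tracker.getCacheLocs(rddId = 3, numPartitions = 2, persisted = true)
println(first)           // partition 0 cached on one host, partition 1 not cached
println(tracker.lookups) // 1: the second call was served from the memo
```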
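So getPreferredLocsInternal first checks DAGScheduler's in-memory data structure (cacheLocs) for locality information about the current partition and returns it directly if present. Otherwise it calls rdd.preferredLocations, which delegates to the RDD's getPreferredLocations; if that also yields nothing, it recursively tries the parent partitions of the RDD's narrow dependencies and returns the first one that has a placement preference.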
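The three-step lookup (cache, then the RDD's own preferences, then the first narrow parent that has any) can be modeled in plain Scala. Everything here is a hypothetical stand-in: each ToyRDD carries per-partition cached and preferred hosts, and a narrow dependency maps a child partition to its parent partitions. One deliberate difference from the listing: the narrow-dependency search uses a lazy iterator instead of a non-local return.

```scala
import scala.collection.mutable

// Hypothetical toy RDD: cached and preferred hosts per partition, plus narrow
// parents given as (parent, childPartition => parentPartitions).
final class ToyRDD(
    val name: String,
    val cached: Map[Int, Seq[String]] = Map.empty,
    val preferred: Map[Int, Seq[String]] = Map.empty,
    val narrowDeps: Seq[(ToyRDD, Int => Seq[Int])] = Nil) {
  override def toString: String = name
}

def getPreferredLocs(rdd: ToyRDD, partition: Int): Seq[String] =
  getPreferredLocsInternal(rdd, partition, mutable.HashSet.empty[(ToyRDD, Int)])

def getPreferredLocsInternal(
    rdd: ToyRDD, partition: Int, visited: mutable.HashSet[(ToyRDD, Int)]): Seq[String] = {
  // Already explored through another path: do not explore it again.
  if (!visited.add((rdd, partition))) return Nil
  // 1. A cached partition should run where its cached block lives.
  val cachedLocs = rdd.cached.getOrElse(partition, Nil)
  if (cachedLocs.nonEmpty) return cachedLocs
  // 2. The RDD's own placement preferences (e.g. HDFS block hosts of an input RDD).
  val prefs = rdd.preferred.getOrElse(partition, Nil)
  if (prefs.nonEmpty) return prefs
  // 3. The first narrow parent partition with any preference; the iterator is
  //    lazy, so the search stops at the first hit.
  rdd.narrowDeps.iterator
    .flatMap { case (parent, getParents) => getParents(partition).iterator.map(p => (parent, p)) }
    .map { case (parent, inPart) => getPreferredLocsInternal(parent, inPart, visited) }
    .find(_.nonEmpty)
    .getOrElse(Nil)
}

// input (preferences like HDFS block hosts) --map--> mapped
val input = new ToyRDD("input", preferred = Map(0 -> Seq("hostA"), 1 -> Seq("hostB")))
val mapped = new ToyRDD("mapped", narrowDeps = Seq((input, (p: Int) => Seq(p))))
// The same map, but partition 1 was persisted on hostC by an earlier job.
val mappedCached = new ToyRDD("mappedCached", cached = Map(1 -> Seq("hostC")),
  narrowDeps = Seq((input, (p: Int) => Seq(p))))

println(getPreferredLocs(mapped, 1))       // inherits hostB from the input partition
println(getPreferredLocs(mappedCached, 1)) // the cache location wins: hostC
println(getPreferredLocs(mappedCached, 0)) // not cached: falls back to hostA
```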
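If you write a custom RDD, you should implement getPreferredLocations: preferred locations are one of the five properties of an RDD (an optional one). For example, to run Spark on HBase, or on a database that Spark does not yet support directly, a developer has to write a custom RDD, and the key to task data locality is implementing getPreferredLocations for it. The data stays put and the code moves: taking HBase as an example, for Spark to operate on HBase data, Spark should run in the cluster where HBase runs. HBase is a fast data-retrieval engine, and wherever the data is, Spark should run there too. Spark's ability to support many kinds of data sources hinges on getPreferredLocations; without it, the data would have to be pulled from the database or from HBase over the network, which is much slower.
RDD.scala's preferredLocations, which falls back to getPreferredLocations: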
final def preferredLocations(split: Partition): Seq[String] = {
  checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
    getPreferredLocations(split)
  }
}
And this is RDD's default getPreferredLocations, which reports no preference:
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
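The split between the final preferredLocations and the overridable getPreferredLocations can be mirrored without Spark. The sketch is hypothetical: ToyBaseRDD stands in for RDD, an Option[ToyBaseRDD] plays the role of checkpointRDD, and RegionRDD imitates a custom RDD over a key-value store (in the spirit of the HBase example) that reports the host serving each partition.

```scala
// Hypothetical mirror of RDD.preferredLocations / getPreferredLocations.
abstract class ToyBaseRDD {
  var checkpointRDD: Option[ToyBaseRDD] = None
  // Default: no preference, like RDD.getPreferredLocations returning Nil.
  protected def getPreferredLocations(split: Int): Seq[String] = Nil
  // Final entry point: once a checkpoint exists, its locations are used.
  final def preferredLocations(split: Int): Seq[String] =
    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse(getPreferredLocations(split))
}

class LocalCheckpoint(hosts: Map[Int, String]) extends ToyBaseRDD {
  override protected def getPreferredLocations(split: Int): Seq[String] = hosts.get(split).toList
}

// A custom RDD over a hypothetical region-based store: each partition is one
// region, and the host serving it is reported so tasks run next to the data.
class RegionRDD(regionHosts: IndexedSeq[String]) extends ToyBaseRDD {
  override protected def getPreferredLocations(split: Int): Seq[String] = Seq(regionHosts(split))
}

val plain = new ToyBaseRDD {}
val regions = new RegionRDD(IndexedSeq("rs1.example", "rs2.example"))
println(plain.preferredLocations(0))   // empty: no preference
println(regions.preferredLocations(1)) // the region server for split 1

regions.checkpointRDD = Some(new LocalCheckpoint(Map(1 -> "ckpt-host")))
println(regions.preferredLocations(1)) // checkpoint locations now take priority
```

Once checkpointRDD is set, its answer is used even when it is empty, because map followed by getOrElse only falls back when there is no checkpoint at all; the Spark listing above has the same shape.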
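Data locality can therefore be worked out before the job runs, because an RDD already carries this metadata when it is built. Note: the code in this section is based on the Spark 2.1 source.
When computing data locality, DAGScheduler cleverly reuses the information in the RDD's own getPreferredLocations, which describes the locality of every partition, to maximize efficiency. A partition may have been persisted or checkpointed, but in the common case the persisted or checkpointed data is located consistently with the partition locality reported by getPreferredLocations. This greatly simplifies the task locality algorithm and makes it more efficient.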
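My diligent studies have brought me no other benefit than the growing discovery of my own ignorance.
-- Descartes
The secret of learning a great deal is not to try to learn a great deal all at once.
-- Locke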