
Lesson 34: A Deep Dive into the Source Code of Stage Division and the Task Best-Location Algorithm


When Spark schedules a job, the submission process involves two algorithms: the Stage-division algorithm and the Task best-location algorithm. Stage division is the core of the DAGScheduler's work; it determines how the job runs on the cluster, and the best Task location is a matter of data locality. Spark operators are chained together, which raises the question of how the computation actually proceeds: Stages are divided first, and only then does the computation itself take place. A distributed big-data system strives for maximum data locality, meaning that when data is computed it is already in memory, and in some cases the result can even be obtained without recomputation.
A Spark Application can trigger many Jobs through different Actions; that is, one Application may contain many Jobs. Each Job consists of one or more Stages, and later Stages depend on earlier Stages: a Stage runs only after all the Stages it depends on have finished computing.
Stages are divided at wide dependencies. When is a wide dependency produced? By operators such as reduceByKey and groupByKey, for example (see the sketch below).
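The following is a minimal, hedged sketch (not from the original text), assuming an existing SparkContext `sc` and a purely illustrative input path: the reduceByKey shuffle splits this single job into a ShuffleMapStage and a ResultStage.
// Word count: one Action (collect) => one job; one wide dependency => two stages.
val counts = sc.textFile("hdfs:///tmp/input.txt")      // illustrative path only
  .flatMap(_.split(" "))
  .map(word => (word, 1))                              // narrow dependencies: same stage
  .reduceByKey(_ + _)                                  // wide dependency: stage boundary
counts.collect()                                       // Action: triggers the job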
We start from the RDD collect() method. The collect operator is an Action and triggers the execution of a job.
The source of RDD.scala's collect method, which calls runJob:
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
Enter the runJob method of SparkContext.scala:
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}
Continue into the overloaded runJob method:
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
 
Continue into the next overloaded runJob method:
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    processPartition: Iterator[T] => U,
    resultHandler: (Int, U) => Unit)
{
  val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)
  runJob[T, U](rdd, processFunc, 0 until rdd.partitions.length, resultHandler)
}
 
Continue into the final overloaded runJob method:
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
 
Enter the runJob method of DAGScheduler.scala:
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
  // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
  // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
  // safe to pass in null here. For more detail, see SPARK-13747.
  val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
  waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}
 
When DAGScheduler.runJob runs, it hands the work over to submitJob; the waiter then waits for the outcome of the scheduling, and a log message is printed when the job succeeds or fails. Enter DAGScheduler's submitJob method:
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
 
In submitJob, the method first reads rdd.partitions.length and validates that the partitions to be run actually exist. The key code in submitJob is eventProcessLoop.post(JobSubmitted(...)), which posts the JobSubmitted event.
JobSubmitted is a case class rather than a case object, because an application contains many jobs and different jobs need different JobSubmitted instances. A case object behaves like a single global value that looks the same everywhere, whereas here we need distinct instances, hence a case class. The finalRDD member of JobSubmitted is the last RDD of the job.
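A tiny Scala illustration of this point (not Spark code; the names are invented): a case class yields a distinct instance per submission, while a case object is a single shared value.
case class JobEvent(jobId: Int)   // one instance per job, each carrying its own state
case object AllCancelled          // a singleton: the same value everywhere
val e1 = JobEvent(1)
val e2 = JobEvent(2)
assert(e1 != e2)                  // distinct jobs produce distinct events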
An Action (collect, for example) causes SparkContext.runJob to execute, which ultimately leads to the execution of submitJob in DAGScheduler. Its core is to send a JobSubmitted case class instance to eventProcessLoop. The JobSubmitted source is as follows:
private[scheduler] case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
  extends DAGSchedulerEvent
JobSubmitted is private[scheduler], so users cannot invoke it directly. It encapsulates the jobId, the last RDD (finalRDD), the concrete function func applied to the RDD, the partitions to be computed, the job listener, the state, and so on.
In the key line of DAGScheduler.submitJob, eventProcessLoop.post(JobSubmitted(...)), the JobSubmitted event is placed into eventProcessLoop. post has its usual meaning in Java: it sends a message to a thread. The eventProcessLoop source is as follows:
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
Let's look at DAGSchedulerEventProcessLoop, which extends EventLoop:
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {
EventLoop starts a thread called eventThread and marks it as a daemon thread running in the background; its run method calls onReceive(event). The post method simply puts an element into the eventQueue. The EventLoop source is as follows:
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }
……
  def post(event: E): Unit = {
    eventQueue.put(event)
  }
 
eventProcessLoop is an instance of DAGSchedulerEventProcessLoop, which extends EventLoop and implements the onReceive method; onReceive in turn calls doOnReceive.
doOnReceive receives the event and starts processing it:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)

  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)

  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

  case ExecutorLost(execId, reason) =>
    val filesLost = reason match {
      case SlaveLost(_, true) => true
      case _ => false
    }
    dagScheduler.handleExecutorLost(execId, filesLost)

  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

  case completion: CompletionEvent =>
    dagScheduler.handleTaskCompletion(completion)

  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
 
To summarize: EventLoop starts a thread that continuously loops over a queue; post simply puts a message onto that queue. Because the loop keeps draining the queue, the message is picked up and the callback onReceive(event) is invoked, and onReceive in turn calls doOnReceive.
On asynchronous communication between threads: why open a new thread at all? For example, when DAGScheduler sends a message, why not call doOnReceive directly instead of going through a message loop? Here DAGScheduler sends messages to itself, but whether a message comes from itself or from somewhere else, handling everything on a single thread keeps the processing logic identical, which makes the design highly extensible. Using a message loop means all messages are handled uniformly, guaranteeing consistent business logic.
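As a minimal sketch of this pattern (not Spark's EventLoop; the class name MiniEventLoop is invented), a daemon thread drains a blocking queue and routes every message through a single callback, so all senders share exactly the same processing logic:
import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

abstract class MiniEventLoop[E](name: String) {
  private val queue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val thread = new Thread(name) {
    setDaemon(true)                       // background thread, like Spark's eventThread
    override def run(): Unit =
      try {
        while (!stopped.get) {
          val event = queue.take()        // blocks until a message is posted
          try onReceive(event) catch { case NonFatal(e) => onError(e) }
        }
      } catch { case _: InterruptedException => () }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped.set(true); thread.interrupt() }
  def post(event: E): Unit = queue.put(event)   // senders only enqueue; one thread processes

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit = e.printStackTrace()
}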
eventProcessLoop is a concrete instance of DAGSchedulerEventProcessLoop, which is a subclass of EventLoop and implements EventLoop's onReceive method; onReceive calls back into doOnReceive.
In doOnReceive, pattern matching routes execution to the case JobSubmitted branch, which calls dagScheduler.handleJobSubmitted:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
 
The source of DAGScheduler's handleJobSubmitted is as follows:
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
}
 
Now the Stage work begins. Every call to runJob produces one job. finalStage is a ResultStage: the last Stage is always a ResultStage, and the Stages before it are ShuffleMapStages.
handleJobSubmitted first creates the finalStage, and creating the finalStage builds up the chain of dependencies on its parent Stages.
finalStage is created by createResultStage, whose parameters include the last RDD (finalRDD), the function func to apply, the partitions, the jobId, the callSite, and so on. Exceptions may be caught during creation, for example when the underlying HDFS files on Hadoop have been deleted or modified.
The createResultStage source is as follows:
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
 
In createResultStage, the ResultStage is created based on our job ID, which is passed in as the jobId parameter.
createResultStage's getOrCreateParentStages gets or creates the list of parent Stages for the given RDD; new Stages are created with the provided firstJobId. The getOrCreateParentStages source:
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
 

getOrCreateParentStages calls getShuffleDependencies(rdd), which returns only the direct shuffle dependencies of the given RDD's parents; it does not return dependencies of more distant ancestors. For example, if C has a shuffle dependency on B and B has a shuffle dependency on A (A <-- B <-- C), calling getShuffleDependencies on RDD C returns only the B <-- C dependency. This function is also useful for unit testing. The getShuffleDependencies source is as follows:
private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents
}
 
In getOrCreateParentStages, the map over getShuffleDependencies(rdd) invokes getOrCreateShuffleMapStage for each shuffle dependency. If the shuffleId already exists in the shuffleIdToMapStage data structure, that existing shuffle map stage is returned; otherwise the method creates a shuffle map stage for the dependency itself, in addition to the shuffle map stages for any missing ancestors that still need to be computed.
The getOrCreateShuffleMapStage source is as follows:
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage

    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}
 
In getOrCreateShuffleMapStage:
- If pattern matching on the shuffleId finds a Stage, that stage is returned. The lookup goes to shuffleIdToMapStage, keyed by shuffleId. The shuffleIdToMapStage data structure maps shuffle dependency IDs to the ShuffleMapStages that generate their data. It only holds mappings for stages that belong to currently running jobs; once a shuffle stage's job completes, the mapping is deleted, and the only remaining record of the shuffle data lives in the MapOutputTracker.
- If pattern matching on the shuffleId finds no Stage, getMissingAncestorShuffleDependencies is called, and createShuffleMapStage creates the stages for all ancestor shuffle dependencies that still need to be computed.
 
getMissingAncestorShuffleDependencies finds the ancestor shuffle dependencies that have not yet been registered in shuffleIdToMapStage. Its source is as follows:
private def getMissingAncestorShuffleDependencies(
    rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
  val ancestors = new Stack[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      getShuffleDependencies(toVisit).foreach { shuffleDep =>
        if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
          ancestors.push(shuffleDep)
          waitingForVisit.push(shuffleDep.rdd)
        } // Otherwise, the dependency and its ancestors have already been registered.
      }
    }
  }
  ancestors
}
 
createShuffleMapStage creates a ShuffleMapStage that produces the partitions of the given shuffle dependency. If a previously run stage generated the same shuffle data, this function copies the output-location information that is still available from that earlier shuffle, so the data is not regenerated unnecessarily.
The createShuffleMapStage source is as follows:
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)

  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)

  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // A previously run stage generated partitions for this shuffle, so for each output
    // that's still available, copy information about that output location to the new stage
    // (so we don't unnecessarily re-compute that data).
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    (0 until locs.length).foreach { i =>
      if (locs(i) ne null) {
        // locs(i) will be null if missing
        stage.addOutputLoc(i, locs(i))
      }
    }
  } else {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
 
Back in handleJobSubmitted: after the finalStage has been created, it is submitted:
private[scheduler] def handleJobSubmitted(jobId: Int,
......
  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
......
  submitStage(finalStage)
}
 
submitStage submits the stage, and it first recursively submits any parent stages that still need to be computed. The submitStage source is as follows:
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
It calls getMissingParentStages, whose source is as follows:
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        for (dep <- rdd.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList
}
 
Next, let's go through this in detail with the help of the Spark DAG Stage-division diagram:
The data from RDD A to RDD B, and from RDD F to RDD G, must go through a Shuffle, so RDD A and RDD F are the division points between Stage 1 and Stage 3, and between Stage 2 and Stage 3, respectively. There is no Shuffle from RDD B to RDD G, so the dependency between RDD G and RDD B is a narrow dependency and RDD B and RDD G are placed in the same Stage 3. The data from RDD C to RDD D and RDD F, and from RDD E to RDD F, does not require a Shuffle; the dependency of RDD F on RDD D and RDD E and the dependency of RDD D on RDD C are narrow dependencies, so RDD C, RDD D, RDD E and RDD F fall into the same Stage 2. Stage 1 and Stage 2 are independent of each other and can run concurrently, while Stage 3 depends on the results of Stage 1 and Stage 2 and is therefore computed last.
Figure 8-1: Stage division of a DAG (diagram)
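A hedged sketch (not part of the original text, assuming an existing SparkContext `sc`) of a job whose DAG has the same shape as the figure: one shuffle into the groupByKey, a narrow chain feeding the join, and a second shuffle at the join.
val a = sc.parallelize(1 to 100).map(x => (x % 10, x))           // RDD A
val b = a.groupByKey()                                           // A -> B shuffle: Stage 1 ends at A
val c = sc.parallelize(1 to 100).map(x => (x % 10, x.toString))  // RDD C
val f = c.mapValues(_ + "!")                                     // C..F narrow: all in Stage 2
val g = b.join(f)   // b already carries a partitioner, so B -> G is narrow; F -> G is a shuffle
g.collect()         // Stage 3 (B, G) runs after Stage 1 and Stage 2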



- createResultStage: creates the ResultStage based on the job ID (jobId). It calls getOrCreateParentStages to create all parent stages; the returned parents: List[Stage] is passed into the ResultStage constructor to instantiate the ResultStage.
In the diagram: createResultStage is called for RDD G; getOrCreateParentStages obtains all parent stages, List[Stage] = (Stage 1, Stage 2), and then RDD G's own Stage 3 is created.
- getOrCreateParentStages: gets or creates the list of parent Stages of the given RDD. New Stages are created with the provided firstJobId.
In the diagram: getOrCreateParentStages for RDD G calls getShuffleDependencies to obtain all of RDD G's direct wide dependencies, HashSet(ShuffleDependency(RDD F), ShuffleDependency(RDD A)), i.e. the wide dependencies on RDD F and RDD A. It then iterates over this set and calls getOrCreateShuffleMapStage on ShuffleDependency(RDD F) and ShuffleDependency(RDD A) respectively.
- For ShuffleDependency(RDD A), getOrCreateShuffleMapStage is called; the pattern match on shuffleDep.shuffleId finds nothing, getMissingAncestorShuffleDependencies returns empty, and createShuffleMapStage is called for ShuffleDependency(RDD A). RDD A has no parent Stage, so Stage 1 is created.
- For ShuffleDependency(RDD F), getOrCreateShuffleMapStage is called; the pattern match on shuffleDep.shuffleId again finds nothing, getMissingAncestorShuffleDependencies returns empty, and createShuffleMapStage is called for ShuffleDependency(RDD F). There is no Shuffle from RDD C to RDD D and RDD F, nor from RDD E to RDD F, and without a wide dependency no Stage is produced, so RDD F has no parent Stage either and Stage 2 is created.
- Finally, List(Stage 1, Stage 2) is used as the parent stages of Stage 3, and Stage 3, the ResultStage, is created.

 
Back in the handleJobSubmitted method of DAGScheduler.scala, the finalStage is first built via createResultStage.
The handleJobSubmitted source is as follows:
private[scheduler] def handleJobSubmitted(jobId: Int,
  …….
  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  …….
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  …….
  logInfo("Missing parents: " + getMissingParentStages(finalStage))
  ……
  submitStage(finalStage)
}
 
The ActiveJob created in handleJobSubmitted is a plain data structure that holds some information about the current Job:
private[spark] class ActiveJob(
    val jobId: Int,
    val finalStage: Stage,
    val callSite: CallSite,
    val listener: JobListener,
    val properties: Properties) {
 
handleJobSubmitted logs "Missing parents: " + getMissingParentStages(finalStage). getMissingParentStages finds the parent Stages of the finalStage: parent Stages that already exist are obtained directly, and those that do not yet exist are created.
The submitStage call in handleJobSubmitted is the important part. The submitStage source is as follows:
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
 
submitStage first obtains the jobId from activeJobForStage. If the jobId is defined (isDefined), it obtains the parent Stages still to be computed via getMissingParentStages and sorts them by id in ascending order. If there are no missing parent Stages, submitMissingTasks is called and the DAGScheduler hands the work over to the concrete TaskScheduler. If there are missing parent Stages, submitStage(parent) is called recursively in a loop, tracing back from the end toward the beginning; later Stages depend on earlier Stages, so a Stage only runs once the Stages it depends on have finished. The repeated recursion of submitStage traces the parent of each parent Stage all the way back, so computation starts from the left-most parent Stage.
 
Implementation of the Task locality algorithm:
Next we look at the source of submitMissingTasks, paying attention to the Stage-level algorithm and to task locality.
The current stage is first added to runningStages, and then the stage is matched against two cases: ShuffleMapStage and ResultStage.
private def submitMissingTasks(stage: Stage, jobId: Int) {
  ……
  runningStages += stage
  ……
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }
 
In submitMissingTasks, task locality is obtained through the following code:
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
    case s: ResultStage =>
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
 
partitionsToCompute obtains the ids of the Partitions to be computed:
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
 
If the stage is a ShuffleMapStage, then in partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap the id is the partition id. The anonymous function builds a tuple whose first element is the id of the data split and whose second element is the location obtained by passing the rdd and the id into getPreferredLocs. toMap then turns the result into a Map[Int, Seq[TaskLocation]]: the key is the partition id and the value is the Seq[TaskLocation].
The data-locality algorithm for an individual Partition is implemented in the getPreferredLocs code below:
private[spark]
def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
  getPreferredLocsInternal(rdd, partition, new HashSet)
}
 
getPreferredLocsInternal is the recursive implementation behind getPreferredLocs. It is thread-safe because it only accesses DAGScheduler state through thread-safe methods such as getCacheLocs().
The getPreferredLocsInternal source is as follows:
private def getPreferredLocsInternal(
    rdd: RDD[_],
    partition: Int,
    visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
  // If the partition has already been visited, no need to re-visit.
  // This avoids exponential path exploration.  SPARK-695
  if (!visited.add((rdd, partition))) {
    // Nil has already been returned for previously visited partitions.
    return Nil
  }
  // If the partition is cached, return the cache locations
  val cached = getCacheLocs(rdd)(partition)
  if (cached.nonEmpty) {
    return cached
  }
  // If the RDD has some placement preferences (as is the case for input RDDs), get those
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (rddPrefs.nonEmpty) {
    return rddPrefs.map(TaskLocation(_))
  }

  // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
  // that has any placement preferences. Ideally we would choose based on transfer sizes,
  // but this will do for now.
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
        if (locs != Nil) {
          return locs
        }
      }

    case _ =>
  }

  Nil
}
 
In the getPreferredLocsInternal code:
The current (rdd, partition) pair is added to visited, a HashSet; if the pair is already present, the add fails and Nil is returned immediately, since this partition has been handled before.
If the partition is cached, meaning the DAGScheduler already holds location information for this persisted partition, then getCacheLocs(rdd)(partition) is called with the rdd and partition to obtain the cached location information, and if cached locations are found they are returned.
The getCacheLocs source is as follows:
private[scheduler]
def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
  // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
  if (!cacheLocs.contains(rdd.id)) {
    // Note: if the storage level is NONE, we don't need to get locations from block manager.
    val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
      IndexedSeq.fill(rdd.partitions.length)(Nil)
    } else {
      val blockIds =
        rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
      blockManagerMaster.getLocations(blockIds).map { bms =>
        bms.map(bm => TaskLocation(bm.host, bm.executorId))
      }
    }
    cacheLocs(rdd.id) = locs
  }
  cacheLocs(rdd.id)
}
 
cacheLocs in getCacheLocs is a HashMap that holds the cache locations of each RDD's partitions. The map key is the RDD's ID and the value is an array indexed by partition number; each array element is the set of cache locations for that RDD partition.
private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
 
In the concrete implementation inside getPreferredLocsInternal, the algorithm first checks whether the DAGScheduler's in-memory data structures already contain data-locality information for the current Partition; if they do, that information is returned directly. If not, rdd.preferredLocations (and through it the RDD's getPreferredLocations) is consulted.
If you define a custom RDD, you must implement getPreferredLocations; it is one of the five key properties of an RDD. For example, to run Spark on HBase, or on a database that is not yet directly supported, you need a custom RDD, and the crucial step for guaranteeing the data locality of Task computation is to implement that RDD's getPreferredLocations. Move the code, not the data: taking HBase as an example, for Spark to operate on HBase data, Spark should run in the cluster where HBase lives. HBase is a high-speed data-retrieval engine; wherever the data is, that is where Spark needs to run. Spark can consume data from many different sources, and the key to that support is getPreferredLocations. Without it, the data would have to be pulled over from the database or from HBase, which would be very slow.
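As a hedged, minimal sketch of what that looks like (hypothetical classes, not a real HBase connector: RegionPartition, RegionRDD and regionHosts are invented for illustration), a custom RDD overrides getPreferredLocations to report the host that holds each partition's data:
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class RegionPartition(val index: Int, val host: String) extends Partition

class RegionRDD(sc: SparkContext, regionHosts: Seq[String])
  extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    regionHosts.zipWithIndex
      .map { case (host, i) => new RegionPartition(i, host): Partition }
      .toArray

  // The key hook: tell the DAGScheduler which host holds this partition's data,
  // so tasks are scheduled where the data lives ("move code, not data").
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[RegionPartition].host)

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator.empty  // a real implementation would scan the data stored on that host
}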
The source of RDD.scala's preferredLocations is as follows:
final def preferredLocations(split: Partition): Seq[String] = {
  checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
    getPreferredLocations(split)
  }
}
And this is the RDD's getPreferredLocations:
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
 
In this way data locality is settled before the job runs, because the metadata is already available when the RDD is constructed. Note: the code in this section is based on the Spark 2.1 source.
When the DAGScheduler computes data locality, it cleverly reuses the information in the RDD's own getPreferredLocations, which maximizes efficiency, because getPreferredLocations states the data locality of every Partition. Even though the current Partition may have been persisted or checkpointed, by default the persisted or checkpointed data has the same locality as the Partition described by getPreferredLocations, which greatly simplifies the implementation of the Task data-locality algorithm and helps optimize its efficiency.
 
 
