Spark源码分析-2.Job提交
2018-02-11 21:12
113 查看
在第一篇文章中提到了一次action操作会触发RDD的延迟计算,我们把这样的一次计算称作一个Job。我们看下一个job的提交过程。我们用最常见的collect举例。
调用栈:
1. RDD.collect()
2. SparkContext.runJob()
3. DAGScheduler.runJob()
4. DAGScheduler.submitJob
5. DAGSchedulerEventProcessLoop.onReceive
6. DAGSchedulerEventProcessLoop.doOnReceive
7. DAGScheduler.handleJobSubmitted
Array.concat(results: _*)是将results转换为参数序列,也就是说将原来的一个results的每个元素进行拼接。看了下sc.runJob()的源码,也是一个Array,我猜可能是因为sc.runJob()可能返回null,collect需要返回一个空的Array代替null吧
从图中可以看出一共有7个runJob函数,我就不一一分析了,稍后我们选择第一个来分析:第一个runJob函数有四个参数,其他的runJob都调用这个runJob,比如第二个是rdd.collect()中用到的runJob,它有两个入参,调用了一个有三个参数的runJob。
分析:
rdd:运行任务的目标RDD
func:运行在RDD每个分区上的函数
partitions:目标RDD执行任务的分区集合,一些任务不是运行在所有的分区上,如first
resultHandler:回调结果
函数主要进行了dagScheduler.runJob()和rdd.doCheckPoint()两个操作,一个是运行任务,一个是RDD容错检查。
DAGScheduler.runJob
下面解释了两个参数,其他的参数在之前解释过了
DAGScheduler.runJob()获取运行开始任务的时间,然后调用了submitJob函数,最后判断任务运行成功与否,输出相关信息。
这里的waiter是JobWaiter对象,包含了Job运行的信息,它的职责是
1. 等待DAGScheduler job完成,一个JobWaiter对象与一个job唯一一一对应
2. 一旦task完成,将该task结果填充到SparkContext.runJob创建的results数组中
倒数第二行代码eventProcessLoop.post(…)就是把一个JobSubmitted放到事件队列eventQueue中循环执行,eventProcessLoop是一个DAGSchedulerEventProcessLoop类。接受事件放入队列中,时间之后会被线程处理。
事件event被post到队列中,onReceive接收这个提交的事件并处理
我们看JobSubmitted对象对应的操作,即handleJobSubmitted
看到这里,job层面的代码基本了解了,关于Stage的划分和task的的创建在后面的博客会写。
参考:
1. https://www.jianshu.com/p/e8c4bc5abedc
2. http://blog.csdn.net/u011239443/article/details/53911902
调用栈:
1. RDD.collect()
2. SparkContext.runJob()
3. DAGScheduler.runJob()
4. DAGScheduler.submitJob
5. DAGSchedulerEventProcessLoop.onReceive
6. DAGSchedulerEventProcessLoop.doOnReceive
7. DAGScheduler.handleJobSubmitted
RDD.collect()
collect将RDD数据集转换为数组Array.concat(results: _*)是将results转换为参数序列,也就是说将原来的一个results的每个元素进行拼接。看了下sc.runJob()的源码,也是一个Array,我猜可能是因为sc.runJob()可能返回null,collect需要返回一个空的Array代替null吧
def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) }
SparkContext.runJob()
从图中可以看出一共有7个runJob函数,我就不一一分析了,稍后我们选择第一个来分析:第一个runJob函数有四个参数,其他的runJob都调用这个runJob,比如第二个是rdd.collect()中用到的runJob,它有两个入参,调用了一个有三个参数的runJob。
分析:
rdd:运行任务的目标RDD
func:运行在RDD每个分区上的函数
partitions:目标RDD执行任务的分区集合,一些任务不是运行在所有的分区上,如first
resultHandler:回调结果
函数主要进行了dagScheduler.runJob()和rdd.doCheckPoint()两个操作,一个是运行任务,一个是RDD容错检查。
def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = { if (stopped.get()) { throw new IllegalStateException("SparkContext has been shutdown") } val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite.shortForm) if (conf.getBoolean("spark.logLineage", false)) { logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) } dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) progressBar.foreach(_.finishAll()) rdd.doCheckpoint() } def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = { runJob(rdd, func, 0 until rdd.partitions.length) }
DAGScheduler.runJob
4000
()
下面解释了两个参数,其他的参数在之前解释过了DAGScheduler.runJob()获取运行开始任务的时间,然后调用了submitJob函数,最后判断任务运行成功与否,输出相关信息。
这里的waiter是JobWaiter对象,包含了Job运行的信息,它的职责是
1. 等待DAGScheduler job完成,一个JobWaiter对象与一个job唯一一一对应
2. 一旦task完成,将该task结果填充到SparkContext.runJob创建的results数组中
/** * Run an action job on the given RDD and pass all the results to the resultHandler function as * they arrive. * * @param rdd target RDD to run tasks on * @param func a function to run on each partition of the RDD * @param partitions set of partitions to run on; some jobs may not want to compute on all * partitions of the target RDD, e.g. for operations like first() * @param callSite 用户程序的调用点 * @param resultHandler callback to pass each result to * @param properties 依附在这个job上的调度器特性,如公平调度器 * * @note Throws `Exception` when the job fails */ def runJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): Unit = { val start = System.nanoTime val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf) waiter.completionFuture.value.get match { case scala.util.Success(_) => logInfo("Job %d finished: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) case scala.util.Failure(exception) => logInfo("Job %d failed: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. val callerStackTrace = Thread.currentThread().getStackTrace.tail exception.setStackTrace(exception.getStackTrace ++ callerStackTrace) throw exception } }
DAGScheduler.submitJob
程序首先检查分区错误,从代码可以看出,分区数是从0开始的倒数第二行代码eventProcessLoop.post(…)就是把一个JobSubmitted放到事件队列eventQueue中循环执行,eventProcessLoop是一个DAGSchedulerEventProcessLoop类。接受事件放入队列中,时间之后会被线程处理。
/** * @return 一个JobWaiter对象可以用来阻塞作业,直到作业完成或可以用来取消作业。 * * @throws IllegalArgumentException 当partitionId非法时 */ def submitJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): JobWaiter[U] = { // 检查:确保不会在不存在的分区上运行任务 val maxPartitions = rdd.partitions.length partitions.find(p => p >= maxPartitions || p < 0).foreach { p => throw new IllegalArgumentException( "Attempting to access a non-existent partition: " + p + ". " + "Total number of partitions: " + maxPartitions) } val jobId = nextJobId.getAndIncrement() if (partitions.size == 0) { // 如果分区数为0则立刻返回 return new JobWaiter[U](this, jobId, 0, resultHandler) } assert(partitions.size > 0) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) waiter }
def post(event: E): Unit = { eventQueue.put(event) }
事件event被post到队列中,onReceive接收这个提交的事件并处理
DAGSchedulerEventProcessLoop.onReceive
DAGSchedulerEventProcessLoop在DAGScheduler.scala中,是DAG调度程序的主事件循环器。从源码中可以看出,这个事件event通过doOnReceive来处理。override def onReceive(event: DAGSchedulerEvent): Unit = { val timerContext = timer.time() try { doOnReceive(event) } finally { timerContext.stop() } }
DAGSchedulerEventProcessLoop.doOnReceive
doOnReceive是当收到一个Event进行的操作,从代码可以看出,对于不同状态的job执行不同的操作private def doOnReceive(event: DAGSchedulerEvent): Unit = event match { case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) case MapStageSubmitted(jobId, dependency, callSite, listener, properties) => dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties) case StageCancelled(stageId, reason) => dagScheduler.handleStageCancellation(stageId, reason) case JobCancelled(jobId, reason) => dagScheduler.handleJobCancellation(jobId, reason) case JobGroupCancelled(groupId) => dagScheduler.handleJobGroupCancelled(groupId) case AllJobsCancelled => dagScheduler.doCancelAllJobs() case ExecutorAdded(execId, host) => dagScheduler.handleExecutorAdded(execId, host) case ExecutorLost(execId, reason) => val filesLost = reason match { case SlaveLost(_, true) => true case _ => false } dagScheduler.handleExecutorLost(execId, filesLost) case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) case GettingResultEvent(taskInfo) => dagScheduler.handleGetTaskResult(taskInfo) case completion: CompletionEvent => dagScheduler.handleTaskCompletion(completion) case TaskSetFailed(taskSet, reason, exception) => dagScheduler.handleTaskSetFailed(taskSet, reason, exception) case ResubmitFailedStages => dagScheduler.resubmitFailedStages() }
我们看JobSubmitted对象对应的操作,即handleJobSubmitted
DAGScheduler.handleJobSubmitted
private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) { var finalStage: ResultStage = null try { // 创建新的stage可能会抛出异常,比如,如果HDFS文件删除了,运行在HadoopRDD上的job抛出异常 // HadoopRDD是通过HDFS文件系统创建的RDD finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch { case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return }
看到这里,job层面的代码基本了解了,关于Stage的划分和task的的创建在后面的博客会写。
参考:
1. https://www.jianshu.com/p/e8c4bc5abedc
2. http://blog.csdn.net/u011239443/article/details/53911902
相关文章推荐
- Spark技术内幕:Stage划分及提交源码分析
- Spark技术内幕:Stage划分及提交源码分析
- Spark1.3从创建到提交:5)Executor启动源码分析
- Spark1.3从创建到提交:6)Executor和Driver互动源码分析
- spark core源码分析5 spark提交框架
- spark core源码分析1 集群启动及任务提交过程
- Spark技术内幕:Stage划分及提交源码分析
- Spark1.3从创建到提交:3)任务调度初始化源码分析
- Spark2.2源码之Task任务提交源码分析
- Spark源码分析之job提交后转换为Stage
- spark 1.6.0 core源码分析5 spark提交框架
- Spark1.3从创建到提交:1)master和worker启动流程源码分析
- Spark源码分析之一:Job提交运行总流程概述
- spark源码学习(三):job的提交以及runJob函数的分析
- spark 1.6.0 core源码分析1 集群启动及任务提交过程
- Spark1.3从创建到提交:9)Stage的划分和提交源码分析
- Apache Spark源码分析-- Job的提交与运行
- Spark源码分析之四:Stage提交
- Spark1.3从创建到提交:8)DAGScheduler.runJob源码分析
- Spark技术内幕:Stage划分及提交源码分析