
Spark Source Code Analysis - 2. Job Submission

The first article mentioned that an action triggers the lazy computation of an RDD; we call one such computation a Job. Let's walk through how a job is submitted, using the most common action, collect, as an example.
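As a reminder of what actually triggers a job, here is a minimal, self-contained sketch (the application name, master URL, and data are illustrative, not from the original post): transformations are lazy, and only the collect() call at the end submits a job.

import org.apache.spark.{SparkConf, SparkContext}

object CollectJobDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("collect-demo").setMaster("local[2]"))
    val doubled = sc.parallelize(1 to 100, numSlices = 4).map(_ * 2) // lazy: no job submitted yet
    val result: Array[Int] = doubled.collect()                       // action: submits one job with 4 tasks
    println(result.length)
    sc.stop()
  }
}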

Call stack:

1. RDD.collect()

2. SparkContext.runJob()

3. DAGScheduler.runJob()

4. DAGScheduler.submitJob

5. DAGSchedulerEventProcessLoop.onReceive

6. DAGSchedulerEventProcessLoop.doOnReceive

7. DAGScheduler.handleJobSubmitted

RDD.collect()

collect converts the RDD data set into an array.

Array.concat(results: _*) expands results into a varargs sequence and concatenates the arrays it contains. Looking at the sc.runJob() overload used here, it returns one array per partition, so results has type Array[Array[T]]; Array.concat then flattens these per-partition arrays into a single Array[T] (see the small demo after the code below).

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
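To see what Array.concat does here, a tiny standalone demo (the values are illustrative, not from the post) that flattens per-partition arrays in the same way:

// Each inner array stands for the result of one partition (what iter.toArray produces).
val perPartition: Array[Array[Int]] = Array(Array(1, 2), Array(3), Array(4, 5, 6))

// Array.concat(perPartition: _*) expands the outer array as varargs and concatenates the inner arrays.
val flattened: Array[Int] = Array.concat(perPartition: _*)
// flattened == Array(1, 2, 3, 4, 5, 6)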


SparkContext.runJob()



SparkContext defines seven overloaded runJob methods; I won't analyze them all. The first runJob takes four parameters, and all the other overloads eventually call it. For example, the second overload, the one used by rdd.collect(), takes two parameters and delegates to a three-parameter runJob, which in turn calls the four-parameter one.

Analysis of the parameters:

rdd: the target RDD to run tasks on

func: the function to run on each partition of the RDD

partitions: the set of partitions of the target RDD to run the job on; some jobs do not run on all partitions, e.g. first

resultHandler: a callback that receives each partition's result

The function performs two main operations: dagScheduler.runJob(), which runs the job, and rdd.doCheckpoint(), which checkpoints the RDD for fault tolerance. An illustrative direct call to the four-parameter runJob is sketched after the code below.

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}
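To make the four parameters concrete, here is a hedged sketch of calling the four-parameter runJob directly (hypothetical usage; it assumes an existing SparkContext sc and an RDD[Int] named rdd, which are not part of the quoted source). It counts the elements in each partition and writes each count into a pre-allocated array via resultHandler, which is essentially what collect does with iter.toArray.

import org.apache.spark.TaskContext

// Hypothetical usage sketch: count each partition's elements.
val counts = new Array[Long](rdd.partitions.length)
sc.runJob(
  rdd,                                                          // rdd: target RDD
  (ctx: TaskContext, iter: Iterator[Int]) => iter.size.toLong,  // func: runs on every partition
  0 until rdd.partitions.length,                                // partitions: all partition ids
  (index: Int, res: Long) => counts(index) = res                // resultHandler: store result by partition id
)
// counts(i) now holds the number of elements in partition i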


DAGScheduler.runJob()

Two of the parameters are explained in the doc comment below; the others were explained above.

DAGScheduler.runJob() records the start time, calls submitJob(), waits for the job to complete, and finally logs whether the job succeeded or failed.

The waiter here is a JobWaiter object, which holds the job's runtime information. Its responsibilities are:

1. Wait for the DAGScheduler job to complete; one JobWaiter corresponds to exactly one job.

2. Whenever a task finishes, pass its result to the resultHandler so it is filled into the results array created in SparkContext.runJob (see the sketch after this list).
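For context, the results array mentioned in point 2 is allocated by a three-parameter SparkContext.runJob overload, which passes a resultHandler that writes each partition's result into its own slot. A lightly paraphrased sketch of that overload (based on the Spark 2.x source, so treat the exact wording as approximate):

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)                               // one slot per requested partition
  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res) // fill the slot when a task finishes
  results
}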

/**
 * Run an action job on the given RDD and pass all the results to the resultHandler function as
 * they arrive.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @note Throws `Exception` when the job fails
 */
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}


DAGScheduler.submitJob

The method first validates the partition ids; as the code shows, partition ids start from 0.

The second-to-last line, eventProcessLoop.post(...), puts a JobSubmitted event into the event queue eventQueue. eventProcessLoop is a DAGSchedulerEventProcessLoop: it accepts events, places them in the queue, and the events are later processed by the event loop's thread (a simplified sketch of this mechanism follows the post() method below).

/**
 * @return a JobWaiter object that can be used to block until the job finishes or to cancel the job.
 *
 * @throws IllegalArgumentException when a partition id is illegal
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
      "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
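As an illustration of the validation above (a hypothetical call, not from the original post): for an RDD with 4 partitions, the valid partition ids are 0 through 3, so asking for partition 4 fails immediately.

// Hypothetical example, assuming sc is a SparkContext:
val rdd = sc.parallelize(1 to 8, numSlices = 4)                                // partition ids 0, 1, 2, 3
val firstElems = sc.runJob(rdd, (iter: Iterator[Int]) => iter.next(), Seq(0))  // fine: only partition 0
// sc.runJob(rdd, (iter: Iterator[Int]) => iter.next(), Seq(4))
//   would throw IllegalArgumentException: "Attempting to access a non-existent partition: 4. ..."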


def post(event: E): Unit = {
  eventQueue.put(event)
}
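To make the mechanism behind post() and onReceive concrete, here is a simplified, self-contained sketch of an event loop in the style of DAGSchedulerEventProcessLoop (this is not the actual Spark implementation; names other than post/onReceive are illustrative): producers post events onto a blocking queue, and a dedicated consumer thread takes them off and dispatches each one to onReceive.

import java.util.concurrent.LinkedBlockingDeque

// Simplified event loop sketch (illustrative, not Spark's real EventLoop class).
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    override def run(): Unit = {
      try {
        while (!stopped) {
          val event = eventQueue.take() // blocks until an event is available
          onReceive(event)              // dispatch to the concrete handler
        }
      } catch {
        case _: InterruptedException => // stop() interrupts the blocking take(); exit the loop
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }

  // Same role as DAGSchedulerEventProcessLoop.post: enqueue an event for the event thread.
  def post(event: E): Unit = eventQueue.put(event)

  // Same role as onReceive: subclasses decide how each event is handled.
  protected def onReceive(event: E): Unit
}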


The event posted to the queue is then picked up by onReceive, which handles the submitted event.

DAGSchedulerEventProcessLoop.onReceive

DAGSchedulerEventProcessLoop is defined in DAGScheduler.scala and is the main event loop of the DAG scheduler. As the source shows, each received event is handed off to doOnReceive for processing.

override def onReceive(event: DAGSchedulerEvent): Unit = {
  val timerContext = timer.time()
  try {
    doOnReceive(event)
  } finally {
    timerContext.stop()
  }
}


DAGSchedulerEventProcessLoop.doOnReceive

doOnReceive runs whenever an event is received; as the code shows, it pattern-matches on the event type and dispatches each kind of event to its corresponding handler.

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  case StageCancelled(stageId, reason) =>
    dagScheduler.handleStageCancellation(stageId, reason)

  case JobCancelled(jobId, reason) =>
    dagScheduler.handleJobCancellation(jobId, reason)

  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

  case ExecutorLost(execId, reason) =>
    val filesLost = reason match {
      case SlaveLost(_, true) => true
      case _ => false
    }
    dagScheduler.handleExecutorLost(execId, filesLost)

  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

  case completion: CompletionEvent =>
    dagScheduler.handleTaskCompletion(completion)

  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}


Let's look at the handler for the JobSubmitted event, namely handleJobSubmitted.

DAGScheduler.handleJobSubmitted

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // Creating a new stage may throw an exception, e.g. a job running on a HadoopRDD
    // (an RDD backed by an HDFS file) fails if the underlying HDFS file has been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
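  // ... excerpt truncated here; in the Spark source, handleJobSubmitted goes on to create the
  //     ActiveJob and submit finalStage, which the follow-up posts on stages will cover.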


At this point we have covered the job-level code. Stage division and task creation will be covered in later posts.

References:

1. https://www.jianshu.com/p/e8c4bc5abedc

2. http://blog.csdn.net/u011239443/article/details/53911902