
8.4.2 How ResultTask Execution Results Interact with the Driver: Principles and Source Code

When Task's run method is invoked, it calls the abstract runTask method declared in Task.scala. Task has two concrete subclasses, ResultTask and ShuffleMapTask, and each provides its own implementation of the abstract runTask method. ResultTask's implementation is shown below.
runTask in ResultTask.scala:
override def runTask(context: TaskContext): U = {
  ……
  // Deserialize the RDD and the func processing function
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
  ……
  func(context, rdd.iterator(partition, context))
}
In runTask, ResultTask deserializes the RDD together with the func function, then computes the final result by applying func to the iterator over its partition.
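Concretely, the deserialized func is the per-partition function supplied by the action that triggered the job. Below is a minimal sketch (assuming a local-mode Spark setup; the object name ResultFuncDemo is illustrative, not from the source) that passes a function of exactly this shape through SparkContext.runJob:

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object ResultFuncDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("ResultFuncDemo").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 8, 2)

    // runJob accepts a (TaskContext, Iterator[T]) => U function: the same
    // shape as the func that each ResultTask deserializes and applies to
    // the iterator over its own partition.
    val perPartitionSums: Array[Int] =
      sc.runJob(rdd, (ctx: TaskContext, iter: Iterator[Int]) => iter.sum)

    println(perPartitionSums.mkString(", "))  // one value per partition: 10, 26
    sc.stop()
  }
}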
 
The interaction between a ResultTask's execution result and the Driver follows the same path as for a ShuffleMapTask: the result eventually reaches DAGScheduler.handleTaskCompletion, which branches on whether the completed task is a ShuffleMapTask or a ResultTask. The ResultTask branch is shown below.
handleTaskCompletion in DAGScheduler.scala:
case rt: ResultTask[_, _] =>
  // Cast to ResultStage here because it's part of the ResultTask
  // TODO Refactor this out to a function that accepts a ResultStage
  val resultStage = stage.asInstanceOf[ResultStage]
  resultStage.activeJob match {
    case Some(job) =>
      if (!job.finished(rt.outputId)) {
        updateAccumulators(event)
        job.finished(rt.outputId) = true
        job.numFinished += 1
        // If the whole job has finished, remove it
        if (job.numFinished == job.numPartitions) {
          markStageAsFinished(resultStage)
          cleanupStateForJobAndIndependentStages(job)
          listenerBus.post(
            SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
        }

        // taskSucceeded runs some user code that might throw an exception. Make sure
        // we are resilient against that.
        try {
          job.listener.taskSucceeded(rt.outputId, event.result)
        } catch {
          case e: Exception =>
            // TODO: Perhaps we want to mark the resultStage as failed?
            job.listener.jobFailed(new SparkDriverExecutionException(e))
        }
      }
    case None =>
      logInfo("Ignoring result from " + rt + " because its job has finished")
  }
 
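The key Driver-side callback above is job.listener.taskSucceeded(rt.outputId, event.result). For actions such as collect, the listener is a JobWaiter that stores each partition's result and signals completion once every partition has reported. The following is a simplified, hypothetical sketch of that pattern (not Spark's actual JobWaiter class; the names SimpleJobWaiter and resultHandler are illustrative only):

import scala.concurrent.{Future, Promise}

// Hypothetical, simplified stand-in for Spark's Driver-side job listener.
class SimpleJobWaiter[U](totalTasks: Int, resultHandler: (Int, U) => Unit) {
  private var finishedTasks = 0
  private val promise = Promise[Unit]()

  // Invoked once per ResultTask; index plays the role of rt.outputId.
  def taskSucceeded(index: Int, result: Any): Unit = synchronized {
    resultHandler(index, result.asInstanceOf[U])  // e.g. store into a results array
    finishedTasks += 1
    if (finishedTasks == totalTasks) promise.success(())  // whole job finished
  }

  def jobFailed(exception: Exception): Unit = synchronized {
    promise.tryFailure(exception)
  }

  // The action blocks on (or monitors) this future to know when the job is done.
  def completionFuture: Future[Unit] = promise.future
}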
On the Driver side, the DAGScheduler, via the MapOutputTracker, makes the output information of the ShuffleMapTasks available to the ResultTask; the ResultTask then fetches and processes the shuffle output of the preceding stage to produce the job's final result.
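To make the two task types concrete, here is a minimal, hypothetical example (assuming a local-mode Spark setup; the object name TwoStageDemo is illustrative). The reduceByKey introduces a shuffle dependency, so the map side runs as ShuffleMapTasks while collect runs as ResultTasks that fetch the shuffle output and return the final results to the Driver:

import org.apache.spark.{SparkConf, SparkContext}

object TwoStageDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("TwoStageDemo").setMaster("local[2]"))

    val counts = sc.parallelize(Seq("a", "b", "a", "c"), 2)
      .map(word => (word, 1))   // stage 0: executed as ShuffleMapTasks
      .reduceByKey(_ + _)       // shuffle dependency splits the job into two stages
      .collect()                // stage 1: executed as ResultTasks

    counts.foreach(println)     // final results gathered on the Driver
    sc.stop()
  }
}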
 