8.4.2 ResultTask执行结果与Driver的交互原理及源码详解
2017-06-24 09:46
579 查看
8.4.2 ResultTask执行结果与Driver的交互原理及源码详解
Task的run方法调用的时候会导致Task的抽象方法runTask的调用, Task.scala的runTask方法是一个抽象方法。Task包括2种Task:ResultTask、ShuffleMapTask,抽象runTask方法具体的实现由子类的runTask实现。ResultTask的runTask具体实现源码如下。
ResultTask.scala的runTask源码:
1. override def runTask(context: TaskContext): U= {
2. ……
3. //反序列RDD和func处理函数
4. val(rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
5. ……
6. func(context, rdd.iterator(partition,context))
7. }
而ResultTask的runTask方法中反序列化生成func函数,最后通过func函数计算出最终的结果。
ResultTask执行结果与Driver的交互过程同ShuffleMapTask类似,最终DAGScheduler.handleTaskCompletion中task执行结果,根据ShuffleMapTask和ResultTask两种情况分别处理。其中ResultTask的处理结果如下。
DAGScheduler的handleTaskCompletion源码:
1. casert: ResultTask[_, _] =>
2. // Cast to ResultStagehere because it's part of the ResultTask
3. // TODO Refactor thisout to a function that accepts a ResultStage
4. val resultStage =stage.asInstanceOf[ResultStage]
5. resultStage.activeJobmatch {
6. case Some(job) =>
7. if(!job.finished(rt.outputId)) {
8. updateAccumulators(event)
9. job.finished(rt.outputId)= true
10. job.numFinished+= 1
11. // If the wholejob has finished, remove it
12. if(job.numFinished == job.numPartitions) {
13. markStageAsFinished(resultStage)
14. cleanupStateForJobAndIndependentStages(job)
15. listenerBus.post(
16. SparkListenerJobEnd(job.jobId,clock.getTimeMillis(), JobSucceeded))
17. }
18.
19. // taskSucceededruns some user code that might throw an exception. Make sure
20. // we areresilient against that.
21. try {
22. job.listener.taskSucceeded(rt.outputId,event.result)
23. } catch {
24. case e:Exception =>
25. // TODO:Perhaps we want to mark the resultStage as failed?
26. job.listener.jobFailed(newSparkDriverExecutionException(e))
27. }
28. }
29. case None =>
30. logInfo("Ignoring resultfrom " + rt + " because its job has finished")
31. }
Driver端的DAGSchueduler的MapOutputTracker把shuffleMapTask执行的结果交给ResultTask,ResultTask根据前面Stage的执行结果进行shuffle后产生整个job最后的结果。
Task的run方法调用的时候会导致Task的抽象方法runTask的调用, Task.scala的runTask方法是一个抽象方法。Task包括2种Task:ResultTask、ShuffleMapTask,抽象runTask方法具体的实现由子类的runTask实现。ResultTask的runTask具体实现源码如下。
ResultTask.scala的runTask源码:
1. override def runTask(context: TaskContext): U= {
2. ……
3. //反序列RDD和func处理函数
4. val(rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
5. ……
6. func(context, rdd.iterator(partition,context))
7. }
而ResultTask的runTask方法中反序列化生成func函数,最后通过func函数计算出最终的结果。
ResultTask执行结果与Driver的交互过程同ShuffleMapTask类似,最终DAGScheduler.handleTaskCompletion中task执行结果,根据ShuffleMapTask和ResultTask两种情况分别处理。其中ResultTask的处理结果如下。
DAGScheduler的handleTaskCompletion源码:
1. casert: ResultTask[_, _] =>
2. // Cast to ResultStagehere because it's part of the ResultTask
3. // TODO Refactor thisout to a function that accepts a ResultStage
4. val resultStage =stage.asInstanceOf[ResultStage]
5. resultStage.activeJobmatch {
6. case Some(job) =>
7. if(!job.finished(rt.outputId)) {
8. updateAccumulators(event)
9. job.finished(rt.outputId)= true
10. job.numFinished+= 1
11. // If the wholejob has finished, remove it
12. if(job.numFinished == job.numPartitions) {
13. markStageAsFinished(resultStage)
14. cleanupStateForJobAndIndependentStages(job)
15. listenerBus.post(
16. SparkListenerJobEnd(job.jobId,clock.getTimeMillis(), JobSucceeded))
17. }
18.
19. // taskSucceededruns some user code that might throw an exception. Make sure
20. // we areresilient against that.
21. try {
22. job.listener.taskSucceeded(rt.outputId,event.result)
23. } catch {
24. case e:Exception =>
25. // TODO:Perhaps we want to mark the resultStage as failed?
26. job.listener.jobFailed(newSparkDriverExecutionException(e))
27. }
28. }
29. case None =>
30. logInfo("Ignoring resultfrom " + rt + " because its job has finished")
31. }
Driver端的DAGSchueduler的MapOutputTracker把shuffleMapTask执行的结果交给ResultTask,ResultTask根据前面Stage的执行结果进行shuffle后产生整个job最后的结果。
相关文章推荐
- ShuffleMapTask执行结果和Driver的交互原理及源码
- 第三十七课 Spark之Task执行原理及结果
- Spark修炼之道(高级篇)——Spark源码阅读:第九节 Task执行成功时的结果处理
- Pyunit源码笔记之九 testsuite执行后,runner/result的结果输出
- Laravel5.5源码详解 -- 一次查询的详细执行:从Auth-Login-web中间件到数据库查询结果的全过程
- spark中executor执行Driver发送的task,放入线程池中执行原理
- Apache Ant中任务(Task)的执行原理实例说明
- startActivityForResult()未正确返回执行结果
- Android中launcherMode="singleTask"详解【android源码解析六】
- ASP.NET页面与IIS底层交互和工作原理详解
- Spark 源码分析 -- task实际执行过程
- ibatis源码学习3_参数和结果的映射原理
- 用重定向原理实现远程执行交互
- ASP.NET页面与IIS底层交互和工作原理详解
- struts2结果集原理和自定义结果集 如果result标签中写了其他内容该怎么写跳转页面
- Struts 2的执行原理详解
- 利用源码详解try_catch_finnaly语句执行相关真相
- Hadoop源码流程分析4-Task节点执行任务
- ASP.NET页面与IIS底层交互和工作原理详解
- ASP.NET页面与IIS底层交互和工作原理详解(一)