您的位置:首页 > 其它

Spark作业的Stage划分,Task创建分发一直到提交给Spark的Executor的线程池执行全过程

2017-06-27 22:28 357 查看
本文是Spark提交作业到Task执行的宏观概括图

作业提交流程:

提交作业之后(省略SparkSubmit的分析不走) -> 反射创建mainClass  -> 初始化SparkContext -> 使用sparkContext创建RDD -> 执行count算子runJob(runJob是SparkContext中的)方法触发作业->调用sparkContext中dagScheduler的runJob方法->调用dagScheduler的submitJob方法完成提交,返回一个waiter对象!

接下来查看一下dagScheduler的submitJob方法做了什么:

1:dagScheduler的submitJob方法中给eventProcessLoop发送JobSubmitted消息(eventProcessLoop是dagScheduler的一个消息处理器) 

2:在eventProcessLoop中将JobSubmitted消息转发给dagScheduler的doOnReceive方法进行模式匹配处理

3: 调用dagScheduler的handleJobSubmitted方法(在handleJobSubmitted方法中完成Stage的划分)

4:调用dagScheduler的submitMissingTasks或submitStage方法提交Stage(submitMissingTasks和submitStage方法区别:由于递归提交Stage,需要最先提交父Stage,如果当前Stage还有父Stage的话调用submitStage进行递归提交,知道当前Stage没有父Stage时候调用submitMissingTasks提交)

5:在submitMissingTasks中将Stage信息广播出去(Stage信息主要是:Satge的RDD信息),然后将Stage序列化为Task(这里面有两种Stage分别是ShuffleMapStage和ResultStage,Task也是两种ShuffleMapTask和ResultTask)

6:调用TaskScheduler(实现类一般是TaskSchedulerImpl)的submitTasks方法为序列化好的TaskSet的创建TaskSetManager(TaskSetManager负责调用跟踪Task等),然后存储到schedulableBuilder(schedulableBuilder是一种资源分配策略,FIFO或FAIR),然后调用backend的reviveOffers方法进行资源分配(CoarseGrainedSchedulerBackend是 backend的一种实现,此处以CoarseGrainedSchedulerBackend为例分析)

7:调用的reviveOffers方法,此时给Driver发ReviveOffers消息申请资源

8: DriverEndpoint中receive方法模式匹配处理申请资源消息,调用DriverEndpoint的makeOffers,在makeOffers方法中调用DriverEndpoint的TaskSchedulerImpl成员(注意: DriverEndpoint是CoarseGrainedSchedulerBackend的内部类,所以依赖外部的TaskSchedulerImpl成员)的resourceOffers给Task分配运行节点

9:再调用DriverEndpoint的launchTasks方法启动任务

10:在DriverEndpoint的launchTasks方法中对Seq[TaskDescription]遍历进行序列化(在资源分配时候返回的Seq[TaskDescription])然后给CoarseGrainedExecutorBackend发LaunchTask(newSerializableBuffer(serializedTask))消息 。

11:在CoarseGrainedExecutorBackend的receive方法中处理LaunchTask(newSerializableBuffer(serializedTask))消息,首先反序列化Task为TaskDescription信息,然后调用executor(executor为spark的Executor的实例)的launchTask方法,将TaskDesc转化为TaskRunner,最后提交org.apache.spark.executor.Executor的threadPool线程池执行(threadPool底层是java的线程池,实现方式:Executors.newCachedThreadPool,Spark的Executor是线程池和SparkEnv、id、hostname的封装)。到此任务提交到执行完毕!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: