Spark作业的Stage划分,Task创建分发一直到提交给Spark的Executor的线程池执行全过程
2017-06-27 22:28
357 查看
本文是Spark提交作业到Task执行的宏观概括图
作业提交流程:
提交作业之后(省略SparkSubmit的分析不走) -> 反射创建mainClass -> 初始化SparkContext -> 使用sparkContext创建RDD -> 执行count算子runJob(runJob是SparkContext中的)方法触发作业->调用sparkContext中dagScheduler的runJob方法->调用dagScheduler的submitJob方法完成提交,返回一个waiter对象!
接下来查看一下dagScheduler的submitJob方法做了什么:
1:dagScheduler的submitJob方法中给eventProcessLoop发送JobSubmitted消息(eventProcessLoop是dagScheduler的一个消息处理器)
2:在eventProcessLoop中将JobSubmitted消息转发给dagScheduler的doOnReceive方法进行模式匹配处理
3: 调用dagScheduler的handleJobSubmitted方法(在handleJobSubmitted方法中完成Stage的划分)
4:调用dagScheduler的submitMissingTasks或submitStage方法提交Stage(submitMissingTasks和submitStage方法区别:由于递归提交Stage,需要最先提交父Stage,如果当前Stage还有父Stage的话调用submitStage进行递归提交,知道当前Stage没有父Stage时候调用submitMissingTasks提交)
5:在submitMissingTasks中将Stage信息广播出去(Stage信息主要是:Satge的RDD信息),然后将Stage序列化为Task(这里面有两种Stage分别是ShuffleMapStage和ResultStage,Task也是两种ShuffleMapTask和ResultTask)
6:调用TaskScheduler(实现类一般是TaskSchedulerImpl)的submitTasks方法为序列化好的TaskSet的创建TaskSetManager(TaskSetManager负责调用跟踪Task等),然后存储到schedulableBuilder(schedulableBuilder是一种资源分配策略,FIFO或FAIR),然后调用backend的reviveOffers方法进行资源分配(CoarseGrainedSchedulerBackend是 backend的一种实现,此处以CoarseGrainedSchedulerBackend为例分析)
7:调用的reviveOffers方法,此时给Driver发ReviveOffers消息申请资源
8: DriverEndpoint中receive方法模式匹配处理申请资源消息,调用DriverEndpoint的makeOffers,在makeOffers方法中调用DriverEndpoint的TaskSchedulerImpl成员(注意: DriverEndpoint是CoarseGrainedSchedulerBackend的内部类,所以依赖外部的TaskSchedulerImpl成员)的resourceOffers给Task分配运行节点
9:再调用DriverEndpoint的launchTasks方法启动任务
10:在DriverEndpoint的launchTasks方法中对Seq[TaskDescription]遍历进行序列化(在资源分配时候返回的Seq[TaskDescription])然后给CoarseGrainedExecutorBackend发LaunchTask(newSerializableBuffer(serializedTask))消息 。
11:在CoarseGrainedExecutorBackend的receive方法中处理LaunchTask(newSerializableBuffer(serializedTask))消息,首先反序列化Task为TaskDescription信息,然后调用executor(executor为spark的Executor的实例)的launchTask方法,将TaskDesc转化为TaskRunner,最后提交org.apache.spark.executor.Executor的threadPool线程池执行(threadPool底层是java的线程池,实现方式:Executors.newCachedThreadPool,Spark的Executor是线程池和SparkEnv、id、hostname的封装)。到此任务提交到执行完毕!
作业提交流程:
提交作业之后(省略SparkSubmit的分析不走) -> 反射创建mainClass -> 初始化SparkContext -> 使用sparkContext创建RDD -> 执行count算子runJob(runJob是SparkContext中的)方法触发作业->调用sparkContext中dagScheduler的runJob方法->调用dagScheduler的submitJob方法完成提交,返回一个waiter对象!
接下来查看一下dagScheduler的submitJob方法做了什么:
1:dagScheduler的submitJob方法中给eventProcessLoop发送JobSubmitted消息(eventProcessLoop是dagScheduler的一个消息处理器)
2:在eventProcessLoop中将JobSubmitted消息转发给dagScheduler的doOnReceive方法进行模式匹配处理
3: 调用dagScheduler的handleJobSubmitted方法(在handleJobSubmitted方法中完成Stage的划分)
4:调用dagScheduler的submitMissingTasks或submitStage方法提交Stage(submitMissingTasks和submitStage方法区别:由于递归提交Stage,需要最先提交父Stage,如果当前Stage还有父Stage的话调用submitStage进行递归提交,知道当前Stage没有父Stage时候调用submitMissingTasks提交)
5:在submitMissingTasks中将Stage信息广播出去(Stage信息主要是:Satge的RDD信息),然后将Stage序列化为Task(这里面有两种Stage分别是ShuffleMapStage和ResultStage,Task也是两种ShuffleMapTask和ResultTask)
6:调用TaskScheduler(实现类一般是TaskSchedulerImpl)的submitTasks方法为序列化好的TaskSet的创建TaskSetManager(TaskSetManager负责调用跟踪Task等),然后存储到schedulableBuilder(schedulableBuilder是一种资源分配策略,FIFO或FAIR),然后调用backend的reviveOffers方法进行资源分配(CoarseGrainedSchedulerBackend是 backend的一种实现,此处以CoarseGrainedSchedulerBackend为例分析)
7:调用的reviveOffers方法,此时给Driver发ReviveOffers消息申请资源
8: DriverEndpoint中receive方法模式匹配处理申请资源消息,调用DriverEndpoint的makeOffers,在makeOffers方法中调用DriverEndpoint的TaskSchedulerImpl成员(注意: DriverEndpoint是CoarseGrainedSchedulerBackend的内部类,所以依赖外部的TaskSchedulerImpl成员)的resourceOffers给Task分配运行节点
9:再调用DriverEndpoint的launchTasks方法启动任务
10:在DriverEndpoint的launchTasks方法中对Seq[TaskDescription]遍历进行序列化(在资源分配时候返回的Seq[TaskDescription])然后给CoarseGrainedExecutorBackend发LaunchTask(newSerializableBuffer(serializedTask))消息 。
11:在CoarseGrainedExecutorBackend的receive方法中处理LaunchTask(newSerializableBuffer(serializedTask))消息,首先反序列化Task为TaskDescription信息,然后调用executor(executor为spark的Executor的实例)的launchTask方法,将TaskDesc转化为TaskRunner,最后提交org.apache.spark.executor.Executor的threadPool线程池执行(threadPool底层是java的线程池,实现方式:Executors.newCachedThreadPool,Spark的Executor是线程池和SparkEnv、id、hostname的封装)。到此任务提交到执行完毕!
相关文章推荐
- Spark中job、stage、task的划分+源码执行过程分析
- spark源码之Job执行(1)stage划分与提交
- spark源码之Job执行(1)stage划分与提交
- spark源码分析之DAGScheduler提交作业(job)过程、stage阶段说明
- spark中executor执行Driver发送的task,放入线程池中执行原理
- spark源码之Job执行(1)stage划分与提交
- spark 2版本 中stage划分和stage作业提交详解
- Spark1.3从创建到提交:9)Stage的划分和提交源码分析
- spark源码之Job执行(1)stage划分与提交
- spark源码之Job执行(1)stage划分与提交
- spark源码之Job执行(1)stage划分与提交
- spark源码之Job执行(1)stage划分与提交
- Spark学习之8:Stage提交到Task执行
- spark源码之Job执行(1)stage划分与提交
- spark源码之Job执行(1)stage划分与提交
- spark源码之Job执行(1)stage划分与提交
- spark源码之Job执行(1)stage划分与提交
- spark源码之Job执行(1)stage划分与提交
- SparkSubmit 提交作业源码流程粗略概述(含application中 driver、client、 executor的创建)
- spark源码之Job执行(1)stage划分与提交