spark调度分析: DAGScheduler, TaskScheduler, SchedulerBackend
2017-03-22 13:56
495 查看
1、Runtime是如何执行的?
2、DAGScheduler主要职能有哪些?
3、TaskScheduler主要职能有哪些?
4、SchedulerBackend主要职能有哪些?
Spark Runtime里的主要层次分析,梳理Runtime组件和执行流程,
DAGScheduler
Job=多个stage,Stage=多个同种task, Task分为ShuffleMapTask和ResultTask,Dependency分为ShuffleDependency和NarrowDependency
面向stage的切分,切分依据为宽依赖
维护waiting jobs和active jobs,维护waiting stages、active stages和failed stages,以及与jobs的映射关系
主要职能
1、接收提交Job的主入口,submitJob(rdd, ...)或runJob(rdd, ...)。在SparkContext里会调用这两个方法。
[Plain Text] 纯文本查看 复制代码
?
2、处理TaskCompletionEvent
1、如果task执行成功,对应的stage里减去这个task,做一些计数工作:
[Plain Text] 纯文本查看 复制代码
?
2、如果task是重提交,对应的stage里增加这个task
[Plain Text] 纯文本查看 复制代码
?
3、其他与job相关的操作还包括:cancel job, cancel stage, resubmit failed stage等
其他职能
1. cacheLocations 和 preferLocation
[Plain Text] 纯文本查看 复制代码
?
TaskScheduler
维护task和executor对应关系,executor和物理资源对应关系,在排队的task和正在跑的task。
内部维护一个任务队列,根据FIFO或Fair策略,调度任务。
TaskScheduler本身是个接口,spark里只实现了一个TaskSchedulerImpl,理论上任务调度可以定制。下面是TaskScheduler的主要接口:
[Plain Text] 纯文本查看 复制代码
?
主要职能
1、submitTasks(taskSet),接收DAGScheduler提交来的tasks
1、为tasks创建一个TaskSetManager,添加到任务队列里。TaskSetManager跟踪每个task的执行状况,维护了task的许多具体信息。
2、触发一次资源的索要。
[Plain Text] 纯文本查看 复制代码
?
2、cancelTasks(stageId),取消一个stage的tasks
1、调用SchedulerBackend的killTask(taskId, executorId, ...)方法。taskId和executorId在TaskScheduler里一直维护着。
3、resourceOffer(offers: Seq[Workers]),这是非常重要的一个方法,调用者是SchedulerBacnend,用途是底层资源SchedulerBackend把空
余的workers资源交给TaskScheduler,让其根据调度策略为排队的任务分配合理的cpu和内存资源,然后把任务描述列表传回给 SchedulerBackend
1、从worker offers里,搜集executor和host的对应关系、active executors、机架信息等等
2、worker offers资源列表进行随机洗牌,任务队列里的任务列表依据调度策略进行一次排序
3、遍历每个taskSet,按照进程本地化、worker本地化、机器本地化、机架本地化的优先级顺序,为每个taskSet提供可用的cpu核数,看是否满
足
[Plain Text] 纯文本查看 复制代码
?
3、statusUpdate(taskId, taskState, data),另一个非常重要的方法,调用者是SchedulerBacnend,用途是SchedulerBacnend会将task执行的
状态汇报给TaskScheduler做一些决定
1、若TaskLost,找到该task对应的executor,从active executor里移除,避免这个executor被分配到其他task继续失败下去。
2、task finish包括四种状态:finished, killed, failed, lost。只有finished是成功执行完成了。其他三种是失败。
3、task成功执行完,调用TaskResultGetter.enqueueSuccessfulTask(taskSet, tid, data),否则调用TaskResultGetter.enqueueFailedTask(taskSet, tid,
state, data)。TaskResultGetter内部维护了一个线程池,负责异步fetch task执行结果并反序列化。默认开四个线程做这件事,可配参
数"spark.resultGetter.threads"=4。
TaskResultGetter取task result的逻辑
对于success task,如果taskResult里的数据是直接结果数据,直接把data反序列出来得到结果;如果不是,会调用blockManager.getRemoteBytes(blockId)从远程获取。如果远程取回的数据是空的,那么会调用TaskScheduler.handleFailedTask,告诉它这个任务是完成了的但是数据是丢失的。否则,取到数据之后会通知BlockManagerMaster移除这个block信息,调用TaskScheduler.handleSuccessfulTask,告诉它这个任务是执行成功的,并且把result
data传回去。
对于failed task,从data里解析出fail的理由,调用TaskScheduler.handleFailedTask,告诉它这个任务失败了,理由是什么。
SchedulerBackend
在TaskScheduler下层,用于对接不同的资源管理系统,SchedulerBackend是个接口,需要实现的主要方法如下:
[Plain Text] 纯文本查看 复制代码
?
粗粒度:进程常驻的模式,典型代表是standalone模式,mesos粗粒度模式,yarn
细粒度:mesos细粒度模式
这里讨论粗粒度模式,更好理解:CoarseGrainedSchedulerBackend。
维护executor相关信息(包括executor的地址、通信端口、host、总核数,剩余核数),手头上executor有多少被注册使用了,有多少剩余,总共还有多少
核是空的等等。
主要职能
1、Driver端主要通过actor监听和处理下面这些事件:
[Plain Text] 纯文本查看 复制代码
?
2、reviveOffers()方法的实现。直接调用了makeOffers()方法,得到一批可执行的任务描述,调用launchTasks。
3、launchTasks(tasks: Seq[Seq[TaskDescription]])方法。
1、遍历每个task描述,序列化成二进制,然后发送给每个对应的executor这个任务信息
1、如果这个二进制信息太大,超过了9.2M(默认的akkaFrameSize 10M 减去 默认 为akka留空的200K),会出错,abort整个taskSet,并打印提醒
增大akka frame size
2、如果二进制数据大小可接受,发送给executor的actor,处理LaunchTask(serializedTask)事件。
Executor
Executor是spark里的进程模型,可以套用到不同的资源管理系统上,与SchedulerBackend配合使用。
内部有个线程池,有个running tasks map,有个actor,接收上面提到的由SchedulerBackend发来的事件。
事件处理
launchTask。根据task描述,生成一个TaskRunner线程,丢尽running tasks map里,用线程池执行这个TaskRunner
killTask。从running tasks map里拿出线程对象,调它的kill方法。
2、DAGScheduler主要职能有哪些?
3、TaskScheduler主要职能有哪些?
4、SchedulerBackend主要职能有哪些?
Spark Runtime里的主要层次分析,梳理Runtime组件和执行流程,
DAGScheduler
Job=多个stage,Stage=多个同种task, Task分为ShuffleMapTask和ResultTask,Dependency分为ShuffleDependency和NarrowDependency
面向stage的切分,切分依据为宽依赖
维护waiting jobs和active jobs,维护waiting stages、active stages和failed stages,以及与jobs的映射关系
主要职能
1、接收提交Job的主入口,submitJob(rdd, ...)或runJob(rdd, ...)。在SparkContext里会调用这两个方法。
[Plain Text] 纯文本查看 复制代码
?
1、如果task执行成功,对应的stage里减去这个task,做一些计数工作:
[Plain Text] 纯文本查看 复制代码
?
[Plain Text] 纯文本查看 复制代码
?
其他职能
1. cacheLocations 和 preferLocation
[Plain Text] 纯文本查看 复制代码
?
维护task和executor对应关系,executor和物理资源对应关系,在排队的task和正在跑的task。
内部维护一个任务队列,根据FIFO或Fair策略,调度任务。
TaskScheduler本身是个接口,spark里只实现了一个TaskSchedulerImpl,理论上任务调度可以定制。下面是TaskScheduler的主要接口:
[Plain Text] 纯文本查看 复制代码
?
1、submitTasks(taskSet),接收DAGScheduler提交来的tasks
1、为tasks创建一个TaskSetManager,添加到任务队列里。TaskSetManager跟踪每个task的执行状况,维护了task的许多具体信息。
2、触发一次资源的索要。
[Plain Text] 纯文本查看 复制代码
?
1、调用SchedulerBackend的killTask(taskId, executorId, ...)方法。taskId和executorId在TaskScheduler里一直维护着。
3、resourceOffer(offers: Seq[Workers]),这是非常重要的一个方法,调用者是SchedulerBacnend,用途是底层资源SchedulerBackend把空
余的workers资源交给TaskScheduler,让其根据调度策略为排队的任务分配合理的cpu和内存资源,然后把任务描述列表传回给 SchedulerBackend
1、从worker offers里,搜集executor和host的对应关系、active executors、机架信息等等
2、worker offers资源列表进行随机洗牌,任务队列里的任务列表依据调度策略进行一次排序
3、遍历每个taskSet,按照进程本地化、worker本地化、机器本地化、机架本地化的优先级顺序,为每个taskSet提供可用的cpu核数,看是否满
足
[Plain Text] 纯文本查看 复制代码
?
状态汇报给TaskScheduler做一些决定
1、若TaskLost,找到该task对应的executor,从active executor里移除,避免这个executor被分配到其他task继续失败下去。
2、task finish包括四种状态:finished, killed, failed, lost。只有finished是成功执行完成了。其他三种是失败。
3、task成功执行完,调用TaskResultGetter.enqueueSuccessfulTask(taskSet, tid, data),否则调用TaskResultGetter.enqueueFailedTask(taskSet, tid,
state, data)。TaskResultGetter内部维护了一个线程池,负责异步fetch task执行结果并反序列化。默认开四个线程做这件事,可配参
数"spark.resultGetter.threads"=4。
TaskResultGetter取task result的逻辑
对于success task,如果taskResult里的数据是直接结果数据,直接把data反序列出来得到结果;如果不是,会调用blockManager.getRemoteBytes(blockId)从远程获取。如果远程取回的数据是空的,那么会调用TaskScheduler.handleFailedTask,告诉它这个任务是完成了的但是数据是丢失的。否则,取到数据之后会通知BlockManagerMaster移除这个block信息,调用TaskScheduler.handleSuccessfulTask,告诉它这个任务是执行成功的,并且把result
data传回去。
对于failed task,从data里解析出fail的理由,调用TaskScheduler.handleFailedTask,告诉它这个任务失败了,理由是什么。
SchedulerBackend
在TaskScheduler下层,用于对接不同的资源管理系统,SchedulerBackend是个接口,需要实现的主要方法如下:
[Plain Text] 纯文本查看 复制代码
?
细粒度:mesos细粒度模式
这里讨论粗粒度模式,更好理解:CoarseGrainedSchedulerBackend。
维护executor相关信息(包括executor的地址、通信端口、host、总核数,剩余核数),手头上executor有多少被注册使用了,有多少剩余,总共还有多少
核是空的等等。
主要职能
1、Driver端主要通过actor监听和处理下面这些事件:
[Plain Text] 纯文本查看 复制代码
?
3、launchTasks(tasks: Seq[Seq[TaskDescription]])方法。
1、遍历每个task描述,序列化成二进制,然后发送给每个对应的executor这个任务信息
1、如果这个二进制信息太大,超过了9.2M(默认的akkaFrameSize 10M 减去 默认 为akka留空的200K),会出错,abort整个taskSet,并打印提醒
增大akka frame size
2、如果二进制数据大小可接受,发送给executor的actor,处理LaunchTask(serializedTask)事件。
Executor
Executor是spark里的进程模型,可以套用到不同的资源管理系统上,与SchedulerBackend配合使用。
内部有个线程池,有个running tasks map,有个actor,接收上面提到的由SchedulerBackend发来的事件。
事件处理
launchTask。根据task描述,生成一个TaskRunner线程,丢尽running tasks map里,用线程池执行这个TaskRunner
killTask。从running tasks map里拿出线程对象,调它的kill方法。
相关文章推荐
- Spark Core Runtime分析: DAGScheduler, TaskScheduler, SchedulerBackend
- Spark Core Runtime分析: DAGScheduler, TaskScheduler, SchedulerBackend
- Spark Core Runtime分析: DAGScheduler, TaskScheduler, SchedulerBackend
- Spark Core Runtime分析: DAGScheduler, TaskScheduler, SchedulerBackend
- Spark Core Runtime分析: DAGScheduler, TaskScheduler, SchedulerBackend
- Spark Core Runtime分析: DAGScheduler, TaskScheduler, SchedulerBackend
- Spark之SchedulerBackend、DAGScheduler和TaskScheduler
- 第三十六课 Spark之TaskScheduler Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详
- spark学习-65-源代码:schedulerBackend和taskScheduler的创建(3)-local-cluster
- TaskScheduler内幕天机:Spark shell案例,TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详解
- spark学习-66-源代码:schedulerBackend和taskScheduler的创建(4)-yarn
- 第36课: TaskScheduler内幕天机解密:Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法
- spark学习-63-源代码:schedulerBackend和taskScheduler的创建(1)-local
- Spark源码分析 – SchedulerBackend
- spark学习-64-源代码:schedulerBackend和taskScheduler的创建(2)-StandLone
- [Spark内核] 第36课:TaskScheduler内幕天机解密:Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详解等
- Spark Scheduler模块源码分析之TaskScheduler和SchedulerBackend
- Spark Scheduler模块源码分析之TaskScheduler和SchedulerBackend
- Spark源码分析之Scheduler模块(TaskScheduler)
- 【源码学习之spark core 1.6.1 各种部署模式所使用的的TaskSceduler及SchedulerBackend】