您的位置:首页 > 其它

Spark Core Runtime分析: DAGScheduler, TaskScheduler, SchedulerBackend

2017-08-13 12:58 513 查看
Spark Runtime里的主要层次分析,梳理Runtime组件和运行流程,



DAGScheduler

Job=多个stage,Stage=多个同种task, Task分为ShuffleMapTask和ResultTask,Dependency分为ShuffleDependency和NarrowDependency

面向stage的切分,切分依据为宽依赖

维护waiting jobs和active jobs。维护waiting stages、active stages和failed stages,以及与jobs的映射关系

主要职能

接收提交Job的主入口。
submitJob(rdd, ...)
runJob(rdd, ...)
。在
SparkContext
里会调用这两个方法。

生成一个Stage并提交,接着推断Stage是否有父Stage未完毕,若有。提交并等待父Stage。以此类推。结果是:DAGScheduler里添加了一些waiting stage和一个running stage。

running stage提交后。分析stage里Task的类型,生成一个Task描写叙述,即TaskSet。

调用
TaskScheduler.submitTask(taskSet, ...)
方法,把Task描写叙述提交给TaskScheduler。TaskScheduler依据资源量和触发分配条件,会为这个TaskSet分配资源并触发运行。

DAGScheduler
提交job后。异步返回
JobWaiter
对象。能够返回job运行状态,能够cancel job,运行成功后会处理并返回结果

处理
TaskCompletionEvent


假设task运行成功,相应的stage里减去这个task。做一些计数工作:

假设task是ResultTask,计数器
Accumulator
加一。在job里为该task置true,job finish总数加一。

加完后假设finish数目与partition数目相等。说明这个stage完毕了,标记stage完毕。从running stages里减去这个stage,做一些stage移除的清理工作

假设task是ShuffleMapTask。计数器
Accumulator
加一,在stage里加上一个output location。里面是一个
MapStatus
类。
MapStatus
ShuffleMapTask
运行完毕的返回,包含location信息和block size(能够选择压缩或未压缩)。同一时候检查该stage完毕,向
MapOutputTracker
注冊本stage里的shuffleId和location信息。然后检查stage的output location里是否存在空。若存在空。说明一些task失败了,整个stage又一次提交;否则,继续从waiting stages里提交下一个须要做的stage

假设task是重提交,相应的stage里添加这个task

假设task是fetch失败,立即标记相应的stage完毕。从running stages里减去。

假设不同意retry。abort整个stage;否则,又一次提交整个stage。

另外,把这个fetch相关的location和map任务信息。从stage里剔除,从
MapOutputTracker
注销掉。最后,假设这次fetch的blockManagerId对象不为空,做一次
ExecutorLost
处理,下次shuffle会换在还有一个executor上去运行。

其它task状态会由
TaskScheduler
处理,如Exception, TaskResultLost, commitDenied等。

其它与job相关的操作还包含:cancel job, cancel stage, resubmit failed stage等

其它职能

1. cacheLocations 和 preferLocation

private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]


TaskScheduler

维护task和executor相应关系,executor和物理资源相应关系。在排队的task和正在跑的task。

内部维护一个任务队列。依据FIFO或Fair策略,调度任务。

TaskScheduler
本身是个接口,spark里仅仅实现了一个
TaskSchedulerImpl
。理论上任务调度能够定制。以下是
TaskScheduler
的主要接口:

def start(): Unit
def postStartHook() { }
def stop(): Unit
def submitTasks(taskSet: TaskSet): Unit
def cancelTasks(stageId: Int, interruptThread: Boolean)
def setDAGScheduler(dagScheduler: DAGScheduler): Unit
def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
blockManagerId: BlockManagerId): Boolean


主要职能

submitTasks(taskSet)
,接收
DAGScheduler
提交来的tasks

为tasks创建一个
TaskSetManager
,加入到任务队列里。

TaskSetManager
跟踪每一个task的运行状况,维护了task的很多详细信息。

触发一次资源的索要。

首先。
TaskScheduler
对比手头的可用资源和Task队列。进行executor分配(考虑优先级、本地化等策略),符合条件的executor会被分配给
TaskSetManager


然后。得到的Task描写叙述交给
SchedulerBackend
。调用
launchTask(tasks)
。触发executor上task的运行。

task描写叙述被序列化后发给executor,executor提取task信息。调用task的
run()
方法运行计算。

cancelTasks(stageId)
,取消一个stage的tasks

调用
SchedulerBackend
killTask(taskId, executorId, ...)
方法。

taskId和executorId在
TaskScheduler
里一直维护着。

resourceOffer(offers: Seq[Workers])
,这是很重要的一个方法,调用者是
SchedulerBacnend
,用途是底层资源
SchedulerBackend
把空余的workers资源交给
TaskScheduler
。让其依据调度策略为排队的任务分配合理的cpu和内存资源。然后把任务描写叙述列表传回给
SchedulerBackend


从worker offers里。搜集executor和host的相应关系、active executors、机架信息等等

worker offers资源列表进行随机洗牌,任务队列里的任务列表依据调度策略进行一次排序

遍历每一个taskSet,依照进程本地化、worker本地化、机器本地化、机架本地化的优先级顺序,为每一个taskSet提供可用的cpu核数,看是否满足

默认一个task须要一个cpu。设置參数为
"spark.task.cpus=1"


为taskSet分配资源,校验是否满足的逻辑,终于在
TaskSetManager
resourceOffer(execId, host, maxLocality)
方法里

满足的话,会生成终于的任务描写叙述。而且调用
DAGScheduler
taskStarted(task, info)
方法。通知
DAGScheduler
,这时候每次会触发
DAGScheduler
做一次
submitMissingStage
的尝试,即stage的tasks都分配到了资源的话,立即会被提交运行

statusUpdate(taskId, taskState, data)
,还有一个很重要的方法,调用者是
SchedulerBacnend
,用途是
SchedulerBacnend
会将task运行的状态汇报给
TaskScheduler
做一些决定

TaskLost
,找到该task相应的executor。从active executor里移除。避免这个executor被分配到其它task继续失败下去。

task finish包含四种状态:finished, killed, failed, lost。仅仅有finished是成功运行完毕了。

其它三种是失败。

task成功运行完,调用
TaskResultGetter.enqueueSuccessfulTask(taskSet, tid, data)
,否则调用
TaskResultGetter.enqueueFailedTask(taskSet, tid, state, data)
TaskResultGetter
内部维护了一个线程池,负责异步fetch task运行结果并反序列化。默认开四个线程做这件事,可配參数
"spark.resultGetter.threads"=4


TaskResultGetter取task result的逻辑

对于success task。假设taskResult里的数据是直接结果数据。直接把data反序列出来得到结果。假设不是。会调用
blockManager.getRemoteBytes(blockId)
从远程获取。

假设远程取回的数据是空的,那么会调用
TaskScheduler.handleFailedTask
,告诉它这个任务是完毕了的可是数据是丢失的。

否则。取到数据之后会通知
BlockManagerMaster
移除这个block信息,调用
TaskScheduler.handleSuccessfulTask
。告诉它这个任务是运行成功的。而且把result data传回去。

对于failed task。从data里解析出fail的理由,调用
TaskScheduler.handleFailedTask
。告诉它这个任务失败了,理由是什么。

SchedulerBackend

TaskScheduler
下层。用于对接不同的资源管理系统,
SchedulerBackend
是个接口。须要实现的主要方法例如以下:

def start(): Unit
def stop(): Unit
def reviveOffers(): Unit // 重要方法:SchedulerBackend把自己手头上的可用资源交给TaskScheduler。TaskScheduler依据调度策略分配给排队的任务吗,返回一批可运行的任务描写叙述,SchedulerBackend负责launchTask,即终于把task塞到了executor模型上,executor里的线程池会运行task的run()
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
throw new UnsupportedOperationException


粗粒度:进程常驻的模式。典型代表是standalone模式,mesos粗粒度模式,yarn

细粒度:mesos细粒度模式

这里讨论粗粒度模式,更好理解:
CoarseGrainedSchedulerBackend


维护executor相关信息(包含executor的地址、通信端口、host、总核数。剩余核数),手头上executor有多少被注冊使用了。有多少剩余,总共还有多少核是空的等等。

主要职能

Driver端主要通过actor监听和处理以下这些事件:

RegisterExecutor(executorId, hostPort, cores, logUrls)
。这是executor加入的来源,通常worker拉起、重新启动会触发executor的注冊。
CoarseGrainedSchedulerBackend
把这些executor维护起来,更新内部的资源信息。比方总核数添加。最后调用一次
makeOffer()
,即把手头资源丢给
TaskScheduler
去分配一次。返回任务描写叙述回来。把任务launch起来。

这个
makeOffer()
的调用会出如今不论什么与资源变化相关的事件中,以下会看到。

StatusUpdate(executorId, taskId, state, data)
。task的状态回调。首先,调用
TaskScheduler.statusUpdate
上报上去。然后。推断这个task是否运行结束了。结束了的话把executor上的freeCore加回去,调用一次
makeOffer()


ReviveOffers


这个事件就是别人直接向
SchedulerBackend
请求资源,直接调用
makeOffer()


KillTask(taskId, executorId, interruptThread)
。这个killTask的事件。会被发送给executor的actor,executor会处理
KillTask
这个事件。

StopExecutors
。通知每一个executor,处理
StopExecutor
事件。

RemoveExecutor(executorId, reason)
。从维护信息中,那这堆executor涉及的资源数减掉。然后调用
TaskScheduler.executorLost()
方法,通知上层我这边有一批资源不能用了,你处理下吧。

TaskScheduler
会继续把
executorLost
的事件上报给
DAGScheduler
,原因是
DAGScheduler
关心shuffle任务的output location。

DAGScheduler
会告诉
BlockManager
这个executor不可用了,移走它,然后把全部的stage的shuffleOutput信息都遍历一遍,移走这个executor,而且把更新后的shuffleOutput信息注冊到
MapOutputTracker
上,最后清理下本地的
CachedLocations
Map。

reviveOffers()
方法的实现。

直接调用了
makeOffers()
方法,得到一批可运行的任务描写叙述。调用
launchTasks


launchTasks(tasks: Seq[Seq[TaskDescription]])
方法。

遍历每一个task描写叙述。序列化成二进制。然后发送给每一个相应的executor这个任务信息

假设这个二进制信息太大,超过了9.2M(默认的akkaFrameSize 10M 减去 默认 为akka留空的200K)。会出错,abort整个taskSet。并打印提醒增大akka frame size

假设二进制数据大小可接受,发送给executor的actor。处理
LaunchTask(serializedTask)
事件。

Executor

Executor是spark里的进程模型。能够套用到不同的资源管理系统上。与
SchedulerBackend
配合使用。

内部有个线程池,有个running tasks map,有个actor,接收上面提到的由
SchedulerBackend
发来的事件。

事件处理

launchTask


依据task描写叙述。生成一个
TaskRunner
线程,丢尽running tasks map里。用线程池运行这个
TaskRunner


killTask
。从running tasks map里拿出线程对象,调它的kill方法。

全文完 :)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐