Spark源码分析之Master资源调度算法原理
2017-11-09 09:41
676 查看
Master是通过schedule方法进行资源调度,告知worker启动executor等。
2将可用的worker节点打乱,这样有利于driver的均衡
3进行driver资源调度,遍历处于等待状态的driver队列,发起driver
4在worker上开启executor进程
private def
schedule(): Unit = {
// 只有alive状态的master才可以进行资源调度,standby是不能够调度的
if (state
!= RecoveryState.ALIVE) {
return
}
// 将可用的worker节点打乱,这样有利于driver的均衡
val shuffledAliveWorkers
= Random.shuffle(workers.toSeq.filter(_.state
== WorkerState.ALIVE))
val numWorkersAlive
= shuffledAliveWorkers.size
var curPos
= 0
// 进行driver资源调度,遍历处于等待状态的driver队列
for (driver <- waitingDrivers.toList) {
var launched
= false
var numWorkersVisited=
0
while (numWorkersVisited
< numWorkersAlive&& !launched) {
// 获取worker
val worker
= shuffledAliveWorkers(curPos)
// 记录worker访问数递增
numWorkersVisited+=
1
// 判断worker的可使用内存是否大于driver所需要的内存以及worker可使用cpu核数是否大于driver所需要的cpu核数
if (worker.memoryFree
>= driver.desc.mem
&& worker.coresFree
>= driver.desc.cores) {
// 满足条件发起driver
launchDriver(worker, driver)
// 将当前driver从等待队列中移除
waitingDrivers-=
driver
// 标记该driver发起状态为true
launched =
true
}
// 将指针指向下一个worker,当然如果driver已经发起了,则为下一个准备发起下一个处于等待的driver
curPos = (curPos
+ 1) % numWorkersAlive
}
}
// 在worker上开启executor进程
startExecutorsOnWorkers()
}
# 得到每一个executor所需要的核数
# 过滤出有效的可用worker,再从worker中过滤出worker剩余内存和CPU核数 不小于app对应executor所需要的内存和CPU核数,按照剩余的CPU核数反向排序worker
# 在可用的worker上调度executor,启动executor有两种算法模式:
一:将应用程序尽可能多的分配到不同的worker上
二:和第一种相反,分配到尽可能少的worker上,通常用于计算密集型
每一个executor所需要的核数是可以配置的,一般来讲如果worker有足够的内存和CPU核数,同一个应用程序就可以在该worker启动多个executors;否则就不能再启动新的executor了,则需要到其他worker上去分配executor了
# 在可用的worker上分配资源给executor
# 获取每一个executor应该分配的核数,如果没有指定则使用计算的应该分配的核数
# 向worker上添加executor,创建ExecutorDesc对象,更新application已经分配到的cpu核数
# 启动executor
# 更新application的状态
一schedule方法
1判断master状态,只有alive状态的master才可以进行资源调度,standby是不能够调度的2将可用的worker节点打乱,这样有利于driver的均衡
3进行driver资源调度,遍历处于等待状态的driver队列,发起driver
4在worker上开启executor进程
private def
schedule(): Unit = {
// 只有alive状态的master才可以进行资源调度,standby是不能够调度的
if (state
!= RecoveryState.ALIVE) {
return
}
// 将可用的worker节点打乱,这样有利于driver的均衡
val shuffledAliveWorkers
= Random.shuffle(workers.toSeq.filter(_.state
== WorkerState.ALIVE))
val numWorkersAlive
= shuffledAliveWorkers.size
var curPos
= 0
// 进行driver资源调度,遍历处于等待状态的driver队列
for (driver <- waitingDrivers.toList) {
var launched
= false
var numWorkersVisited=
0
while (numWorkersVisited
< numWorkersAlive&& !launched) {
// 获取worker
val worker
= shuffledAliveWorkers(curPos)
// 记录worker访问数递增
numWorkersVisited+=
1
// 判断worker的可使用内存是否大于driver所需要的内存以及worker可使用cpu核数是否大于driver所需要的cpu核数
if (worker.memoryFree
>= driver.desc.mem
&& worker.coresFree
>= driver.desc.cores) {
// 满足条件发起driver
launchDriver(worker, driver)
// 将当前driver从等待队列中移除
waitingDrivers-=
driver
// 标记该driver发起状态为true
launched =
true
}
// 将指针指向下一个worker,当然如果driver已经发起了,则为下一个准备发起下一个处于等待的driver
curPos = (curPos
+ 1) % numWorkersAlive
}
}
// 在worker上开启executor进程
startExecutorsOnWorkers()
}
二startExecutorsOnWorkers 在worker上开启executor进程
# 遍历处于等待状态的application,且处于等待的状态的application的所需要的cpu核数大于0# 得到每一个executor所需要的核数
# 过滤出有效的可用worker,再从worker中过滤出worker剩余内存和CPU核数 不小于app对应executor所需要的内存和CPU核数,按照剩余的CPU核数反向排序worker
# 在可用的worker上调度executor,启动executor有两种算法模式:
一:将应用程序尽可能多的分配到不同的worker上
二:和第一种相反,分配到尽可能少的worker上,通常用于计算密集型
每一个executor所需要的核数是可以配置的,一般来讲如果worker有足够的内存和CPU核数,同一个应用程序就可以在该worker启动多个executors;否则就不能再启动新的executor了,则需要到其他worker上去分配executor了
# 在可用的worker上分配资源给executor
private def startExecutorsOnWorkers(): Unit = { // 遍历处于等待状态的application,且处于等待的状态的application的所需要的cpu核数大于0 // coresLeft=app请求的核数-已经分配给executor的核数的和 for (app <- waitingApps if app.coresLeft > 0) { // 每一个executor所需要的核数 val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor // 过滤出有效的可用worker // 再从worker中过滤出worker剩余内存和CPU核数不小于app对应executor所需要的内存和CPU核数 // 按照剩余的CPU核数反向排序woker val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree >= coresPerExecutor.getOrElse(1)) .sortBy(_.coresFree).reverse // 在可用的worker上调度executor,启动executor有两种算法模式: // 一:将应用程序尽可能多的分配到不同的worker上 // 二:和第一种相反,分配到尽可能少的worker上,通常用于计算密集型; // 每一个executor所需要的核数是可以配置的,一般来讲如果worker有足够的内存和CPU核数,同一个应用程序就可以 // 在该worker启动多个executors;否则就不能再启动新的executor了,则需要到其他worker上去分配executor了 val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) // 在可用的worker上分配资源给executor for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { allocateWorkerResourceToExecutors( app, assignedCores(pos), coresPerExecutor, usableWorkers(pos)) } } }
三scheduleExecutorsOnWorkers在每一个worker上调度资源
判断该worker能不能分配一个或者多个executor,能则分配相对应的executor所需要的CPU核数private def scheduleExecutorsOnWorkers(app: ApplicationInfo, usableWorkers: Array[WorkerInfo], spreadOutApps: Boolean): Array[Int] = { // 如果我们指定executor需要分配的核数,coresPerExecutor表示executor所需要的cpu核数 val coresPerExecutor = app.desc.coresPerExecutor // app中每个executor所需要的最小cpu核数,如果没有默认最小核数为1 val minCoresPerExecutor = coresPerExecutor.getOrElse(1) // 如果我们没有指定executor需要分配的核数,则一个worker上只能启动一个executor val oneExecutorPerWorker = coresPerExecutor.isEmpty // 每一个executor所需要的内存 val memoryPerExecutor = app.desc.memoryPerExecutorMB // 获取可用worker数量 val numUsable = usableWorkers.length // 构建一个可用worker长度的数组,用于存放每个worker节点分配到的cpu核数(16,16,16,16) val assignedCores = new Array[Int](numUsable) // 构建一个可用worker长度的数组,用于存放每一个worker上新分配的executor数量(1,2,1,0) val assignedExecutors = new Array[Int](numUsable) // 针对当前应用程序,还需要分配的cpu核数,它应该是application还需要的cpu核数和worker总共剩余核数之和中最小的 // 防止超过当前可用的cpu核数 var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) // 判断我们是否可以为这个application在指定的worker上发起一个executor def canLaunchExecutor(pos: Int): Boolean = { // 判断当前需要分配的cpu核数是否大于或者等于每个executor所需要的cpu核数,比如总共只能分配8核,但是 // 每个executor所需要的cpu核数是12,那么就不能发起executor了,因为资源不够用 val keepScheduling = coresToAssign >= minCoresPerExecutor // 当前worker剩余的核数 - 应用程序分配到该worker上的核数是否满足发起一个executor,比如现在worker剩余核数16 // 然后又给application他分配了12核,即还剩4核可用,但是启动一个executor需要12核,那么4 < 12 表示内核不足使用了 val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor // 如果我们允许每一个worker启动多个executor,然后我们可以启动一个新的executor // 否则如果worker已经启动一个新executor,只需要将更多的内核分配给该executor即可 val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0 // 如果需要发起新的executor,既需要判断cpu核数是否足够,还需要判断 executor是否超过限制总数以及否内存是否足够 if (launchingNewExecutor) { val assignedMemory = assignedExecutors(pos) * memoryPerExecutor val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit keepScheduling && enoughCores && enoughMemory && underLimit } else { // 否则只是对已经存在的executor添加cpu核数,没必要检查内存和executor限制 keepScheduling && enoughCores } } // 过滤出那些可用的worker节点 var freeWorkers = (0 until numUsable).filter(canLaunchExecutor) while (freeWorkers.nonEmpty) { // 遍历每一个空闲的worker freeWorkers.foreach { pos => var keepScheduling = true // 检测当前worker是否能够发起executor while (keepScheduling && canLaunchExecutor(pos)) { // 需要分配的核数减去每个executor所需要的最小核数 coresToAssign -= minCoresPerExecutor // 对应的worker节点需要分配的cpu核数加上要启动该executor所需要的最小CPU核数 assignedCores(pos) += minCoresPerExecutor // 如果每一个worker只允许启动一个executor,那么该worker启动的executor数量只能是1,否则应该加一个 if (oneExecutorPerWorker) { assignedExecutors(pos) = 1 } else { assignedExecutors(pos) += 1 } // 如果需要将executor分配到更多的worker,那么就不再从当前worker节点继续分配,而是从下一个worker上继续分配 if (spreadOutApps) { keepScheduling = false } } } // 因为进行了一次分配,需要再次从可用的worker节点中过滤可用的worker节点 freeWorkers = freeWorkers.filter(canLaunchExecutor) } assignedCores }
四allocateWorkerResourceToExecutors在worker上分配具体的资源
# 获取该worker应该有多少个executor# 获取每一个executor应该分配的核数,如果没有指定则使用计算的应该分配的核数
# 向worker上添加executor,创建ExecutorDesc对象,更新application已经分配到的cpu核数
# 启动executor
# 更新application的状态
private def allocateWorkerResourceToExecutors(app: ApplicationInfo, assignedCores: Int, coresPerExecutor: Option[Int], worker: WorkerInfo): Unit = { // 获取该worker应该有多少个executor val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1) // 获取每一个executor应该分配的核数,如果没有指定则使用计算的应该分配的核数 val coresToAssign = coresPerExecutor.getOrElse(assignedCores) for (i <- 1 to numExecutors) { // 向worker上添加executor,创建ExecutorDesc对象,更新application已经分配到的cpu核数 val exec = app.addExecutor(worker, coresToAssign) // 启动executor launchExecutor(worker, exec) // 更新application的状态 app.state = ApplicationState.RUNNING } }
五launchDriver 发起driver
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) { logInfo("Launching driver " + driver.id + " on worker " + worker.id) // worker添加driver worker.addDriver(driver) driver.worker = Some(worker) // 向worker发送LaunchDriver消息 worker.endpoint.send(LaunchDriver(driver.id, driver.desc)) // 更新driver状态为RUNNING driver.state = DriverState.RUNNING }
六launchExecutor发起executor
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) // worker启动executor,并且更新worker的cpu和内存信息 worker.addExecutor(exec) // 向worker发送LaunchExecutor消息 worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)) // 向application发送ExecutorAdded消息 exec.application.driver.send( ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)) }
相关文章推荐
- Master原理剖析与源码分析:资源调度机制源码分析(schedule(),两种资源调度算法)
- SPARK的MAster资源调度原理(源码)分析
- Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法
- spark源码分析之master资源调度schedule篇
- 深入理解Spark 2.1 Core (六):资源调度的原理与源码分析
- Spark源码分析之Master注册机制原理
- spark学习-Master资源调度分配算法
- Spark内核源码深度剖析:Master主备切换机制原理剖析与源码分析
- Spark 随机森林算法原理、源码分析及案例实战
- spark源码学习(二)---Master源码分析(3)-master对driver、executor的调度
- Spark 随机森林算法原理、源码分析及案例实战
- Spark源码分析之Master状态改变处理机制原理
- Spark源码分析之二:Job的调度模型与运行反馈
- spark core源码分析2 master启动流程
- Spark源码分析(1) 从WordCount示例看Spark延迟计算原理
- OpenCV学习笔记(28)KAZE 算法原理与源码分析(二)非线性尺度空间构建
- spark 调度模块详解及源码分析
- Spark源码分析(四)调度管理2
- 深入理解Spark 2.1 Core (二):DAG调度器的原理与源码分析
- Zeppelin 源码分析-调度和资源分析(2)