您的位置:首页 > 其它

spark源码学习(十一):资源的调度Schedule

2016-03-04 15:44 381 查看
spark源码学习(十一):资源的调度Schedule

前面stabdalone模式下当worker向master注册成功之后,master会运行一个schedule函数来调度资源,当时并没有进行深入的剖析,这里就简单的来看看schedule的资源调度函数,说白了就是master是如何把每一个executor分配给application去处理任务的呢?下面来看看的具体的代码:

private def startExecutorsOnWorkers(): Unit = {
//当前的调度模式是FIFO
if (spreadOutApps) {
// 把application中需要用到的资源都平均分配到每一个worker上面
for (app <- waitingApps if app.coresLeft > 0) {
//主要就是过滤出全部可以使用的worker
val usableWorkers = workers.toArray
//处于激活状态的worker
.filter(_.state == WorkerState.ALIVE)
//worker当前的内存大于等于该app分配到当前worker上任务所需的内存大小
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB
//内核数大于等于任务所需的内核数
&&worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
//空闲内核数倒序排列
.sortBy(_.coresFree).reverse
//用于分配的时候使用
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable)
//取app所需的内核数和可使用的内核数的最小值作为将要分配的资源
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var pos = 0
while (toAssign > 0)
{//逐个从各个worker中给App分配一个内核
//每当所有的worker都分配之后还有剩余的话,继续分配
if (usableWorkers(pos).coresFree - assigned(pos) > 0)
{
toAssign -= 1
assigned(pos) += 1
}
pos = (pos + 1) % numUsable
}//以上的属于资源的逻辑分配
//下面的好似资源的物理分配,会显示的拉起executor
for (pos <- 0 until numUsable if assigned(pos) > 0)
{
allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
}
}
}
上面仅仅显示了尽量在每一个worker上都分配相同的任务来运行。具体流程如图所示:





下面看看物理分配:主要就是调用allocateWorkerResourceToExecutors,首先调用addExecutor方法把worker的信息封装成ExecutorDesc,然后把ExecutorDesc信息添加到app的executors的缓存中,在增加已经授权的内核数,代码如下:



然后通过launchExecutor方法来实现物理分配:

1)首先把ExecutorDesc添加到WorkInfo的executors缓存中,并更新已经使用的cup和数和内存大小。

2)然后向worker发送LaunchExecutor信息,运行Executor,具体的看这里

3)向clientWorker发送ExecutorAdded消息,收到消息后向master发送ExecutorStateChanged消息,master收到消息后向DriverActor发送ExecutorUpdate消息,用于更新driver上有关executor的状态。

经过三部就完成了对workers上资源的调度,下面就会开始executor的实例化和任务的运行了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: