Spark Source Code Study (7) --- TaskScheduler Source Code Analysis
2016-02-20 23:18
In the previous post, when execution reached submitMissingTasks, the DAGScheduler called taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)). Here tasks is a Seq; one Task (a ShuffleMapTask or a ResultTask) is created per partition, and taskScheduler.submitTasks hands the TaskSet over to the TaskScheduler. The method is implemented in TaskSchedulerImpl, the subclass of TaskScheduler.
override def submitTasks(taskSet: TaskSet) -- this method takes a TaskSet as its parameter. The TaskSet encapsulates the task array together with metadata such as the tasks' priority and the stageId they belong to:
```scala
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Create a TaskSetManager for each TaskSet. Within TaskSchedulerImpl, the
    // TaskSetManager schedules every task of its TaskSet, monitors their
    // execution, and retries tasks that fail.
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    // Record the manager in memory
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    // SchedulableBuilder maintains a tree whose leaf nodes are TaskSetManagers
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // In standalone mode this backend is a SparkDeploySchedulerBackend -- the same
  // backend created in the SparkContext post, which also creates the AppClient
  // and registers the application with the master.
  backend.reviveOffers()
}
```
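The stage/attempt bookkeeping at the top of submitTasks can be sketched in a few lines of Python. TaskSetManagerStub, submit_tasks, and the dict-based TaskSet are hypothetical simplifications for illustration, not Spark's API:

```python
class TaskSetManagerStub:
    """Hypothetical stand-in for TaskSetManager: wraps one TaskSet attempt."""
    def __init__(self, task_set):
        self.task_set = task_set
        self.is_zombie = False  # set to True once the attempt is finished/superseded

task_sets_by_stage_and_attempt = {}

def submit_tasks(task_set):
    """Sketch of submitTasks' bookkeeping: register a manager per
    (stage, attempt) and reject a second live manager for the same stage."""
    manager = TaskSetManagerStub(task_set)
    stage_attempts = task_sets_by_stage_and_attempt.setdefault(
        task_set["stage_id"], {})
    stage_attempts[task_set["attempt"]] = manager
    conflicting = any(
        m.task_set is not task_set and not m.is_zombie
        for m in stage_attempts.values()
    )
    if conflicting:
        raise RuntimeError(
            f"more than one active taskSet for stage {task_set['stage_id']}")
    return manager
```

A stage may accumulate several attempts over its lifetime, but only one may be active at a time; older attempts must already be zombies before a new one is accepted, which is exactly what the IllegalStateException in the Scala code enforces.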
The last line of the method, backend.reviveOffers(), calls the reviveOffers method of CoarseGrainedSchedulerBackend (the parent class of SparkDeploySchedulerBackend), which ultimately invokes CoarseGrainedSchedulerBackend's makeOffers method:
```scala
private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
```
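The offer-building step above can be illustrated in Python; make_offers and the tuple shape (id, host, free_cores) are hypothetical stand-ins for the Scala code, not Spark's API:

```python
def make_offers(executor_data_map, executor_is_alive):
    """Build one WorkerOffer per live executor: (id, host, free cores)."""
    return [
        (exec_id, data["host"], data["free_cores"])
        for exec_id, data in executor_data_map.items()
        if executor_is_alive(exec_id)  # skip executors that are being killed
    ]
```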
This method first assigns tasks to executors and then calls launchTasks. Note that the argument in launchTasks(scheduler.resourceOffers(workOffers)) is the result of the TaskScheduler's resourceOffers method; each WorkerOffer describes an executor and the free resources available on it. Let's look at resourceOffers:
```scala
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
    if (!executorsByHost.contains(o.host)) {
      // Record the executor and the host it runs on
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers,
  // so load is balanced across them
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
```
The core of the allocation is this fragment, which iterates over every TaskSet and over that TaskSet's locality levels (myLocalityLevels):

```scala
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
  do {
    launchedTask = resourceOfferSingleTaskSet(
      taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
  } while (launchedTask)
}
```

The locality levels, from best to worst, are:

1. PROCESS_LOCAL: the task runs in the same executor (JVM process) that holds its partition's data.
2. NODE_LOCAL: the task runs on the same worker node as its partition's data, possibly in a different executor.
3. NO_PREF: the task has no locality preference.
4. RACK_LOCAL: the task runs on a node in the same rack as its data.
5. ANY: the task may run anywhere.

The locality level determines how efficiently a task executes: earlier levels perform better than later ones. For the current TaskSet, the scheduler starts at the smallest locality level, i.e. the best one, and launches as many of the TaskSet's tasks on executors as it can at that level. Once launchedTask is false, meaning no more tasks can be launched at this level, the while loop exits and the next, lower-priority locality level is tried, until in the end all tasks have been assigned to executors. The code that decides whether tasks can be launched at a given level is resourceOfferSingleTaskSet:
```scala
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToTaskCount(execId) += 1
          executorsByHost(host) += execId
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}
```
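As a toy illustration of this descent through locality levels, here is a Python sketch of the allocation loop. FakeTaskSet, resource_offers, and the tuple-based offers are hypothetical simplifications (only PROCESS_LOCAL and ANY are modeled); this is not Spark code:

```python
import random

CPUS_PER_TASK = 1

class FakeTaskSet:
    """Hypothetical TaskSet: each pending task remembers the executor
    where its data lives (its PROCESS_LOCAL preference)."""
    locality_levels = ["PROCESS_LOCAL", "ANY"]  # best level first

    def __init__(self, pending):
        self.pending = pending  # list of (task_id, preferred_executor)

    def resource_offer(self, exec_id, host, max_locality):
        # Return a task that may run on exec_id at this locality level, if any.
        for idx, (task_id, preferred) in enumerate(self.pending):
            if max_locality == "ANY" or preferred == exec_id:
                return self.pending.pop(idx)[0]
        return None

def resource_offer_single_task_set(task_set, max_locality, offers,
                                   available_cpus, tasks):
    """One round of offers: try to place one task per executor at the given
    locality level. Returns True if any task was launched this round."""
    launched = False
    for i, (exec_id, host, _) in enumerate(offers):
        if available_cpus[i] >= CPUS_PER_TASK:
            task = task_set.resource_offer(exec_id, host, max_locality)
            if task is not None:
                tasks[i].append(task)
                available_cpus[i] -= CPUS_PER_TASK
                launched = True
    return launched

def resource_offers(sorted_task_sets, offers):
    """Mirror of the core loop: for each TaskSet, descend from the best
    locality level to the worst, repeating a level until it launches nothing."""
    offers = random.sample(offers, len(offers))  # shuffle for load balancing
    available_cpus = [cores for (_, _, cores) in offers]
    tasks = [[] for _ in offers]
    for task_set in sorted_task_sets:
        for level in task_set.locality_levels:
            while resource_offer_single_task_set(task_set, level, offers,
                                                 available_cpus, tasks):
                pass
    return tasks
```

The Scala do/while maps to the `while ...: pass` here: a level is retried as long as it still launches something, so the scheduler squeezes every possible launch out of a better locality level before degrading to a worse one.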
The steps above assign each task to its corresponding executor. Returning to the makeOffers function: the tasks returned by resourceOffers are passed to launchTasks, which communicates with the executors and starts each task on the executor it was assigned to. What the executor does after receiving the launch-task message will be analyzed in a later post.