Spark Source Code: Job Execution (2), Task Scheduling with TaskScheduler
2017-06-28 22:33
1. submitStage
After the stages have been divided, the tasks are submitted:

```scala
if (tasks.size > 0) {
  // If there are tasks, submit them via taskScheduler.submitTasks();
  // otherwise the stage is marked as finished.
  // A new TaskSet is created, and the taskScheduler performs the submission.
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
}
```
As described in the previous post on stage division and submission, once the stages have been divided, tasks are assigned according to partition information at submission time, and the tasks of one stage are then grouped together as a TaskSet.
Execution then enters the taskScheduler, which performs the task submission.
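As a rough sketch of that grouping, using simplified stand-in types rather than the real Task and TaskSet constructors:

```scala
object TaskSetSketch extends App {
  // Simplified stand-ins; the real Task and TaskSet carry far more state.
  case class Task(stageId: Int, partitionId: Int)
  case class TaskSet(tasks: Array[Task], stageId: Int, attemptId: Int, jobId: Int)

  val stageId = 0
  val missingPartitions = Seq(0, 1, 2, 3)                  // partitions still to be computed
  val tasks = missingPartitions.map(p => Task(stageId, p)) // one task per partition
  val taskSet = TaskSet(tasks.toArray, stageId, attemptId = 0, jobId = 0)
  // taskScheduler.submitTasks(taskSet) would hand this group to the scheduler
  println(s"TaskSet for stage $stageId with ${taskSet.tasks.length} tasks")
}
```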
2. TaskScheduler
TaskScheduler is Spark's low-level task scheduling interface, defined as a trait. The concrete implementation is TaskSchedulerImpl, and different cluster deploy modes bring their own variants. Multiple scheduling implementations can sit behind this one interface, and each TaskScheduler belongs to exactly one SparkContext. It receives the TaskSet of each stage from the DAGScheduler and is responsible for sending those tasks to the cluster, running them, and retrying them if they fail.
```scala
private[spark] trait TaskScheduler {

  private val appId = "spark-application-" + System.currentTimeMillis // application ID

  def rootPool: Pool // the scheduling pool that decides task execution order

  def schedulingMode: SchedulingMode

  def start(): Unit

  // Invoked after system has successfully initialized (typically in spark context).
  // Yarn uses this to bootstrap allocation of resources based on preferred locations,
  // wait for slave registrations, etc.
  def postStartHook() { }

  // Disconnect from the cluster.
  def stop(): Unit

  // Submit a sequence of tasks to run.
  def submitTasks(taskSet: TaskSet): Unit

  // Cancel a stage.
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit

  /**
   * Kills a task attempt.
   *
   * @return Whether the task was successfully killed.
   */
  def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean

  // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
  def setDAGScheduler(dagScheduler: DAGScheduler): Unit

  // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
  def defaultParallelism(): Int

  /**
   * Update metrics for in-progress tasks and let the master know that the BlockManager is still
   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
   * indicating that the block manager should re-register.
   */
  def executorHeartbeatReceived(
      execId: String,
      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
      blockManagerId: BlockManagerId): Boolean

  /**
   * Get an application ID associated with the job.
   *
   * @return An application ID
   */
  def applicationId(): String = appId

  /**
   * Process a lost executor
   */
  def executorLost(executorId: String, reason: ExecutorLossReason): Unit

  /**
   * Get an application's attempt ID associated with the job.
   *
   * @return An application's Attempt ID
   */
  def applicationAttemptId(): Option[String]

}
```
3. TaskSchedulerImpl, the TaskScheduler implementation
First, a few of the main fields: hash maps that pin down the relationships between tasks, TaskSetManagers, and executors.
```scala
// many task IDs map to one TaskSetManager
private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager]
// each task ID maps to the executor it runs on; one executor runs many tasks
val taskIdToExecutorId = new HashMap[Long, String]
```
The tasks currently running on each executor:
```scala
private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
```
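A tiny standalone sketch (plain Scala collections, not the scheduler's actual state) of how these maps stay in sync when a task is assigned:

```scala
import scala.collection.mutable.{HashMap, HashSet}

object BookkeepingDemo extends App {
  val taskIdToExecutorId = new HashMap[Long, String]
  val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

  // assigning task 7 to executor "exec-1" updates both directions of the mapping
  def recordAssignment(taskId: Long, execId: String): Unit = {
    taskIdToExecutorId(taskId) = execId
    executorIdToRunningTaskIds.getOrElseUpdate(execId, new HashSet[Long]()) += taskId
  }

  recordAssignment(7L, "exec-1")
  assert(taskIdToExecutorId(7L) == "exec-1")                // task -> executor
  assert(executorIdToRunningTaskIds("exec-1").contains(7L)) // executor -> running tasks
}
```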
A few fields that connect to the outside:
```scala
var dagScheduler: DAGScheduler = null  // for upcalls
var backend: SchedulerBackend = null   // the concrete scheduling is done by the backend
// tracks the map outputs of the shuffle stages
val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
// builds the scheduling pool; there are two scheduling modes: FIFO and FAIR
private var schedulableBuilder: SchedulableBuilder = null
```
Scheduling modes: FIFO and FAIR
The schedulableBuilder determines the scheduling mode:
```scala
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        // rootPool holds a tree of Pools whose leaf nodes are all TaskSetManagers
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}
```
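On the user side, the mode is chosen with the spark.scheduler.mode configuration (FIFO is the default), and under FAIR scheduling a job can be directed to a named pool via a local property. A minimal example (the master URL and pool name here are assumptions for the demo):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FairModeDemo extends App {
  val conf = new SparkConf()
    .setMaster("local[*]")                // assumption: local run for the demo
    .setAppName("fair-scheduling-demo")
    .set("spark.scheduler.mode", "FAIR")  // FIFO is the default
  val sc = new SparkContext(conf)

  // under FAIR mode, jobs submitted from this thread go to the named pool
  // (pools are defined in conf/fairscheduler.xml)
  sc.setLocalProperty("spark.scheduler.pool", "production")
}
```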
Unsurprisingly, the TaskScheduler is created in SparkContext, and the SchedulerBackend is started along with it; the actual task dispatching is carried out by the latter.
```scala
val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
(backend, scheduler)
```
With the basics covered, let's go straight to the task submission method.
Its main work:
1. Build a TaskSetManager from the TaskSet; each TaskSetManager manages one independent TaskSet.
2. Add the TaskSet to the scheduling pool.
3. Call backend.reviveOffers().
```scala
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // create a TaskSetManager; each one manages a single independent TaskSet
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId // the stage this TaskSet belongs to
    // key: stageId; value: a HashMap from stageAttemptId to TaskSetManager
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    // make sure a stage never has two non-zombie TaskSets running at the same time
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    // add the TaskSetManager to the scheduling pool
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // finally call SchedulerBackend.reviveOffers() to schedule the tasks,
  // i.e. decide which executor each task will run on
  backend.reviveOffers()
}
```
As the code shows, submitting tasks first creates a TaskSetManager for the TaskSet, then adds that manager to the scheduling pool; the pool's scheduling policy later decides which executor each task runs on.
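The starvation warning in submitTasks uses the plain java.util.Timer API. A self-contained sketch of that cancel-on-success pattern (names and timeout are illustrative):

```scala
import java.util.{Timer, TimerTask}

object StarvationDemo extends App {
  @volatile var hasLaunchedTask = false
  val STARVATION_TIMEOUT_MS = 15000L
  val starvationTimer = new Timer("starvation-timer", true) // daemon timer thread

  // warn periodically until the first task launches; then the task cancels itself
  starvationTimer.scheduleAtFixedRate(new TimerTask() {
    override def run(): Unit = {
      if (!hasLaunchedTask) {
        println("Initial job has not accepted any resources; check the cluster UI")
      } else {
        this.cancel() // stop the periodic warning once work has started
      }
    }
  }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
```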
A look at TaskSetManager:
It schedules a single TaskSet, monitoring the state of each of its tasks and retrying them on failure, among other duties.
```scala
private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,
    val taskSet: TaskSet,
    val maxTaskFailures: Int,
    blacklistTracker: Option[BlacklistTracker] = None,
    clock: Clock = new SystemClock())
  extends Schedulable with Logging
```
CoarseGrainedSchedulerBackend: reviveOffers()
In standalone mode, the concrete implementation is simply:

```scala
override def reviveOffers() {
  // send the ReviveOffers message to the driver endpoint over the Netty RPC system
  driverEndpoint.send(ReviveOffers)
}
```
It could hardly be simpler: a ReviveOffers message is sent to the endpoint on the driver. The request goes out through Netty's send method; as summarized in an earlier post, Netty creates a Dispatcher to dispatch the message.
CoarseGrainedSchedulerBackend: receive
After Netty delivers the message, the receive method of CoarseGrainedSchedulerBackend picks it up and responds by calling makeOffers:

```scala
case ReviveOffers => makeOffers()
```
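As a toy model of this one-way message pattern (hypothetical names, not Spark's actual RPC classes), illustrating the send, receive, makeOffers round trip:

```scala
sealed trait SchedulerMessage
case object ReviveOffers extends SchedulerMessage

class ToyDriverEndpoint {
  // receive is a partial function from message to action, like Spark's RpcEndpoint.receive
  def receive: PartialFunction[SchedulerMessage, Unit] = {
    case ReviveOffers => makeOffers()
  }
  private def makeOffers(): Unit = println("offering resources to the scheduler")
  def send(msg: SchedulerMessage): Unit = receive(msg) // fire-and-forget, no reply expected
}

object ToyDemo extends App {
  new ToyDriverEndpoint().send(ReviveOffers)
}
```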
CoarseGrainedSchedulerBackend: makeOffers
Offer the resources available on all executors as an abstraction:

```scala
private def makeOffers() {
  // Make sure no executor is killed while some task is launching on it
  val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
    // Filter out executors under killing
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    // build the WorkerOffer sequence from each executor's available resources
    val workOffers = activeExecutors.map { case (id, executorData) =>
      new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toIndexedSeq
    // hand the resource offers to TaskSchedulerImpl
    scheduler.resourceOffers(workOffers)
  }
  if (!taskDescs.isEmpty) {
    launchTasks(taskDescs) // launch the assigned tasks
  }
}
```
(1) executorDataMap: the key is the executor id, and the value is an ExecutorData record with the executor's details:
1. executorEndpoint: RpcEndpointRef, a reference to the executor's RPC endpoint, used for communication;
2. executorAddress: RpcAddress, the RPC address, used for communication;
3. executorHost: String, the host the executor runs on;
4. freeCores: Int, the number of free cores;
5. totalCores: Int, the total number of cores;
6. logUrlMap: Map[String, String], the map of log URLs.
executorIsAlive filters out executors that have already failed or are about to be killed.
(2) From the executorHost and freeCores of each ExecutorData in activeExecutors, a workOffers sequence is built, representing the executor resources currently available in the cluster.
(3) TaskSchedulerImpl's resourceOffers() method is called, and launchTasks() then ships the resulting tasks to the executors for execution.
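A minimal standalone sketch of steps (1) through (3), using simplified stand-in types rather than Spark's real ExecutorData and WorkerOffer classes:

```scala
object MakeOffersSketch extends App {
  // Simplified stand-ins for ExecutorData and WorkerOffer, for illustration only.
  case class ExecutorData(executorHost: String, freeCores: Int, alive: Boolean)
  case class WorkerOffer(executorId: String, host: String, cores: Int)

  val executorDataMap = Map(
    "exec-1" -> ExecutorData("host-a", 4, alive = true),
    "exec-2" -> ExecutorData("host-b", 2, alive = false) // being killed: filtered out
  )

  // steps (1) and (2): keep only live executors, then turn each into a resource offer
  val workOffers = executorDataMap
    .filter { case (_, data) => data.alive }
    .map { case (id, data) => WorkerOffer(id, data.executorHost, data.freeCores) }
    .toIndexedSeq
  // step (3) would pass workOffers to scheduler.resourceOffers(workOffers)
  println(workOffers)
}
```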
TaskSchedulerImpl: resourceOffers
This method, which belongs to TaskSchedulerImpl, assigns node resources to tasks by priority. Tasks are handed out to the nodes in round-robin fashion so that they stay roughly balanced across the cluster.
```scala
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname.
  // Also track whether a new executor has been added.
  var newExecAvail = false
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) { // host -> set of executors
      hostToExecutors(o.host) = new HashSet[String]()
    }
    // a previously unseen executor: record it in the lookup tables
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true // this executor is new
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // filter out blacklisted nodes and executors
  blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
  val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
    offers.filter { offer =>
      !blacklistTracker.isNodeBlacklisted(offer.host) &&
        !blacklistTracker.isExecutorBlacklisted(offer.executorId)
    }
  }.getOrElse(offers)

  // randomly shuffle the offers so tasks do not always land on the same executor
  val shuffledOffers = shuffleOffers(filteredOffers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray // available CPUs per offer
  val sortedTaskSets = rootPool.getSortedTaskSetQueue // the TaskSets in scheduling order
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      // a new executor has arrived: recompute the locality preferences
      taskSet.executorAdded()
    }
  }

  // launch tasks in the TaskSet order dictated by the scheduling mode
  for (taskSet <- sortedTaskSets) {
    var launchedAnyTask = false
    var launchedTaskAtCurrentMaxLocality = false
    // for each TaskSet, walk down the locality levels, preferring the closest executors
    for (currentMaxLocality <- taskSet.myLocalityLevels) {
      do {
        // schedule a single TaskSet at this locality level
        launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
          taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
        launchedAnyTask |= launchedTaskAtCurrentMaxLocality
      } while (launchedTaskAtCurrentMaxLocality)
    }
    if (!launchedAnyTask) {
      taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    }
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
```
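To isolate the round-robin balancing idea, here is a stripped-down sketch with hypothetical types, ignoring locality and blacklisting: each pass over the offers gives at most one task to each executor with free cores, so repeated passes spread tasks evenly.

```scala
object RoundRobinDemo extends App {
  // Hypothetical simplification: offers carry free cores; each task costs one core.
  case class Offer(executorId: String, var freeCores: Int)

  def assignRoundRobin(offers: Seq[Offer], tasks: Iterator[String]): Map[String, Seq[String]] = {
    val assigned =
      scala.collection.mutable.Map[String, Vector[String]]().withDefaultValue(Vector())
    var launchedAny = true
    // keep sweeping the offers until no executor can take another task
    while (launchedAny && tasks.hasNext) {
      launchedAny = false
      for (offer <- offers if tasks.hasNext && offer.freeCores > 0) {
        assigned(offer.executorId) :+= tasks.next()
        offer.freeCores -= 1
        launchedAny = true
      }
    }
    assigned.toMap
  }

  val offers = Seq(Offer("exec-1", 2), Offer("exec-2", 1))
  println(assignRoundRobin(offers, Iterator("t0", "t1", "t2")))
  // pass 1: t0 -> exec-1, t1 -> exec-2; pass 2: t2 -> exec-1
}
```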
TaskSchedulerImpl: resourceOfferSingleTaskSet
Allocates executor resources to a single TaskSet:

```scala
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false // whether any task was successfully assigned and launched
  // nodes and executors that are blacklisted for the entire application have already been
  // filtered out by this point
  for (i <- 0 until shuffledOffers.size) { // walk over the current offers
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // does this executor still have enough cores for one task, as configured?
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        // ask the TaskSetManager for tasks via resourceOffer(), handling each TaskDescription
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task                      // record the task at the matching offer slot
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet // store task -> TaskSetManager
          taskIdToExecutorId(tid) = execId      // store the executor the task was assigned to
          executorIdToRunningTaskIds(execId).add(tid)
          availableCpus(i) -= CPUS_PER_TASK     // consume the cores for one task
          assert(availableCpus(i) >= 0)         // the available CPUs must stay non-negative
          launchedTask = true                   // at this point the task counts as launched
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}
```
CoarseGrainedSchedulerBackend: launchTasks()
makeOffers finally calls launchTasks: for each task, based on the executor it has been assigned to, the executor's free cores are reduced by the configured CPUS_PER_TASK, and a LaunchTask message is sent to that executor via send.
```scala
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  // ...
  executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
```
CoarseGrainedExecutorBackend: receive
The backend on the executor receives the corresponding message:

```scala
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    val taskDesc = TaskDescription.decode(data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskDesc)
  }
```
Executor: launchTask
So how is a task started on the executor? A TaskRunner thread is created, the mapping from task ID to runner is recorded in a hash table, and the runner is finally executed in the thread pool.
```scala
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}
```
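A minimal, self-contained sketch of this pattern (toy names, not Spark's TaskRunner), assuming a cached thread pool and a concurrent map for the bookkeeping:

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}

// A toy runner that unregisters itself from runningTasks when the work finishes.
class ToyTaskRunner(taskId: Long, work: () => Unit,
                    runningTasks: ConcurrentHashMap[Long, ToyTaskRunner]) extends Runnable {
  override def run(): Unit =
    try work() finally runningTasks.remove(taskId)
}

object ToyExecutor {
  private val runningTasks = new ConcurrentHashMap[Long, ToyTaskRunner]()
  private val threadPool = Executors.newCachedThreadPool()

  def launchTask(taskId: Long, work: () => Unit): Unit = {
    val runner = new ToyTaskRunner(taskId, work, runningTasks)
    runningTasks.put(taskId, runner) // record first, so the task can be looked up or killed
    threadPool.execute(runner)       // then run it on a pool thread
  }
}
```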