Spark TaskScheduler: Functionality and Source Code Walkthrough
2015-10-09 19:09
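TaskScheduler is a Scala trait, that is, an abstract interface. Spark currently ships a single implementation, TaskSchedulerImpl, which is created and initialized inside SparkContext: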
    private[spark] class TaskSchedulerImpl(
        val sc: SparkContext,
        val maxTaskFailures: Int,
        isLocal: Boolean = false)
      extends TaskScheduler with Logging
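In practice TaskScheduler acts as a front for SchedulerBackend. It handles the backend-independent logic itself, such as the scheduling order across jobs and speculation (resubmitting slow-running tasks on idle nodes).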
    // SparkContext calls TaskSchedulerImpl.initialize and passes in the SchedulerBackend
    def initialize(backend: SchedulerBackend) {
      this.backend = backend
      // temporarily set rootPool name to empty
      rootPool = new Pool("", schedulingMode, 0, 0)
      schedulableBuilder = {
        schedulingMode match {
          case SchedulingMode.FIFO =>
            new FIFOSchedulableBuilder(rootPool)
          case SchedulingMode.FAIR =>
            new FairSchedulableBuilder(rootPool, conf)
        }
      }
      schedulableBuilder.buildPools()
    }
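A Pool schedules TaskSetManagers. Both Pool and TaskSetManager extend the Schedulable trait, so a Pool can contain TaskSetManagers or other Pools.
Spark uses FIFO (First In, First Out) scheduling by default; the alternative is FAIR. In FIFO mode there is a single pool, while FAIR mode has multiple pools. Each Pool itself also runs in either FIFO or FAIR mode. The two scheduling modes are set up by FIFOSchedulableBuilder and FairSchedulableBuilder respectively.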
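To make the Pool/TaskSetManager relationship concrete, here is a minimal sketch of a Schedulable tree flattened into a FIFO queue. The names `PoolNode`, `TaskSetNode` and `sortedTaskSetQueue` are hypothetical stand-ins, not Spark's classes, and the ordering rule (lower job priority value first, then lower stage id) is an assumption about how FIFO ordering works. The sketch also sorts the flattened list in one go, which is a simplification.

```scala
// Simplified stand-ins for Schedulable, Pool and TaskSetManager (not Spark's real classes).
object PoolSketch {
  sealed trait Schedulable { def name: String }

  // Leaf node: one task set, tagged with the priority of its job and its stage id.
  final case class TaskSetNode(name: String, priority: Int, stageId: Int) extends Schedulable

  // Composite node: may hold task sets or nested pools.
  final case class PoolNode(name: String, children: List[Schedulable]) extends Schedulable

  // Assumed FIFO rule: lower priority value first, ties broken by lower stage id.
  def fifoLessThan(a: TaskSetNode, b: TaskSetNode): Boolean =
    if (a.priority != b.priority) a.priority < b.priority else a.stageId < b.stageId

  // Flatten the tree into a FIFO-ordered queue of task sets.
  def sortedTaskSetQueue(s: Schedulable): List[TaskSetNode] = s match {
    case t: TaskSetNode => List(t)
    case PoolNode(_, children) =>
      children.flatMap(c => sortedTaskSetQueue(c)).sortWith(fifoLessThan)
  }
}
```

Because both node types share one trait, the scheduler only ever asks the root pool for its sorted queue and never needs to know how deep the tree is.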
SparkContext chooses the SchedulerBackend according to the master parameter. In Spark Standalone mode, for example, it uses SparkDeploySchedulerBackend, a subclass of CoarseGrainedSchedulerBackend:
    private[spark] class SparkDeploySchedulerBackend(
        scheduler: TaskSchedulerImpl,
        sc: SparkContext,
        masters: Array[String])
      extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
      with AppClientListener
      with Logging

    // TaskSchedulerImpl.start, called by SparkContext; it in turn starts the backend
    override def start() {
      backend.start()
      // If speculation is enabled, start a thread that periodically checks for
      // slow-running tasks so they can be resubmitted on idle resources
      if (!isLocal && conf.getBoolean("spark.speculation", false)) {
        logInfo("Starting speculative execution thread")
        sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds,
            SPECULATION_INTERVAL_MS milliseconds) {
          Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
        }(sc.env.actorSystem.dispatcher)
      }
    }
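The periodic checkSpeculatableTasks() call is not shown in this article. As a rough illustration of what such a check might do, the sketch below flags a running task as speculatable when enough tasks have finished and the task has run much longer than the median finished task. The function name, its parameters and the threshold values (0.75 and 1.5) are assumptions made for this sketch, not taken from the code above.

```scala
object SpeculationSketch {
  // Returns the indices of running tasks that look slow enough to be launched a second time.
  def speculatable(
      finishedDurationsMs: Seq[Long],    // durations of tasks that already succeeded
      runningElapsedMs: Map[Int, Long],  // task index -> how long it has been running
      totalTasks: Int,
      quantile: Double = 0.75,           // assumed: fraction of tasks that must have finished
      multiplier: Double = 1.5           // assumed: how many times slower than the median
  ): Set[Int] = {
    val minFinished = math.floor(quantile * totalTasks).toInt
    if (finishedDurationsMs.isEmpty || finishedDurationsMs.size < minFinished) {
      Set.empty[Int]
    } else {
      val sorted = finishedDurationsMs.sorted
      val median = sorted(sorted.size / 2)
      val threshold = multiplier * median
      runningElapsedMs.collect { case (index, elapsed) if elapsed > threshold => index }.toSet
    }
  }
}
```

Waiting until a fraction of the tasks has finished keeps the median meaningful; without that guard the first few quick tasks could mark every other task as slow.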
SparkDeploySchedulerBackend.start creates the AppClient object, which handles the Akka communication between the Driver and the Master, including registering the Spark application:
    // SparkDeploySchedulerBackend.start()
    override def start() {
      super.start()
      ...
      val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
      val javaOpts = sparkJavaOpts ++ extraJavaOpts
      // Wrap CoarseGrainedExecutorBackend in a Command
      val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
        args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
      val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
      val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
      val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
        command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
      // Create the AppClient
      client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
      client.start()
      waitForRegistration()
    }
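Now to the main topic: how tasks are scheduled and run
At the end of the previous article, "Spark DAGScheduler: Functionality and Source Code Walkthrough", DAGScheduler calls TaskSchedulerImpl.submitTasks to submit a TaskSet for execution: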
    // TaskSchedulerImpl.submitTasks
    override def submitTasks(taskSet: TaskSet) {
      val tasks = taskSet.tasks
      logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
      // TaskSetManager is not thread-safe, so access to it must be synchronized
      this.synchronized {
        val manager = createTaskSetManager(taskSet, maxTaskFailures)
        activeTaskSets(taskSet.id) = manager
        schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

        if (!isLocal && !hasReceivedTask) {
          starvationTimer.scheduleAtFixedRate(new TimerTask() {
            override def run() {
              if (!hasLaunchedTask) {
                logWarning("Initial job has not accepted any resources; " +
                  "check your cluster UI to ensure that workers are registered " +
                  "and have sufficient resources")
              } else {
                this.cancel()
              }
            }
          }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
        }
        hasReceivedTask = true
      }
      backend.reviveOffers()
    }
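SparkDeploySchedulerBackend.reviveOffers is inherited from the parent class, so the call lands in CoarseGrainedSchedulerBackend.reviveOffers. After a round of message passing this invokes CoarseGrainedSchedulerBackend.makeOffers and then CoarseGrainedSchedulerBackend.launchTasks, whose argument is the Seq[Seq[TaskDescription]] returned by TaskSchedulerImpl.resourceOffers.
CoarseGrainedSchedulerBackend manages executors at a coarse granularity: executor resources are held for the entire lifetime of the job instead of being released when a task ends and requested again when a new task arrives. The advantage of the coarse-grained approach is that resource accounting stays relatively simple; the drawback is that executor resources can sit idle and be wasted. Fine-grained management, by contrast, reuses and preempts executors to improve utilization. At the time of writing, only Mesos offers fine-grained management.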
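When resources change, for example when a new executor joins (adding to the total cores), an existing executor is removed (reducing the total cores), or an executor finishes a task (freeing its cores), TaskScheduler notifies CoarseGrainedSchedulerBackend. The backend then calls TaskScheduler.resourceOffers through makeOffers to run one round of assignment over the waiting tasks (the available resources are represented as WorkerOffer objects):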
    // CoarseGrainedSchedulerBackend.makeOffers
    def makeOffers(executorId: String) {
      val executorData = executorDataMap(executorId)
      launchTasks(scheduler.resourceOffers(
        Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))))
    }
    // TaskSchedulerImpl.resourceOffers is called by the cluster manager. The offers argument
    // describes the resources offered by the workers. Based on those resources and on the
    // priority of the pending tasks, the method spreads tasks evenly across the executors.
    def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
      // Mark every slave as alive, remember its hostname,
      // and check whether a new executor has been added
      var newExecAvail = false
      for (o <- offers) {
        executorIdToHost(o.executorId) = o.host
        activeExecutorIds += o.executorId
        if (!executorsByHost.contains(o.host)) {
          executorsByHost(o.host) = new HashSet[String]()
          executorAdded(o.executorId, o.host)
          newExecAvail = true
        }
        for (rack <- getRackForHost(o.host)) {
          hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
        }
      }

      // Shuffle the offers so that tasks are not always placed on the same first few workers
      val shuffledOffers = Random.shuffle(offers)
      // Build the list of tasks to assign to each worker
      val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
      val availableCpus = shuffledOffers.map(o => o.cores).toArray
      // TaskSetManagers sorted by priority; the priority is determined by the
      // scheduling mode of the Pool (FIFO/FAIR)
      val sortedTaskSets = rootPool.getSortedTaskSetQueue
      for (taskSet <- sortedTaskSets) {
        logDebug("parentName: %s, name: %s, runningTasks: %s".format(
          taskSet.parent.name, taskSet.name, taskSet.runningTasks))
        if (newExecAvail) {
          taskSet.executorAdded()
        }
      }

      // Walk the TaskSets in scheduling order; for each one, offer every WorkerOffer at each
      // locality level in turn, from most local to least local, and launch the best-fitting tasks
      var launchedTask = false
      for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
        do {
          launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
        } while (launchedTask)
      }

      if (tasks.size > 0) {
        hasLaunchedTask = true
      }
      return tasks
    }

    // TaskSchedulerImpl.resourceOfferSingleTaskSet
    private def resourceOfferSingleTaskSet(
        taskSet: TaskSetManager,
        maxLocality: TaskLocality,
        shuffledOffers: Seq[WorkerOffer],
        availableCpus: Array[Int],
        tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
      var launchedTask = false
      for (i <- 0 until shuffledOffers.size) {
        val execId = shuffledOffers(i).executorId
        val host = shuffledOffers(i).host
        // Check that the executor has enough free CPU cores to run a task
        if (availableCpus(i) >= CPUS_PER_TASK) {
          try {
            // The real work is done by TaskSetManager.resourceOffer
            for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
              tasks(i) += task
              val tid = task.taskId
              taskIdToTaskSetId(tid) = taskSet.taskSet.id
              taskIdToExecutorId(tid) = execId
              executorsByHost(host) += execId
              availableCpus(i) -= CPUS_PER_TASK
              assert(availableCpus(i) >= 0)
              launchedTask = true
            }
          } catch {
            case e: TaskNotSerializableException =>
              logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
              // Do not offer resources for this task, but don't throw an error to allow other
              // task sets to be submitted.
              return launchedTask
          }
        }
      }
      return launchedTask
    }
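The repeated "offer each executor one task, then loop again" pattern in resourceOffers can be hard to see in the full listing. The following is a minimal sketch of just that loop, with hypothetical names (`Offer`, `assign`) and plain strings in place of tasks. It leaves out locality, task-set priority and the random shuffle so that the result is deterministic.

```scala
import scala.collection.mutable.ArrayBuffer

object OfferSketch {
  final case class Offer(executorId: String, cores: Int)

  // Hands out pending tasks one per executor per pass, until a full pass launches nothing.
  def assign(
      offers: Seq[Offer],
      pending: Seq[String],
      cpusPerTask: Int = 1): Map[String, Seq[String]] = {
    val available = offers.map(_.cores).toArray
    val assigned = offers.map(_ => ArrayBuffer.empty[String])
    val queue = pending.iterator
    var launched = true
    while (launched) {
      launched = false
      for (i <- offers.indices) {
        // Only use an executor that still has enough free cores for one more task.
        if (available(i) >= cpusPerTask && queue.hasNext) {
          assigned(i) += queue.next()
          available(i) -= cpusPerTask
          launched = true
        }
      }
    }
    offers.map(_.executorId).zip(assigned.map(_.toList)).toMap
  }
}
```

With offers e1 (2 cores) and e2 (1 core) and four pending tasks t1 to t4, the first pass places t1 on e1 and t2 on e2, the second pass places t3 on e1, and t4 stays pending because no cores are left. Handing out one task per executor per pass is what spreads the load instead of filling the first executor before touching the second.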
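TaskSetManager.resourceOffer picks, for an offered executor, the task with the best data locality, which brings the locality logic into play.
TaskLocality is an enumeration of data locality levels, ordered as PROCESS_LOCAL (highest) < NODE_LOCAL < NO_PREF < RACK_LOCAL < ANY (lowest). PROCESS_LOCAL, NODE_LOCAL and RACK_LOCAL can each be given their own wait time; the default is 3s.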
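Since the listings compare locality levels with `>`, it helps to see how such an ordering falls out of a Scala Enumeration. The sketch below is a self-contained model; the wrapper object `LocalitySketch` and the helper `isAllowed` are illustrative names introduced here.

```scala
object LocalitySketch {
  object TaskLocality extends Enumeration {
    // Declaration order defines the ordering: declared earlier means more local.
    val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

    // A task at level `condition` may run when the allowed level is `constraint` or less local.
    def isAllowed(constraint: Value, condition: Value): Boolean = condition <= constraint
  }
}
```

A "greater" level is therefore a less local one, which is why `allowedLocality > maxLocality` in the scheduler code means the allowed level is further away than the caller permits.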
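TaskSetManager internally maintains the following pending-task collections. The first three are HashMaps keyed by executor ID, host and rack respectively; pendingTasksWithNoPrefs is a plain buffer of task indices:
pendingTasksForExecutor
pendingTasksForHost
pendingTasksForRack
pendingTasksWithNoPrefs
When a TaskSetManager is initialized, each task whose preferredLocations is non-empty is added to the first three pending queues; a task with no preferred locations is added to pendingTasksWithNoPrefs.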
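The way tasks are filed into these queues can be sketched as follows. This is a simplified model under stated assumptions: `Location`, `PendingQueues` and the `rackOf` lookup are hypothetical names, and a location carries an executor ID only when the preference is process-local.

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

object PendingQueuesSketch {
  // Hypothetical preferred location: the executor id is optional.
  final case class Location(host: String, executorId: Option[String] = None)

  final class PendingQueues(rackOf: String => Option[String]) {
    val forExecutor = new HashMap[String, ArrayBuffer[Int]]
    val forHost = new HashMap[String, ArrayBuffer[Int]]
    val forRack = new HashMap[String, ArrayBuffer[Int]]
    val withNoPrefs = new ArrayBuffer[Int]

    // Files the task index under every executor, host and rack it prefers.
    def add(index: Int, preferred: Seq[Location]): Unit = {
      for (loc <- preferred) {
        loc.executorId.foreach { e =>
          forExecutor.getOrElseUpdate(e, ArrayBuffer.empty[Int]) += index
        }
        forHost.getOrElseUpdate(loc.host, ArrayBuffer.empty[Int]) += index
        rackOf(loc.host).foreach { r =>
          forRack.getOrElseUpdate(r, ArrayBuffer.empty[Int]) += index
        }
      }
      // Tasks without any preference go to their own queue.
      if (preferred.isEmpty) withNoPrefs += index
    }
  }
}
```

One task index can appear in several queues at once, which is why the scheduler later has to lazily discard entries that were already launched from another queue.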
    // TaskSetManager.resourceOffer
    def resourceOffer(
        execId: String,
        host: String,
        maxLocality: TaskLocality.TaskLocality)
      : Option[TaskDescription] =
    {
      if (!isZombie) {
        val curTime = clock.getTimeMillis()

        var allowedLocality = maxLocality

        if (maxLocality != TaskLocality.NO_PREF) {
          // Combine the wait time configured for each locality level with the time a task
          // was last launched at the current level to get the locality level currently allowed
          allowedLocality = getAllowedLocalityLevel(curTime)
          // A greater value means a less local level
          if (allowedLocality > maxLocality) {
            allowedLocality = maxLocality
          }
        }

        // dequeueTask returns the most local task available within the allowed locality
        dequeueTask(execId, host, allowedLocality) match {
          case Some((index, taskLocality, speculative)) => {
            val task = tasks(index)
            val taskId = sched.newTaskId()
            // Do various bookkeeping
            copiesRunning(index) += 1
            val attemptNum = taskAttempts(index).size
            val info = new TaskInfo(taskId, index, attemptNum, curTime,
              execId, host, taskLocality, speculative)
            taskInfos(taskId) = info
            taskAttempts(index) = info :: taskAttempts(index)
            // Unless the offer was made at NO_PREF, set the current locality level
            // to that of the launched task and update lastLaunchTime
            if (maxLocality != TaskLocality.NO_PREF) {
              currentLocalityIndex = getLocalityIndex(taskLocality)
              lastLaunchTime = curTime
            }
            // Serialize the task
            val startTime = clock.getTimeMillis()
            val serializedTask: ByteBuffer = try {
              Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
            } catch {
              // There is no point in retrying when serialization fails
              case NonFatal(e) =>
                val msg = s"Failed to serialize task $taskId, not attempting to retry it."
                logError(msg, e)
                abort(s"$msg Exception during serialization: $e")
                throw new TaskNotSerializableException(e)
            }
            // A very large task is a sign that the job should be optimized
            if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
                !emittedTaskSizeWarning) {
              emittedTaskSizeWarning = true
              logWarning(s"Stage ${task.stageId} contains a task of very large size " +
                s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
                s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
            }
            addRunningTask(taskId)

            // We used to log the time it takes to serialize the task, but task size is already
            // a good proxy to task serialization time.
            // val timeTaken = clock.getTime() - startTime
            val taskName = s"task ${info.id} in stage ${taskSet.id}"
            logInfo("Starting %s (TID %d, %s, %s, %d bytes)".format(
              taskName, taskId, host, taskLocality, serializedTask.limit))

            // Notify DAGScheduler that the task has started
            sched.dagScheduler.taskStarted(task, info)
            return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
              taskName, index, serializedTask))
          }
          case _ =>
        }
      }
      None
    }
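TaskSetManager.getAllowedLocalityLevel combines the wait time configured for each locality level with the time a task was last successfully launched at the current level to determine the locality level at which tasks may currently be launched: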
    // TaskSetManager.getAllowedLocalityLevel
    private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
      // Lazily remove tasks that have already been scheduled or have finished
      def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
        var indexOffset = pendingTaskIds.size
        while (indexOffset > 0) {
          indexOffset -= 1
          val index = pendingTaskIds(indexOffset)
          if (copiesRunning(index) == 0 && !successful(index)) {
            return true
          } else {
            pendingTaskIds.remove(indexOffset)
          }
        }
        false
      }
      // Walk pendingTasks and remove tasks that have already been scheduled;
      // return true if any task is still waiting to be scheduled
      def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
        val emptyKeys = new ArrayBuffer[String]
        val hasTasks = pendingTasks.exists {
          case (id: String, tasks: ArrayBuffer[Int]) =>
            if (tasksNeedToBeScheduledFrom(tasks)) {
              true
            } else {
              emptyKeys += id
              false
            }
        }
        // The key could be executorId, host or rackId
        emptyKeys.foreach(id => pendingTasks.remove(id))
        hasTasks
      }

      // currentLocalityIndex records the TaskLocality level currently in use
      while (currentLocalityIndex < myLocalityLevels.length - 1) {
        val moreTasks = myLocalityLevels(currentLocalityIndex) match {
          case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
          case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
          case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
          case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
        }
        if (!moreTasks) {
          // No task left to run at the current level: move to the next,
          // less local level and update lastLaunchTime
          lastLaunchTime = curTime
          logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
            s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
          currentLocalityIndex += 1
        } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
          // The time since the last launch at this level exceeds the wait configured for it:
          // move to the next, less local level and update lastLaunchTime
          lastLaunchTime += localityWaits(currentLocalityIndex)
          currentLocalityIndex += 1
          logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " +
            s"${localityWaits(currentLocalityIndex)}ms")
        } else {
          return myLocalityLevels(currentLocalityIndex)
        }
      }
      myLocalityLevels(currentLocalityIndex)
    }
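The fallback logic is easier to follow with concrete timestamps. Below is a minimal sketch that keeps only the two pieces of state used in the listing (the current level index and the last launch time) and replaces the pending-queue checks with a caller-supplied predicate. `LocalityState`, `allowedLevel` and `recordLaunch` are hypothetical names, and levels are plain strings.

```scala
object DelaySchedulingSketch {
  final class LocalityState(
      levels: IndexedSeq[String],
      waitsMs: IndexedSeq[Long],
      startMs: Long) {
    private var currentIndex = 0
    private var lastLaunchMs = startMs

    // hasPending(level) tells whether any task is still waiting at that level.
    def allowedLevel(nowMs: Long, hasPending: String => Boolean): String = {
      while (currentIndex < levels.length - 1) {
        if (!hasPending(levels(currentIndex))) {
          // Nothing to wait for at this level: fall back immediately.
          lastLaunchMs = nowMs
          currentIndex += 1
        } else if (nowMs - lastLaunchMs >= waitsMs(currentIndex)) {
          // The wait for this level has expired: fall back one level.
          lastLaunchMs += waitsMs(currentIndex)
          currentIndex += 1
        } else {
          return levels(currentIndex)
        }
      }
      levels(currentIndex)
    }

    // Called after a successful launch; resets the level and the clock.
    def recordLaunch(levelIndex: Int, nowMs: Long): Unit = {
      currentIndex = levelIndex
      lastLaunchMs = nowMs
    }
  }
}
```

With levels PROCESS_LOCAL, NODE_LOCAL, ANY, a 3000 ms wait per level, a start time of 0 and tasks pending at every level, a query at 1000 ms still returns PROCESS_LOCAL, a query at 3500 ms falls back to NODE_LOCAL, and a query at 6000 ms falls back to ANY. A successful launch at a more local level resets the state, so the manager climbs back up to better locality.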
Once the most suitable task has been selected, SchedulerBackend.launchTasks (here CoarseGrainedSchedulerBackend.launchTasks) is called to run it:
    // CoarseGrainedSchedulerBackend.launchTasks
    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val serializedTask = ser.serialize(task)
        // If the serialized task exceeds the Akka frame size limit,
        // abort the TaskSet with an explanatory message
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
          scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                "spark.akka.frameSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
                AkkaUtils.reservedSizeBytes)
              taskSet.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        } else {
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }
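The size guard in launchTasks boils down to one comparison. As a small sketch, with a hypothetical helper name and byte counts passed in explicitly:

```scala
object FrameSizeSketch {
  // True when a serialized task fits into a single message after reserving
  // some space for the message envelope.
  def fitsInFrame(serializedBytes: Long, frameSizeBytes: Long, reservedBytes: Long): Boolean =
    serializedBytes < frameSizeBytes - reservedBytes
}
```

For example, assuming a 10 MB frame and 200 KB reserved (illustrative values only), a 1 KB task fits, while a task of exactly 10 MB minus 200 KB does not, matching the `>=` test in the listing.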
Through message passing, this ends up invoking Executor.launchTask:
    // Executor.launchTask
    def launchTask(
        context: ExecutorBackend,
        taskId: Long,
        attemptNumber: Int,
        taskName: String,
        serializedTask: ByteBuffer): Unit = {
      val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
        serializedTask)
      runningTasks.put(taskId, tr)
      threadPool.execute(tr)
    }
TaskRunner is the class that actually executes the task: it deserializes the Task and its RDD, runs the task, and records the run time. It is also defined in Executor.scala.
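The launchTask pattern (register a runner under its task ID, then hand it to a thread pool) can be tried out without Spark. The sketch below is a stand-in under stated assumptions: `runAll` is a hypothetical helper, the runner is an anonymous Runnable and not Spark's TaskRunner, and a task body is just a function with no deserialization step.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object ExecutorSketch {
  // Runs every body on a thread pool and returns how many tasks completed.
  def runAll(bodies: Seq[() => Unit]): Int = {
    val pool = Executors.newCachedThreadPool()
    val running = new ConcurrentHashMap[Long, Runnable]()
    val elapsedNanos = new ConcurrentHashMap[Long, Long]()

    bodies.zipWithIndex.foreach { case (body, i) =>
      val taskId = i.toLong
      val runner = new Runnable {
        override def run(): Unit = {
          val start = System.nanoTime()
          try body() finally {
            // Record the run time and drop the task from the running set.
            elapsedNanos.put(taskId, System.nanoTime() - start)
            running.remove(taskId)
          }
        }
      }
      // Register first, then execute, mirroring the order in launchTask.
      running.put(taskId, runner)
      pool.execute(runner)
    }

    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    elapsedNanos.size()
  }
}
```

Keeping the running tasks in a concurrent map keyed by task ID is what lets another thread look a task up later, for example to kill it, while the pool threads keep working.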