Spark2.0.X源码深度剖析之 TaskScheduler之Task划分 —— 国内全网最新最全最具深度!!!
2017-07-13 10:16
591 查看
微信号:519292115
邮箱:taosiyuan163@163.com
尊重原创,禁止转载!!
Spark目前是大数据领域中最火的框架之一,可高效实现离线批处理,实时计算和机器学习等多元化操作,阅读源码有助你加深对框架的理解和认知
Task作为Spark的最小执行单元在DAGScheduler划分好Stage之后会提交给TaskSchedulerImpl的实现子类 (比如像yarn模式的YarnScheduler)来部署分配每个Task
在这个章节中,将涉及到Task的数据本地化级别,TaskSchedulerIm
4000
pl的资源调度机制,Task在Executor上的分发部署,Netty通信等..
建议看下博主的前几篇介绍Spark不同组件的文章,里面会涉及到跟之前组件的交互 ,有助于加深理解....
从上篇文章的DAGScheduler创建Task和提交Task开始:
在之前的文章中我已经介绍过DAGScheduler在划分Stage的时候会计算出每个Task的最佳位置和创建父Stage,最后通过封装所有task的TaskSet提交给TaskSchedulerImpl
这里以shuffleMapTask为例:
提交给TaskSchedulerImpl:
首先,这里补充一下,这里的TaskSchedulerImpl以及后面有关的TaskSchedulerBackend都是在SparkContext初始化的时候构建生成的:
这里我们只看createTaskScheduler,至于创建SchedulerBackend我们待会再说:
其实yarn模式的调度器实现主要还是复写了机架感知:
其实整个task的调度的通用核心逻辑都是在TaskSchedulerImpl中实现的,接着看提交Task:
这里有三个地方比较重要:
①创建TaskSetManager :一个TaskSet对应一个TaskSetManager (一个Stage对应一个TaskSet),里面的核心方法待会讲
②把创建好的TaskSetManager 加入到稍后会被调度算法排序的调度池中的schedulableQueue队列里
③调用SchedulerBackend匹配资源
schedulableBuilder根据不同的调度模式初始化也是不同的,而初始化也是在SparkContext中构建好了TaskSchedulerImpl和TaskSchedulerBackend后生成:
默认匹配到的模式是FIFO:
回到添加TaskSetManager方法:这里会把它添加到schedulableQueue,后面会对这个队列做调度算法的排序
最后开始匹配资源:
首先看下backend是哪里生成的:还是SparkContext
这里还是以Yarn集群模式为例:
Yarn集群模式复写了启动方法和获取DriverLog方法:
YarnSchedulerBackend其实就是继承于CoarseGrainedSchedulerBackend,复写了启停以及与APPMaster(Driver)交互的方法
而其他核心逻辑主要实现在CoarseGrainedSchedulerBackend,比如刚刚的backend.reviveOffers():
这里会调用DriverEnpoint的引用 触发它的receive单向消息接受方法(这里的Netty通信之前章节介绍过)
这里会做以下几点:
①过滤掉不可用的之前executor注册到Driver上的元数据
②把过滤好的每个executor元数据简单封装成WorkerOffer递交给TaskSchedulerImpl做Task划分,最后返回每个Task封装好的TaskDescription
③预备部署Task到每个executor上
首先我们看下executorDataMap为何物:它里里面维护着每个executor它对应的元数据(端口,host,可用核数等),每个executor在启动自己的时候 会同时把自己的元数据注册到Driver上
然后把拿到的每个executor元数据简单封装成WorkerOffer
然后提交给TaskSchedulerImpl做资源匹配:
这会做以下几件事:
①为每个Task,Executor,Host,Rack打标记,添加到对应的集合里面
②把非黑名单的WorkerOffer随机断乱做成待会存放TaskDescription的数组和每个executor可用核数的数组
③调度池按照对应调度模式对每个TaskSetManager划分计算
④计算出每个task的本地级别
⑤拿到了以上的元数据后封装每个Task为TaskDescription
从①阶段的时候主要是为每个host,executor和Rack做标记,而机架感知是调用的Yarn复写的机架感知:
②阶段很简单 只是把非黑名单的WorkerOffer随机断乱顺序,避免都把task分配到相同的host上,并且乱做成待会存放TaskDescription的数组和每个executor可用核数的数组
③阶段是核心的调度算法了,这里会对TaskSetManager做对应的排序:
补充:调度池会在初始化TaskSchedulerImpl的时候根据不同调度模式初始化(默认是FIFO)
进入getSortedTaskSetQueue:
首先说下schedulableQueue,这个在上面的代码里出现过,忘记的可以看看前面的代码,在调用submitTasks的时候就会把TaskSetManager加入进去,而里面存放的是全局所有的TaskSetManager
下面来看下taskSetSchedulingAlgorithm.comparator的实现:因为默认是FIFO调度算法 所以我们这里以此为例:先比较JobId,若是相同的Job就比较StageId,顺序是从小到大
④,对排序好的TaskSetManager划分本地级别算法:
上面可以看得出获取本地级别 就是看不同类型的PendingTask的Key是包含在对应的Alive里:所以这里要搞清楚的就是这2个集合到底是什么:
首先看看PendingTask:
而调用addPendingTask是在TaskSetManager初始化的时候自动调用的:
现在我们再看下Alive方法:这里以PROCESS_LOCAL级别为例:
在TaskSetManager拿到了所有本地级别后调用resourceOfferSingleTaskSet 里面会为每个task创建TaskDescription,用来后面启动task用
我们来看看里面最核心的方法resourceOffer:从TaskSetManager中提取出每个task 并封装成TaskDescription
最后回到CoarseGrainedSchedulerBackend,它会开始为每个Executor分发TaskDescription
这里Driver端会用每个executorEndpoint的引用发送启动task的单向事件消息,事件消息里封装了序列化后的TaskDescription,对应的executor会调用receive接受到并匹配,最后把TaskDescription封装成继承Java线程的Runnable 调用用线程池去run
最后交由Executor的launchTask:
TaskRunner中会复写run方法 并且最后会调用task.run:
补充:Task分为两种 一种是ShuffleMapTask 第二种是ResultTask
以ShuffleMapTask为例,他首先会从排序好的Task对应的RDD调用iterator来计算是否有持久化过 若没有就调用computer(每种RDD的实现都不一样),最后到shuffledRDD就会调用shuffleManager(默认是SortShuffleManager)来构建一个Reader,里面最终会调用BlockStoreShuffleReader.read来fetch远程各个Executor的分区,最后再会构建一个Writer把拿到的计算结果封装成bucket写入磁盘,至此一个shuffleStage结束;具体细节比较多,留着以后再起章节说
邮箱:taosiyuan163@163.com
尊重原创,禁止转载!!
Spark目前是大数据领域中最火的框架之一,可高效实现离线批处理,实时计算和机器学习等多元化操作,阅读源码有助你加深对框架的理解和认知
Task作为Spark的最小执行单元在DAGScheduler划分好Stage之后会提交给TaskSchedulerImpl的实现子类 (比如像yarn模式的YarnScheduler)来部署分配每个Task
在这个章节中,将涉及到Task的数据本地化级别,TaskSchedulerIm
4000
pl的资源调度机制,Task在Executor上的分发部署,Netty通信等..
建议看下博主的前几篇介绍Spark不同组件的文章,里面会涉及到跟之前组件的交互 ,有助于加深理解....
从上篇文章的DAGScheduler创建Task和提交Task开始:
在之前的文章中我已经介绍过DAGScheduler在划分Stage的时候会计算出每个Task的最佳位置和创建父Stage,最后通过封装所有task的TaskSet提交给TaskSchedulerImpl
这里以shuffleMapTask为例:
// 开始构建ShuffleMapTask对象,里面封装的主要是它的元数据和runTask方法 // 补充:Task分为两种:一种是ShuffleMapTask,一种是ResultTask new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) }
提交给TaskSchedulerImpl:
taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
首先,这里补充一下,这里的TaskSchedulerImpl以及后面有关的TaskSchedulerBackend都是在SparkContext初始化的时候构建生成的:
// Create and start the scheduler val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
// 集群模式有mesos和yarn两种,都是继承于ExternalClusterManager // 这里以YarnClusterManager为例 val scheduler = cm.createTaskScheduler(sc, masterUrl) val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler) cm.initialize(scheduler, backend) (backend, scheduler)
这里我们只看createTaskScheduler,至于创建SchedulerBackend我们待会再说:
override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { sc.deployMode match { // 创建集群模式的调度器 case "cluster" => new YarnClusterScheduler(sc) case "client" => new YarnScheduler(sc) case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn") } }
private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) { logInfo("Created YarnClusterScheduler") override def postStartHook() { ApplicationMaster.sparkContextInitialized(sc) super.postStartHook() logInfo("YarnClusterScheduler.postStartHook done") } }
其实yarn模式的调度器实现主要还是复写了机架感知:
private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { // RackResolver logs an INFO message whenever it resolves a rack, which is way too often. if (Logger.getLogger(classOf[RackResolver]).getLevel == null) { Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN) } // By default, rack is unknown override def getRackForHost(hostPort: String): Option[String] = { val host = Utils.parseHostPort(hostPort)._1 Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation) } }
其实整个task的调度的通用核心逻辑都是在TaskSchedulerImpl中实现的,接着看提交Task:
这里有三个地方比较重要:
①创建TaskSetManager :一个TaskSet对应一个TaskSetManager (一个Stage对应一个TaskSet),里面的核心方法待会讲
②把创建好的TaskSetManager 加入到稍后会被调度算法排序的调度池中的schedulableQueue队列里
③调用SchedulerBackend匹配资源
override def submitTasks(taskSet: TaskSet) { // 拿到存放tasks的数组 val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { // 创建TaskSetManager,它会跟踪每个task,如果有失败的task就根据重试次数重新提交 // 还包括计算数据本地化,构建TaskDescription等 val manager = createTaskSetManager(taskSet, maxTaskFailures) val stage = taskSet.stageId // 把manager加入taskSetsByStageIdAndAttempt中,如果以前有就更新,没有就新增 val stageTaskSets = // taskSetsByStageIdAndAttempt维护的是每个stage在不同尝试ID对应的TaskSetManager taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) // 把刚创建的TaskSetManager替换掉以前的,没有就新增 stageTaskSets(taskSet.stageAttemptId) = manager // 判断case 到的是否存在 ,返回值为Boolean // 当Task被完成后isZombie会被标记成true,默认为false val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => ts.taskSet != taskSet && !ts.isZombie } // TaskSet有冲突情况抛错 if (conflictingTaskSet) { throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") } // 默认使用的是FIFO调度器(另一种是FAIR公平调度器) // 所以这里会使用FIFOSchedulableBuilder.addTaskSetManager把TaskSetManager加入到schedulableQueue // 默认后面的代码会调用FIFO调度算法对schedulableQueue使用自己的比较算法对里面的schedulableQueue排序 schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { // 启动一个线程 定时检查提交的Task是否有被运行 starvationTimer.scheduleAtFixedRate(new TimerTask() { override def run() { // 如果没运行说明集群的资源被占用 还没空闲出来 if (!hasLaunchedTask) { logWarning("Initial job has not accepted any resources; " + "check your cluster UI to ensure that workers are registered " + "and have sufficient resources") } else { // 如果提交的Task运行了 就关闭这个Timer线程 this.cancel() } } }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } hasReceivedTask = true } // 默认使用的SchedulerBackend是CoarseGrainedSchedulerBackend,也是在SparkContext生成 backend.reviveOffers() }
schedulableBuilder根据不同的调度模式初始化也是不同的,而初始化也是在SparkContext中构建好了TaskSchedulerImpl和TaskSchedulerBackend后生成:
cm.initialize(scheduler, backend)
def initialize(backend: SchedulerBackend) { this.backend = backend schedulableBuilder = { schedulingMode match { case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool) case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf) case _ => throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " + s"$schedulingMode") } } // FIFO不会做任何事情 schedulableBuilder.buildPools() }
默认匹配到的模式是FIFO:
// default scheduler is FIFO // 默认的是FIFO先进先出的调度器 private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString) val schedulingMode: SchedulingMode = try { // 拿到调度器名字,当然你可以提前配置FIAR模式 SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT)) } catch { case e: java.util.NoSuchElementException => throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf") }
回到添加TaskSetManager方法:这里会把它添加到schedulableQueue,后面会对这个队列做调度算法的排序
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging { override def buildPools() { // nothing } override def addTaskSetManager(manager: Schedulable, properties: Properties) { // 里面会加入调度池的schedulableQueue中 rootPool.addSchedulable(manager) } }
override def addSchedulable(schedulable: Schedulable) { // 做个断言 require(schedulable != null) // 加入到schedulableQueue中,后面会对这个数据结构进行自定义的排序比较算法 schedulableQueue.add(schedulable) // 存放的是调度器名字和自己的标识 schedulableNameToSchedulable.put(schedulable.name, schedulable) // 标记自己的调度池 schedulable.parent = this }
最后开始匹配资源:
// 默认使用的SchedulerBackend是CoarseGrainedSchedulerBackend,也是在SparkContext生成 backend.reviveOffers()
首先看下backend是哪里生成的:还是SparkContext
// 集群模式有mesos和yarn两种,都是继承于ExternalClusterManager // 这里以YarnClusterManager为例 val scheduler = cm.createTaskScheduler(sc, masterUrl) val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
这里还是以Yarn集群模式为例:
override def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend = { sc.deployMode match { case "cluster" => new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc) case "client" => new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc) case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn") }
Yarn集群模式复写了启动方法和获取DriverLog方法:
private[spark] class YarnClusterSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext) extends YarnSchedulerBackend(scheduler, sc) {
YarnSchedulerBackend其实就是继承于CoarseGrainedSchedulerBackend,复写了启停以及与APPMaster(Driver)交互的方法
private[spark] abstract class YarnSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
而其他核心逻辑主要实现在CoarseGrainedSchedulerBackend,比如刚刚的backend.reviveOffers():
override def reviveOffers() { // 向Driver端的RpcEndpointRef发送一个异步请求,Driver端会调用receive来对应处理 // 补充:RpcEndpointRef是Driver端的Enpoint的引用 driverEndpoint.send(ReviveOffers) }
这里会调用DriverEnpoint的引用 触发它的receive单向消息接受方法(这里的Netty通信之前章节介绍过)
case ReviveOffers => makeOffers()
这里会做以下几点:
①过滤掉不可用的之前executor注册到Driver上的元数据
②把过滤好的每个executor元数据简单封装成WorkerOffer递交给TaskSchedulerImpl做Task划分,最后返回每个Task封装好的TaskDescription
③预备部署Task到每个executor上
private def makeOffers() { // Make sure no executor is killed while some task is launching on it val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized { // Filter out executors under killing // 注意:这里的executorDataMap维护着各个ExecutorID和自己的元数据信息(地址,端口,可用cpu等) // 在executor注册自己到Drvier上的时候会把自己的ExecutorData信息put进Driver的executorDataMap val activeExecutors = executorDataMap.filterKeys(executorIsAlive) // 拿到所有的executor的可用资源 val workOffers = activeExecutors.map { case (id, executorData) => // 封装的WorkerOffer代表了executor上空闲可用的资源以及它的地址信息等 // 里面只有三个成员属性:ExecutorId,主机地址,空闲的核数 new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toIndexedSeq // 里面会给每个task的相关信息做标记 并且返回每个task生成的TaskDescription scheduler.resourceOffers(workOffers) } // 判断返回的TaskDescription数组是否为空 if (!taskDescs.isEmpty) { launchTasks(taskDescs) } }
首先我们看下executorDataMap为何物:它里里面维护着每个executor它对应的元数据(端口,host,可用核数等),每个executor在启动自己的时候 会同时把自己的元数据注册到Driver上
valdata = new ExecutorData(executorRef, executorRef.address, hostname, cores, cores, logUrls) // This must be synchronized because variables mutated // in this block are read when requesting executors CoarseGrainedSchedulerBackend.this.synchronized { executorDataMap.put(executorId, data)
然后把拿到的每个executor元数据简单封装成WorkerOffer
private[spark] case class WorkerOffer(executorId: String, host: String, cores: Int)
然后提交给TaskSchedulerImpl做资源匹配:
这会做以下几件事:
①为每个Task,Executor,Host,Rack打标记,添加到对应的集合里面
②把非黑名单的WorkerOffer随机断乱做成待会存放TaskDescription的数组和每个executor可用核数的数组
③调度池按照对应调度模式对每个TaskSetManager划分计算
④计算出每个task的本地级别
⑤拿到了以上的元数据后封装每个Task为TaskDescription
// 集群给各个executor分配资源,按照循环的方式把task填充到每个节点 // 通俗点说就是,第一轮按顺序填充task到每个节点,若还有多余的task没有填充满又重复第一轮的动作 def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { // Mark each slave as alive and remember its hostname // Also track if new executor is added // 这里会标记每个executor和host的关系,executor和正在运行的Task的关系 var newExecAvail = false for (o <- offers) { // hostToExecutors里维护着每个节点上已经激活的所有的executor if (!hostToExecutors.contains(o.host)) { // 如果不包含就把这个节点注册进hostToExecutors hostToExecutors(o.host) = new HashSet[String]() } // executorIdToRunningTaskIds里维护着每个executor中运行中的每个task if (!executorIdToRunningTaskIds.contains(o.executorId)) { // 把当前executorID添加到指定的host主机上作为标识 hostToExecutors(o.host) += o.executorId // 这里面实现的是查看failedEpoch之前有没有标记这个executor // failedEpoch中维护的是之前被发现由于各种原因导致丢失报错的executor // 如果有就把他这个标记删除掉 executorAdded(o.executorId, o.host) // 维护着每个executor对应的host executorIdToHost(o.executorId) = o.host // 之后的taskId都会加入到这里面对应的executorId中 executorIdToRunningTaskIds(o.executorId) = HashSet[Long]() newExecAvail = true } // 拿到host对应的机架 // 如果是yarn模式,则会调用YarnScheduler.getRackForHost for (rack <- getRackForHost(o.host)) { // hostsByRack维护着每个host对应的机架,get不出来就更新进去 hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host } } // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do // this here to avoid a separate thread and added synchronization overhead, and also because // updating the blacklist is only relevant when task offers are being made. blacklistTrackerOpt.foreach(_.applyBlacklistTimeout()) // 任何被拉入黑名单的节点都会在分配资源之前被移除掉 // 这里会遍历黑名单中的节点和executor val filteredOffers = blacklistTrackerOpt.map { blacklistTracker => offers.filter { offer => !blacklistTracker.isNodeBlacklisted(offer.host) && !blacklistTracker.isExecutorBlacklisted(offer.executorId) } }.getOrElse(offers) // 把拿到的offers随机打乱顺序 避免把tasks分配到同一个worker上 val shuffledOffers = shuffleOffers(filteredOffers) // Build a list of tasks to assign to each worker. // 对每一个offer生成一个核数大小长度并且类型为TaskDescription的ArrayBuffer可变数组里 // 用来存放待会分配好的每个TaskDescription val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) // 返回的是每个executor中可以用的核数,最后把他们封装在一个Array数组里 val availableCpus = shuffledOffers.map(o => o.cores).toArray // 这里会根据不同的调度模式对TaskSetManager匹配不同的调度算法 // 补充:调度池会在初始化TaskSchedulerImpl的任何子类的时候根据不同的调度模式new出来 val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( taskSet.parent.name, taskSet.name, taskSet.runningTasks)) // 之前标记完了executor,host,task的对应关系后会设置成true if (newExecAvail) { // 里面会调用TaskSetManager本地级别的分配算法,为每个task分配计算本地级别的等级 taskSet.executorAdded() } } // Take each TaskSet in our scheduling order, and then offer it each node in increasing order // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY // 把所有TaskSet按照本地级别的顺序开始分配到各个节点 for (taskSet <- sortedTaskSets) { var launchedAnyTask = false var launchedTaskAtCurrentMaxLocality = false // 遍历当前taskSet的所有本地级别 for (currentMaxLocality <- taskSet.myLocalityLevels) { do { // 这里面主要是在做对task相关的标记,为每个task构建TaskDescription,返回值为Boolean值 launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) launchedAnyTask |= launchedTaskAtCurrentMaxLocality // 若返回为false继续循环 } while (launchedTaskAtCurrentMaxLocality) } // 若返回为fasel 则把这个host上的executor拉入黑名单 if (!launchedAnyTask) { taskSet.abortIfCompletelyBlacklisted(hostToExecutors) } } if (tasks.size > 0) { hasLaunchedTask = true } // 返回封装着TaskDescription数组 return tasks }
从①阶段的时候主要是为每个host,executor和Rack做标记,而机架感知是调用的Yarn复写的机架感知:
private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { // RackResolver logs an INFO message whenever it resolves a rack, which is way too often. if (Logger.getLogger(classOf[RackResolver]).getLevel == null) { Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN) } // By default, rack is unknown override def getRackForHost(hostPort: String): Option[String] = { val host = Utils.parseHostPort(hostPort)._1 Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation) } }
②阶段很简单 只是把非黑名单的WorkerOffer随机断乱顺序,避免都把task分配到相同的host上,并且乱做成待会存放TaskDescription的数组和每个executor可用核数的数组
/** * Shuffle offers around to avoid always placing tasks on the same workers. Exposed to allow * overriding in tests, so it can be deterministic. */ protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = { Random.shuffle(offers) }
③阶段是核心的调度算法了,这里会对TaskSetManager做对应的排序:
// 这里会根据不同的调度模式对TaskSetManager匹配不同的调度算法 // 补充:调度池会在初始化TaskSchedulerImpl的任何子类的时候根据不同的调度模式new出来 val sortedTaskSets = rootPool.getSortedTaskSetQueue
补充:调度池会在初始化TaskSchedulerImpl的时候根据不同调度模式初始化(默认是FIFO)
// default scheduler is FIFO // 默认的是FIFO先进先出的调度器 private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString) val schedulingMode: SchedulingMode = try { // 拿到调度器名字,当然你可以提前配置FIAR模式 SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT)) } catch { case e: java.util.NoSuchElementException => throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf") }
val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
进入getSortedTaskSetQueue:
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = { // 用来存放TaskSetManager的ArrayBuffer var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager] // 这里会根据不同的调度模式 对Job和stage按照对应算法进行调度 val sortedSchedulableQueue = // 之前的schedulableBuilder会把taskSetMananger加入到schedulableQueue中 // schedulableQueue结构是java.util.concurrent.ConcurrentLinkedQueue线程安全的FIFO链表 // 转换成scala的seq序列并按照匹配到的调度模式做对应的排序规则 // 简单的说就是对之前放入队列里的TaskSetManager做对应的调度模式的排序算法 schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator) for (schedulable <- sortedSchedulableQueue) { // 把遍历出的TaskSetManager按顺序放入 sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue } // 返回排序好的TaskSetMananger队列 sortedTaskSetQueue }
首先说下schedulableQueue,这个在上面的代码里出现过,忘记的可以看看前面的代码,在调用submitTasks的时候就会把TaskSetManager加入进去,而里面存放的是全局所有的TaskSetManager
下面来看下taskSetSchedulingAlgorithm.comparator的实现:因为默认是FIFO调度算法 所以我们这里以此为例:先比较JobId,若是相同的Job就比较StageId,顺序是从小到大
/** * An interface for sort algorithm * FIFO: FIFO algorithm between TaskSetManagers * FS: FS algorithm between Pools, and FIFO or FS within Pools */ private[spark] trait SchedulingAlgorithm { def comparator(s1: Schedulable, s2: Schedulable): Boolean } private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm { override def comparator(s1: Schedulable, s2: Schedulable): Boolean = { // 这里比较的其实是TaskSetManager的优先级,优先级是通过Taskset传进来的,而Taskset的优先级就是JobId // 之前DGAScheduler生成TaskSet的时候会把当前JobId传进去,之前章节介绍过,优先指数是顺序从小到大 val priority1 = s1.priority val priority2 = s2.priority // 底层调用的是java.lang.Integer.signum // 意思是priority1 - priority2 =负数 返回-1 ; =0 返回0 ; = 整数 返回1 var res = math.signum(priority1 - priority2) if (res == 0) { // 如果优先级相等的话 就比较StageID // 这种情况会发生在同一个job中,不同stage任务之间的调度顺序 val stageId1 = s1.stageId val stageId2 = s2.stageId res = math.signum(stageId1 - stageId2) } // 最后返回的Boolean值决定于res是否小于0 res < 0 } }
④,对排序好的TaskSetManager划分本地级别算法:
val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( taskSet.parent.name, taskSet.name, taskSet.runningTasks)) // 之前标记完了executor,host,task的对应关系后会设置成true if (newExecAvail) { // 里面会调用TaskSetManager本地级别的分配算法,为每个task分配计算本地级别的等级 taskSet.executorAdded() } }
def executorAdded() { recomputeLocality() } }
def recomputeLocality() { // currentLocalityIndex为当前本地级别,默认从0开始,如果不匹配或者等待资源时间(3s)超时的话级别+1 // 这里返回的是当前级别的level加上ANY级别的数组,所以这里拿到的是索引为0的元素->当前级别的level val previousLocalityLevel = myLocalityLevels(currentLocalityIndex) myLocalityLevels = computeValidLocalityLevels() // 这里拿到的对应级别的等待时间 // 补充:比如像调用的是本地级别,在尝试获取数据的时候 发现executor的核数被占满,需要等待资源释放 // 若在规定时间内 资源还是被占用 就把本地级别+1 localityWaits = myLocalityLevels.map(getLocalityWait) // 提取到当前level的索引,并更新进currentLocalityIndex currentLocalityIndex = getLocalityIndex(previousLocalityLevel) }
/** * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been * added to queues using addPendingTask. * */ // 计算本地级别的算法 private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = { // 分为五种级别,下面条件满足的都会添加到levels中 import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY} // 在初始化TaskSetManager的时候 会把需要执行的所有task加入到pendingTasksForExecutor中 val levels = new ArrayBuffer[TaskLocality.TaskLocality] // 里面维护的是executor和它里面的所有task // 需要运算的数据跟当前的task对应的分区在同一个executor中(也就是同一个JVM进程中) if (!pendingTasksForExecutor.isEmpty && // 判断可用的executor是否在pendingTasksForExecutor中 pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) { levels += PROCESS_LOCAL } if (!pendingTasksForHost.isEmpty && pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) { levels += NODE_LOCAL } if (!pendingTasksWithNoPrefs.isEmpty) { levels += NO_PREF } if (!pendingTasksForRack.isEmpty && pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) { levels += RACK_LOCAL } // 最后还会加入ANY级别进去 levels += ANY logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", ")) levels.toArray }
上面可以看得出获取本地级别 就是看不同类型的PendingTask的Key是包含在对应的Alive里:所以这里要搞清楚的就是这2个集合到底是什么:
首先看看PendingTask:
/** Add a task to all the pending-task lists that it should be on. */ private def addPendingTask(index: Int) { // preferredLocations这个方法上个章节介绍过,其实就是在生成Task的时候调用taskIdToLocations // 补充下:只有之前持久化过或者之前产生过任何类型的task,preferredLocations才会返回值,不然返回值为Nil // 拿到每个task的最佳位置 for (loc <- tasks(index).preferredLocations) { loc match { case e: ExecutorCacheTaskLocation => // pendingTasksForExecutor里面维护的是每个executor和它对应的等待运行的所有Task // 如果get不出来的话就把该索引值加入进去 pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index case e: HDFSCacheTaskLocation => // 拿到指定host的所有executor并放入set返回 val exe = sched.getExecutorsAliveOnHost(loc.host) exe match { case Some(set) => for (e <- set) { // 走到这说明之前有持久化到HDFS上的task,依次把task的索引值放入pendingTasksForExecutor pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index } logInfo(s"Pending task $index has a cached location at ${e.host} " + ", where there are executors " + set.mkString(",")) case None => logDebug(s"Pending task $index has a cached location at ${e.host} " + ", but there are no executors alive there.") } case _ => } // pendingTasksForHost 维护着每个host中所有待执行的task pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index // 拿到host对应的机架,若是yarn cluster模式就调用的 for (rack <- sched.getRackForHost(loc.host)) { // pendingTasksForRack维护着每个机架中所有的task,get不出来就更新进去 pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index } } // 若之前没有持久化过并且也没有产生过任何类型的task 那么返回就为Nil if (tasks(index).preferredLocations == Nil) { pendingTasksWithNoPrefs += index } // 这里放入的是所有的需要执行的task allPendingTasks += index // No point scanning this whole list to find the old task there }
而调用addPendingTask是在TaskSetManager初始化的时候自动调用的:
// Add all our tasks to the pending lists. We do this in reverse order // of task index so that tasks with low indices get launched first. // 这里会在初始化TaskSetManager的时候调用 for (i <- (0 until numTasks).reverse) { // 这里会把TaskSet中所有的task加入到各种对应关系的数据结构中 // 比如跟executor,host,rack等的对应关系的数据结构中 addPendingTask(i) }
现在我们再看下Alive方法:这里以PROCESS_LOCAL级别为例:
defisExecutorAlive(execId: String): Boolean = synchronized { // 正常情况下在之前调用resourceOffers的时候就会把可用的executorId加入到executorIdToRunningTaskIds executorIdToRunningTaskIds.contains(execId) }
在TaskSetManager拿到了所有本地级别后调用resourceOfferSingleTaskSet 里面会为每个task创建TaskDescription,用来后面启动task用
// 这里面主要是在做对task相关的标记,为每个task构建TaskDescription,返回值为Boolean值 launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { var launchedTask = false // nodes and executors that are blacklisted for the entire application have already been // filtered out by this point // 这里其实就是遍历每个可以用的executor(一个WorkerOffer对应一个executor) for (i <- 0 until shuffledOffers.size) { // 拿到当前executor的ID和host地址 val execId = shuffledOffers(i).executorId val host = shuffledOffers(i).host // 如果当前的executor的可用核数大于等于CPUS_PER_TASK // CPUS_PER_TASK为运行每个Task必须的Cpu个数,默认是1 // 如果返回为true 说明这个executor可用来执行至少一个task if (availableCpus(i) >= CPUS_PER_TASK) { try { // resourceOffer主要用来对每个task做标记,最后返回每个task的TaskDescription for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { // tasks用来存放所有的TaskDescription tasks(i) += task val tid = task.taskId // 做相关的标记 taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId executorIdToRunningTaskIds(execId).add(tid) availableCpus(i) -= CPUS_PER_TASK assert(availableCpus(i) >= 0) // 把返回值标记成true launchedTask = true } } catch { case e: TaskNotSerializableException => logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") // Do not offer resources for this task, but don't throw an error to allow other // task sets to be submitted. return launchedTask } } } return launchedTask }
我们来看看里面最核心的方法resourceOffer:从TaskSetManager中提取出每个task 并封装成TaskDescription
def resourceOffer( execId: String, host: String, maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] = { // 首先判断当前的executor或者host是否在黑名单中 val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist => blacklist.isNodeBlacklistedForTaskSet(host) || blacklist.isExecutorBlacklistedForTaskSet(execId) } if (!isZombie && !offerBlacklisted) { // 获取当前时间 val curTime = clock.getTimeMillis() // 当前最优的Task本地化级别 var allowedLocality = maxLocality // 如果级别不是NO_PREF if (maxLocality != TaskLocality.NO_PREF) { // 这里会拿到这个task其他可用的本地级别 allowedLocality = getAllowedLocalityLevel(curTime) if (allowedLocality > maxLocality) { // We're not allowed to search for farther-away tasks // 这里只会取最小的本地级别 allowedLocality = maxLocality } } // 遍历出指定host上executor所有需要执行的task索引和它的本地化级别 dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) => // Found a task; do some bookkeeping and return a task description // 获取task val task = tasks(index) // 调用getAndIncrement生成一个taskID val taskId = sched.newTaskId() // Do various bookkeeping // 把这个task的索引标记在copiesRunning copiesRunning(index) += 1 val attemptNum = taskAttempts(index).size // 生成一个TaskInfo 里面注入了这个task的所有元数据 val info = new TaskInfo(taskId, index, attemptNum, curTime, execId, host, taskLocality, speculative) // taskInfos维护的是每个task对应的元数据信息 taskInfos(taskId) = info // 加入到taskAttempts中 taskAttempts(index) = info :: taskAttempts(index) // Update our locality level for delay scheduling // NO_PREF will not affect the variables related to delay scheduling if (maxLocality != TaskLocality.NO_PREF) { currentLocalityIndex = getLocalityIndex(taskLocality) lastLaunchTime = curTime } // Serialize and return the task // 序列化这个Task val serializedTask: ByteBuffer = try { ser.serialize(task) } catch { // If the task cannot be serialized, then there's no point to re-attempt the task, // as it will always fail. So just abort the whole task-set. case NonFatal(e) => val msg = s"Failed to serialize task $taskId, not attempting to retry it." logError(msg, e) abort(s"$msg Exception during serialization: $e") throw new TaskNotSerializableException(e) } // 这里会判断task序列化后的大小是否达到了警告级别的阈值 if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && !emittedTaskSizeWarning) { emittedTaskSizeWarning = true logWarning(s"Stage ${task.stageId} contains a task of very large size " + s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") } // 加入到调度池的runningTasks和TaskSetMananger的runningTasksSet // 就是标记成正在运行的task addRunningTask(taskId) // We used to log the time it takes to serialize the task, but task size is already // a good proxy to task serialization time. // val timeTaken = clock.getTime() - startTime // 格式化task的名字 val taskName = s"task ${info.id} in stage ${taskSet.id}" logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " + s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)") // 这里主要是告诉给ExecutorAllocationListener和JobProgressListener, Task启动了 // 然后对这个task的相关信息做标记 sched.dagScheduler.taskStarted(task, info) // 生成一个TaskDescription // 标记着这个task在那个host的哪个executor执行 // 以及需要添加到executor的Classpath上的所有Jar包和File new TaskDescription( taskId, attemptNum, execId, taskName, index, sched.sc.addedFiles, sched.sc.addedJars, task.localProperties, serializedTask) } } else { None } }
最后回到CoarseGrainedSchedulerBackend,它会开始为每个Executor分发TaskDescription
if (!taskDescs.isEmpty) { launchTasks(taskDescs) }
// Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { // 遍历每个executor上的所有TaskDescription for (task <- tasks.flatten) { // 首序列化每个task val serializedTask = TaskDescription.encode(task) // 判断序列化后的task大小是否超过阈值 if (serializedTask.limit >= maxRpcMessageSize) { // 如果超过阈值会对每个提取出来的TaskSetManager执行终止操作 scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => try { var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + "spark.rpc.message.maxSize (%d bytes). Consider increasing " + "spark.rpc.message.maxSize or using broadcast variables for large values." msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize) taskSetMgr.abort(msg) } catch { case e: Exception => logError("Exception in error callback", e) } } } else { // 若满足阈值之内的大小,首先提取出executor对应的ExecutorData元数据 // executorDataMap之前介绍过,在executor注册自己的Driver上的时候就会注册 // 自己的元数据到Driver上 val executorData = executorDataMap(task.executorId) // executor上总共的cpu个数减去每个task需要的cpu个数(默认1个) // 也就是当前的task会占用一个cpu executorData.freeCores -= scheduler.CPUS_PER_TASK logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " + s"${executorData.executorHost}.") // executorEndpoint为当前Task所在的executor的RPCEndpoint引用(之前章节介绍过) // 这里会给这个executor发送一个异步执行task消息 // 注意:最后真正触发task启动的是Executor而不是CoarseGrainedExecutorBackend executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) } } }
这里Driver端会用每个executorEndpoint的引用发送启动task的单向事件消息,事件消息里封装了序列化后的TaskDescription,对应的executor会调用receive接受到并匹配,最后把TaskDescription封装成继承Java线程的Runnable 调用用线程池去run
override def receive: PartialFunction[Any, Unit] = { case RegisteredExecutor => logInfo("Successfully registered with driver") try { executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) } catch { case NonFatal(e) => exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) } case RegisterExecutorFailed(message) => exitExecutor(1, "Slave registration failed: " + message) case LaunchTask(data) => if (executor == null) { exitExecutor(1, "Received LaunchTask command but executor was null") } else { // 首先会反序列化传输过来的TaskDescription val taskDesc = TaskDescription.decode(data.value) logInfo("Got assigned task " + taskDesc.taskId) // 开始在executor自己进程中启动task // executor会在CoarseGrainedExecutorBackend触onStart时,把自己的元数据注册到Driver上之后 // Driver发送一个send单向消息通知CoarseGrainedExecutorBackend构建自己的Executor executor.launchTask(this, taskDesc) }
最后交由Executor的launchTask:
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = { // 这里会把TaskDescription和ExecutorBackend(默认是:CoarseGrainedExecutorBackend) // 封装成继承Runnable的TaskRunner,待会回交给线程池调用 val tr = new TaskRunner(context, taskDescription) // 放入负责维护所有正在此executor上运行的task的ConcurrentHashMap中 runningTasks.put(taskDescription.taskId, tr) // threadPool线程池:java.util.concurrent.ThreadPoolExecutor // 执行TaskRunner threadPool.execute(tr) }
TaskRunner中会复写run方法 并且最后会调用task.run:
// 开始运行task val res = task.run(
补充:Task分为两种 一种是ShuffleMapTask 第二种是ResultTask
以ShuffleMapTask为例,他首先会从排序好的Task对应的RDD调用iterator来计算是否有持久化过 若没有就调用computer(每种RDD的实现都不一样),最后到shuffledRDD就会调用shuffleManager(默认是SortShuffleManager)来构建一个Reader,里面最终会调用BlockStoreShuffleReader.read来fetch远程各个Executor的分区,最后再会构建一个Writer把拿到的计算结果封装成bucket写入磁盘,至此一个shuffleStage结束;具体细节比较多,留着以后再起章节说
相关文章推荐
- Spark2.0.X源码深度剖析之 RpcEnv & NettyRpcEnv
- Spark2.0.X源码深度剖析之 SparkEnv
- Spark2.0.X源码深度剖析之 SparkContext
- Spark2.0.X算子源码深度剖析之MapPartitionsRDD,绝对让你看清楚算子的计算本质
- Spark2.0.X源码深度剖析之 Spark Submit..
- [Spark源码剖析] Task的调度与执行源码剖析
- Spark内核源码深度剖析:SparkContext原理剖析与源码分析
- Spark内核源码深度剖析:Master主备切换机制原理剖析与源码分析
- Spark源码剖析——SparkContext的初始化(七)_TaskScheduler的启动
- Spark2.2 TaskScheduler原理剖析与源码分析
- Spark源码剖析(八):stage划分原理与源码剖析
- Spark 源码解析:TaskScheduler的任务提交和task最佳位置算法
- Spark内核源码深度剖析:宽依赖与窄依赖深度剖析
- (笔记)spark内核源码深度剖析(一)
- [Spark源码剖析] DAGScheduler划分stage
- 大数据Spark “蘑菇云”行动第81课:Spark GraphX 综合案例作业讲解和源码深度剖析
- Spark 源码解析:彻底理解TaskScheduler的任务提交和task最佳位置算法
- Spark内核源码深度剖析:基于Yarn的两种提交模式深度剖析
- spark源码学习(六)--- DAGScheduler中的task的划分
- Spark内核源码深度剖析:Spark内核架构深度剖析