
Spark 2.0.x Source Code Deep Dive: Task Division in the TaskScheduler — the most complete and in-depth walkthrough you will find!!!

2017-07-13 10:16
WeChat: 519292115

Email: taosiyuan163@163.com

Please respect the original work; reprinting is prohibited!!

Spark is currently one of the hottest frameworks in the big-data space. It efficiently supports offline batch processing, real-time computation, machine learning, and other workloads, and reading its source code will deepen your understanding of the framework.

A Task is Spark's smallest unit of execution. After the DAGScheduler has carved the job into Stages, the tasks are submitted to a concrete subclass of TaskSchedulerImpl (for example YarnScheduler in YARN mode), which assigns and deploys each Task.

This chapter covers Task data-locality levels, TaskSchedulerImpl's resource-scheduling mechanism, how Tasks are distributed to Executors, Netty communication, and more.

I recommend reading my earlier articles on the other Spark components first; this one interacts with them, and they will help deepen your understanding.

We pick up from the previous article, where the DAGScheduler creates and submits Tasks:

As covered previously, while dividing Stages the DAGScheduler computes the preferred location for each Task and creates the parent Stages, and finally wraps all of a stage's tasks in a TaskSet and submits it to TaskSchedulerImpl.

Taking ShuffleMapTask as the example:

// Build the ShuffleMapTask object; it mainly wraps the task's metadata and its runTask method
// Note: there are two kinds of Task: ShuffleMapTask and ResultTask
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)

It is then submitted to TaskSchedulerImpl:

taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

First, a quick note: the TaskSchedulerImpl here and the SchedulerBackend discussed later are both constructed when the SparkContext is initialized:

// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)

// The cluster modes are Mesos and YARN; both plug in through ExternalClusterManager
// Here we take YarnClusterManager as the example
val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
(backend, scheduler)

Here we only look at createTaskScheduler; the creation of the SchedulerBackend is covered a bit later:

override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
sc.deployMode match {
// Create the cluster-mode scheduler
case "cluster" => new YarnClusterScheduler(sc)
case "client" => new YarnScheduler(sc)
case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
}
}

private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {

logInfo("Created YarnClusterScheduler")

override def postStartHook() {
ApplicationMaster.sparkContextInitialized(sc)
super.postStartHook()
logInfo("YarnClusterScheduler.postStartHook done")
}

}

The YARN schedulers mainly just override rack awareness:

private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

// RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
}

// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
}
}

The generic core logic of task scheduling lives in TaskSchedulerImpl. Now on to submitting the tasks:

Three things here matter:

① A TaskSetManager is created: one TaskSet maps to one TaskSetManager (and one Stage maps to one TaskSet); its core methods are discussed shortly.

② The freshly created TaskSetManager is added to the schedulableQueue of the scheduling pool, which the scheduling algorithm will later sort.

③ The SchedulerBackend is called to match resources.

override def submitTasks(taskSet: TaskSet) {
// Grab the array that holds the tasks
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
// Create the TaskSetManager; it tracks every task and resubmits failed ones within the retry limit.
// It also computes data locality, builds TaskDescriptions, etc.
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
// Register the manager in taskSetsByStageIdAndAttempt: update if present, add otherwise
val stageTaskSets =
// taskSetsByStageIdAndAttempt maps each stage's attempt ids to their TaskSetManager
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
// Replace any previous TaskSetManager for this attempt with the one just created
stageTaskSets(taskSet.stageAttemptId) = manager
// Check whether another active TaskSet exists for this stage (returns a Boolean)
// isZombie is flipped to true once a TaskSet has finished; it defaults to false
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
ts.taskSet != taskSet && !ts.isZombie
}
// Throw if there is a conflicting TaskSet
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
}
// The default scheduling mode is FIFO (the alternative is the FAIR scheduler),
// so FIFOSchedulableBuilder.addTaskSetManager adds the TaskSetManager to schedulableQueue;
// later the FIFO scheduling algorithm sorts schedulableQueue with its own comparator
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

if (!isLocal && !hasReceivedTask) {
// Start a timer that periodically checks whether the submitted tasks have started running
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
// If nothing has launched, the cluster's resources are still occupied
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
// Once the submitted tasks are running, cancel this timer
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
// The default SchedulerBackend is CoarseGrainedSchedulerBackend, also created in SparkContext
backend.reviveOffers()
}

The schedulableBuilder is initialized differently depending on the scheduling mode; this happens in SparkContext after TaskSchedulerImpl and the SchedulerBackend have been built:

cm.initialize(scheduler, backend)

def initialize(backend: SchedulerBackend) {
this.backend = backend
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
s"$schedulingMode")
}
}
// For FIFO this does nothing
schedulableBuilder.buildPools()
}

The mode matched by default is FIFO:

// default scheduler is FIFO
// The default is the FIFO (first-in, first-out) scheduler
private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
val schedulingMode: SchedulingMode =
try {
// Get the scheduler name; you can configure FAIR mode in advance if you prefer
SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
} catch {
case e: java.util.NoSuchElementException =>
throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
}
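Since the mode comes from configuration, you can switch from the default FIFO to FAIR before building the SparkContext. A minimal sketch, assuming a local run (the app name and master below are placeholders; spark.scheduler.mode is the key read as SCHEDULER_MODE_PROPERTY above):

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: switch the scheduler to FAIR mode (the default is FIFO)
val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")  // placeholder app name
  .setMaster("local[*]")               // placeholder master
  .set("spark.scheduler.mode", "FAIR") // FIFO | FAIR
val sc = new SparkContext(conf)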

Back to addTaskSetManager: it adds the manager to schedulableQueue, which the scheduling algorithm will sort later.

private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {

override def buildPools() {
// nothing
}

override def addTaskSetManager(manager: Schedulable, properties: Properties) {
// This adds the manager to the scheduling pool's schedulableQueue
rootPool.addSchedulable(manager)
}
}

override def addSchedulable(schedulable: Schedulable) {
// Sanity check
require(schedulable != null)
// Add to schedulableQueue; the custom sorting comparator is applied to this structure later
schedulableQueue.add(schedulable)
// Map the schedulable's name to the schedulable itself
schedulableNameToSchedulable.put(schedulable.name, schedulable)
// Record this pool as the schedulable's parent
schedulable.parent = this
}

Finally, resource matching begins:

// The default SchedulerBackend is CoarseGrainedSchedulerBackend, also created in SparkContext
backend.reviveOffers()

First, where does backend come from? Again, SparkContext:

// The cluster modes are Mesos and YARN; both plug in through ExternalClusterManager
// Here we take YarnClusterManager as the example
val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)

Again taking YARN cluster mode as the example:

override def createSchedulerBackend(sc: SparkContext,
masterURL: String,
scheduler: TaskScheduler): SchedulerBackend = {
sc.deployMode match {
case "cluster" =>
new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
case "client" =>
new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
case _ =>
throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
}
}

YARN cluster mode overrides the start method and the method for retrieving the driver logs:

private[spark] class YarnClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext)
extends YarnSchedulerBackend(scheduler, sc) {


YarnSchedulerBackend itself extends CoarseGrainedSchedulerBackend and overrides start/stop plus the methods that interact with the ApplicationMaster (Driver):

private[spark] abstract class YarnSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

The remaining core logic is implemented in CoarseGrainedSchedulerBackend, for example the backend.reviveOffers() call we just saw:

override def reviveOffers() {
// Send an asynchronous message to the driver-side RpcEndpointRef; the driver handles it in receive
// Note: the RpcEndpointRef is a reference to the driver's endpoint
driverEndpoint.send(ReviveOffers)
}

This goes through the DriverEndpoint reference and triggers its receive method for one-way messages (the Netty communication was covered in an earlier chapter):

case ReviveOffers =>
makeOffers()

makeOffers does the following:

① Filter out the metadata of executors (previously registered with the Driver) that are no longer usable.

② Wrap each surviving executor's metadata in a WorkerOffer and hand them to TaskSchedulerImpl for task division, which returns a TaskDescription for every task.

③ Prepare to deploy the tasks onto their executors.

private def makeOffers() {
// Make sure no executor is killed while some task is launching on it
val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
// Filter out executors under killing
// Note: executorDataMap maps each executor id to its metadata (address, port, free cores, ...)
// When an executor registers itself with the Driver, its ExecutorData is put into executorDataMap
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
// Collect the available resources of every executor
val workOffers = activeExecutors.map { case (id, executorData) =>
// A WorkerOffer represents the free resources on an executor plus its address;
// it has only three fields: executor id, host, and number of free cores
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toIndexedSeq
// This tags the relevant bookkeeping for every task and returns each task's TaskDescription
scheduler.resourceOffers(workOffers)
}
// Launch only if the returned TaskDescription arrays are non-empty
if (!taskDescs.isEmpty) {
launchTasks(taskDescs)
}
}

First, what is executorDataMap? It maps each executor to its metadata (port, host, free cores, and so on). When an executor starts, it registers this metadata with the Driver:

val data = new ExecutorData(executorRef, executorRef.address, hostname,
cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)

Each executor's metadata is then wrapped in a simple WorkerOffer:

private[spark]
case class WorkerOffer(executorId: String, host: String, cores: Int)

These are then handed to TaskSchedulerImpl for resource matching:

resourceOffers does the following:

① Tag each Task, Executor, Host, and Rack and add them to the corresponding collections.

② Randomly shuffle the non-blacklisted WorkerOffers and build the array that will hold the TaskDescriptions plus the array of available cores per executor.

③ Let the scheduling pool sort the TaskSetManagers according to the configured scheduling mode.

④ Compute the locality level of every task.

⑤ With all that metadata in hand, wrap each Task into a TaskDescription.

// The cluster hands resources to the executors and fills tasks onto the nodes round-robin:
// one pass assigns tasks to each node in order, and if tasks remain the pass repeats
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// Mark each slave as alive and remember its hostname
// Also track if new executor is added
// This records which executors live on which host and which tasks run on which executor
var newExecAvail = false
for (o <- offers) {
// hostToExecutors tracks all active executors on each node
if (!hostToExecutors.contains(o.host)) {
// If the host is not present yet, register it in hostToExecutors
hostToExecutors(o.host) = new HashSet[String]()
}
// executorIdToRunningTaskIds tracks the running tasks of each executor
if (!executorIdToRunningTaskIds.contains(o.executorId)) {
// Record the current executor id under its host
hostToExecutors(o.host) += o.executorId
// executorAdded checks whether failedEpoch previously marked this executor;
// failedEpoch holds executors that were reported lost for whatever reason,
// and if this executor is there the mark is removed
executorAdded(o.executorId, o.host)
// Track the host of each executor
executorIdToHost(o.executorId) = o.host
// Task ids will later be added here under their executor id
executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
newExecAvail = true
}
// Get the rack of this host;
// in YARN mode this calls YarnScheduler.getRackForHost
for (rack <- getRackForHost(o.host)) {
// hostsByRack maps each rack to its hosts; the rack is added if missing
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}

// Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
// this here to avoid a separate thread and added synchronization overhead, and also because
// updating the blacklist is only relevant when task offers are being made.
blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

// Any blacklisted node is removed before resources are offered;
// this filters out blacklisted nodes and executors
val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
offers.filter { offer =>
!blacklistTracker.isNodeBlacklisted(offer.host) &&
!blacklistTracker.isExecutorBlacklisted(offer.executorId)
}
}.getOrElse(offers)

// Shuffle the offers randomly to avoid always assigning tasks to the same worker
val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
// For each offer, create an ArrayBuffer[TaskDescription] sized to its core count;
// it will hold the TaskDescriptions assigned in a moment
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
// The number of usable cores of each executor, collected into an Array
val availableCpus = shuffledOffers.map(o => o.cores).toArray
// The scheduling mode determines which algorithm sorts the TaskSetManagers
// Note: the scheduling pool is created, per mode, when any subclass of TaskSchedulerImpl is initialized
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
// newExecAvail was set to true above after tagging the executor/host/task relationships
if (newExecAvail) {
// This invokes the TaskSetManager's locality algorithm to compute a locality level for each task
taskSet.executorAdded()
}
}

// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
// Offer each TaskSet to the nodes in order of locality level
for (taskSet <- sortedTaskSets) {
var launchedAnyTask = false
var launchedTaskAtCurrentMaxLocality = false
// Iterate over all locality levels of the current taskSet
for (currentMaxLocality <- taskSet.myLocalityLevels) {
do {
// This mostly does task bookkeeping and builds a TaskDescription per task; it returns a Boolean
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
// Keep looping as long as a task was launched at this locality level
} while (launchedTaskAtCurrentMaxLocality)
}
// If no task could be launched at all, abort the taskSet if it is completely blacklisted
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
}

if (tasks.size > 0) {
hasLaunchedTask = true
}
// Return the arrays of TaskDescriptions
return tasks
}

In stage ①, each host, executor, and rack gets tagged; rack awareness goes through YARN's overridden implementation:

private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

// RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
}

// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
}
}

Stage ② is simple: the non-blacklisted WorkerOffers are randomly shuffled so tasks are not piled onto the same host, and the arrays for the upcoming TaskDescriptions and for each executor's available cores are built:

/**
* Shuffle offers around to avoid always placing tasks on the same workers.  Exposed to allow
* overriding in tests, so it can be deterministic.
*/
protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
Random.shuffle(offers)
}

Stage ③ is the core scheduling algorithm: the TaskSetManagers are sorted accordingly:

// The scheduling mode determines which algorithm sorts the TaskSetManagers
// Note: the scheduling pool is created, per mode, when any subclass of TaskSchedulerImpl is initialized
val sortedTaskSets = rootPool.getSortedTaskSetQueue

Note: the scheduling pool is created when TaskSchedulerImpl is initialized, according to the configured scheduling mode (FIFO by default):

// default scheduler is FIFO
// The default is the FIFO (first-in, first-out) scheduler
private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
val schedulingMode: SchedulingMode =
try {
// Get the scheduler name; you can configure FAIR mode in advance if you prefer
SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
} catch {
case e: java.util.NoSuchElementException =>
throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
}

val rootPool: Pool = new Pool("", schedulingMode, 0, 0)

Into getSortedTaskSetQueue:

override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
// ArrayBuffer that will hold the TaskSetManagers
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
// Jobs and stages are scheduled here according to the configured scheduling mode
val sortedSchedulableQueue =
// schedulableBuilder added the TaskSetManagers to schedulableQueue earlier;
// schedulableQueue is a java.util.concurrent.ConcurrentLinkedQueue, a thread-safe FIFO linked queue.
// Convert it to a Scala Seq and sort it with the comparator of the matched scheduling mode;
// in short, the previously queued TaskSetManagers are sorted by the scheduling algorithm
schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
for (schedulable <- sortedSchedulableQueue) {
// Append the TaskSetManagers in their sorted order
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
}
// Return the sorted queue of TaskSetManagers
sortedTaskSetQueue
}

First, about schedulableQueue: it appeared in the code above (look back if you have forgotten). The TaskSetManager is added to it during submitTasks, and it holds every TaskSetManager globally.

Now the implementation of taskSetSchedulingAlgorithm.comparator. The default is the FIFO algorithm, so we take it as the example: it compares JobId first, and for the same Job it compares StageId, in ascending order.

/**
* An interface for sort algorithm
* FIFO: FIFO algorithm between TaskSetManagers
* FS: FS algorithm between Pools, and FIFO or FS within Pools
*/
private[spark] trait SchedulingAlgorithm {
def comparator(s1: Schedulable, s2: Schedulable): Boolean
}

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
// What is compared here is the priority of the TaskSetManagers; the priority comes in via the
// TaskSet, and a TaskSet's priority is its JobId. The DAGScheduler passed the current JobId in
// when it built the TaskSet (see the earlier chapter); lower numbers mean higher priority
val priority1 = s1.priority
val priority2 = s2.priority
// This behaves like java.lang.Integer.signum:
// priority1 - priority2 negative -> -1, zero -> 0, positive -> 1
var res = math.signum(priority1 - priority2)
if (res == 0) {
// If the priorities are equal, compare the StageIds instead;
// this decides the order of different stages within the same job
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
}
// The returned Boolean is simply whether res is negative
res < 0
}
}
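To see the effect of this comparator in isolation, here is a self-contained sketch (not Spark source; the FakeTaskSet class is made up) that reproduces the priority-then-stageId ordering:

// Standalone sketch of the FIFO ordering rule: compare jobId (priority) first, then stageId
case class FakeTaskSet(priority: Int, stageId: Int) // priority is the jobId

def fifoLess(s1: FakeTaskSet, s2: FakeTaskSet): Boolean = {
  var res = math.signum(s1.priority - s2.priority)          // negative / 0 / positive
  if (res == 0) res = math.signum(s1.stageId - s2.stageId)  // same job: compare stages
  res < 0
}

val sorted = Seq(FakeTaskSet(2, 5), FakeTaskSet(1, 3), FakeTaskSet(1, 1)).sortWith(fifoLess)
// sorted order: job 1 / stage 1, job 1 / stage 3, job 2 / stage 5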

④ The locality-level algorithm is applied to the sorted TaskSetManagers:

val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
// newExecAvail was set to true above after tagging the executor/host/task relationships
if (newExecAvail) {
// This invokes the TaskSetManager's locality algorithm to compute a locality level for each task
taskSet.executorAdded()
}
}

def executorAdded() {
recomputeLocality()
}

def recomputeLocality() {
// currentLocalityIndex is the current locality level, starting at 0; if there is no match or the
// wait for resources (3s by default) times out, the level is bumped by one.
// myLocalityLevels holds the valid levels plus ANY, so indexing with currentLocalityIndex
// (initially 0) yields the current level
val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
myLocalityLevels = computeValidLocalityLevels()
// The wait time for each level.
// For example, at process-local level the executor's cores may all be busy when we try to run there;
// if the resources are still occupied after the configured wait, the locality level is bumped by one
localityWaits = myLocalityLevels.map(getLocalityWait)
// Find the index of the previous level and store it in currentLocalityIndex
currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
}
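The per-level wait returned by getLocalityWait comes from standard configuration keys; a hedched sketch of tuning them (the values shown are the documented defaults, and each per-level key falls back to spark.locality.wait):

import org.apache.spark.SparkConf

// Sketch: delay-scheduling waits per locality level
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")         // base wait before degrading one locality level
  .set("spark.locality.wait.process", "3s") // wait for PROCESS_LOCAL
  .set("spark.locality.wait.node", "3s")    // wait for NODE_LOCAL
  .set("spark.locality.wait.rack", "3s")    // wait for RACK_LOCAL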

/**
* Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
* added to queues using addPendingTask.
*
*/
// The algorithm that computes the valid locality levels
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
// There are five levels; every level whose condition below holds is added to levels
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
// When the TaskSetManager is initialized, all tasks to run are added to pendingTasksForExecutor
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
// pendingTasksForExecutor maps executors to all of their pending tasks;
// PROCESS_LOCAL means the data a task needs lives in the same executor (the same JVM process)
if (!pendingTasksForExecutor.isEmpty &&
// Check whether any usable executor appears in pendingTasksForExecutor
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {

levels += PROCESS_LOCAL
}
if (!pendingTasksForHost.isEmpty &&
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
levels += NODE_LOCAL
}
if (!pendingTasksWithNoPrefs.isEmpty) {
levels += NO_PREF
}
if (!pendingTasksForRack.isEmpty &&
pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
levels += RACK_LOCAL
}
// ANY is always added as the last level
levels += ANY
logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
levels.toArray
}

As the code shows, determining the locality levels comes down to whether the keys of the various pendingTask maps appear in the corresponding "alive" sets, so the next question is what these two kinds of collections actually contain.

First, the pending-task side, addPendingTask:

/** Add a task to all the pending-task lists that it should be on. */
private def addPendingTask(index: Int) {
// preferredLocations was covered in the previous chapter; it is essentially the taskIdToLocations
// result computed when the Task is created.
// Note: preferredLocations only returns locations if data was persisted or some task produced
// output there before; otherwise it returns Nil.
// Get the preferred locations of this task
for (loc <- tasks(index).preferredLocations) {
loc match {
case e: ExecutorCacheTaskLocation =>
// pendingTasksForExecutor maps each executor to all of its tasks waiting to run;
// if there is no entry yet, one is created and the task index is appended
pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
case e: HDFSCacheTaskLocation =>
// Get all alive executors on the given host as a Set
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) =>
for (e <- set) {
// Reaching this point means data was cached on HDFS; append the task index for each of those executors
pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
}
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
", but there are no executors alive there.")
}
case _ =>
}
// pendingTasksForHost maps each host to all of its pending tasks
pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
// Get the rack of this host; in YARN mode this goes through YarnScheduler.getRackForHost
for (rack <- sched.getRackForHost(loc.host)) {
// pendingTasksForRack maps each rack to all of its tasks; the rack is added if missing
pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
}
}
// If nothing was persisted and no task of any kind ran before, preferredLocations is Nil
if (tasks(index).preferredLocations == Nil) {
pendingTasksWithNoPrefs += index
}
// Every task that needs to run ends up here
allPendingTasks += index  // No point scanning this whole list to find the old task there
}


addPendingTask is called automatically while the TaskSetManager is being initialized:

// Add all our tasks to the pending lists. We do this in reverse order
// of task index so that tasks with low indices get launched first.
// This runs when the TaskSetManager is initialized
for (i <- (0 until numTasks).reverse) {
// Every task in the TaskSet is registered in the data structures that map it
// to its executor, host, rack, and so on
addPendingTask(i)
}


Now the "alive" check, taking the PROCESS_LOCAL level as the example:

def isExecutorAlive(execId: String): Boolean = synchronized {
// Normally the usable executor ids were already added to executorIdToRunningTaskIds in resourceOffers
executorIdToRunningTaskIds.contains(execId)
}

Once the TaskSetManager has all of its locality levels, resourceOfferSingleTaskSet is called; it builds a TaskDescription for every task, which is used later to launch it:

// This mostly does task bookkeeping and builds a TaskDescription per task; it returns a Boolean
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)

private def resourceOfferSingleTaskSet(
taskSet: TaskSetManager,
maxLocality: TaskLocality,
shuffledOffers: Seq[WorkerOffer],
availableCpus: Array[Int],
tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
// nodes and executors that are blacklisted for the entire application have already been
// filtered out by this point
// Iterate over every usable executor (one WorkerOffer corresponds to one executor)
for (i <- 0 until shuffledOffers.size) {
// Get the current executor's id and host
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
// If the executor's free core count is at least CPUS_PER_TASK
// (CPUS_PER_TASK is the number of CPUs a task needs, 1 by default),
// this executor can run at least one more task
if (availableCpus(i) >= CPUS_PER_TASK) {
try {
// resourceOffer does the per-task bookkeeping and returns the task's TaskDescription
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
// tasks holds all of the TaskDescriptions
tasks(i) += task
val tid = task.taskId
// Record the relevant mappings
taskIdToTaskSetManager(tid) = taskSet
taskIdToExecutorId(tid) = execId
executorIdToRunningTaskIds(execId).add(tid)
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
// Mark the return value as true
launchedTask = true
}
} catch {
case e: TaskNotSerializableException =>
logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
// Do not offer resources for this task, but don't throw an error to allow other
// task sets to be submitted.
return launchedTask
}
}
}
return launchedTask
}

Now the core method, resourceOffer: it pulls each task out of the TaskSetManager and wraps it into a TaskDescription:

def resourceOffer(
execId: String,
host: String,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
// First check whether this executor or host is blacklisted
val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
blacklist.isNodeBlacklistedForTaskSet(host) ||
blacklist.isExecutorBlacklistedForTaskSet(execId)
}
if (!isZombie && !offerBlacklisted) {
// Current time
val curTime = clock.getTimeMillis()
// The best locality level currently allowed for this task
var allowedLocality = maxLocality
// If the level is not NO_PREF
if (maxLocality != TaskLocality.NO_PREF) {
// Get the locality level currently allowed for this task set
allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > maxLocality) {
// We're not allowed to search for farther-away tasks
// Keep only the smaller (more local) level
allowedLocality = maxLocality
}
}
// Dequeue a pending task index for this executor/host together with its locality level
dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
// Found a task; do some bookkeeping and return a task description
// Get the task
val task = tasks(index)
// Generate a task id via getAndIncrement
val taskId = sched.newTaskId()
// Do various bookkeeping
// Record this attempt in copiesRunning
copiesRunning(index) += 1
val attemptNum = taskAttempts(index).size
// Build a TaskInfo that carries all of this task's metadata
val info = new TaskInfo(taskId, index, attemptNum, curTime,
execId, host, taskLocality, speculative)
// taskInfos maps each task id to its metadata
taskInfos(taskId) = info
// Prepend to the attempt list of this task
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
// NO_PREF will not affect the variables related to delay scheduling
if (maxLocality != TaskLocality.NO_PREF) {
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
}
// Serialize and return the task
// Serialize the task
val serializedTask: ByteBuffer = try {
ser.serialize(task)
} catch {
// If the task cannot be serialized, then there's no point to re-attempt the task,
// as it will always fail. So just abort the whole task-set.
case NonFatal(e) =>
val msg = s"Failed to serialize task $taskId, not attempting to retry it."
logError(msg, e)
abort(s"$msg Exception during serialization: $e")
throw new TaskNotSerializableException(e)
}
// Warn if the serialized task size crosses the warning threshold
if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
!emittedTaskSizeWarning) {
emittedTaskSizeWarning = true
logWarning(s"Stage ${task.stageId} contains a task of very large size " +
s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
}
// Add to the pool's runningTasks and the TaskSetManager's runningTasksSet,
// i.e. mark the task as running
addRunningTask(taskId)

// We used to log the time it takes to serialize the task, but task size is already
// a good proxy to task serialization time.
// val timeTaken = clock.getTime() - startTime
// Format the task name
val taskName = s"task ${info.id} in stage ${taskSet.id}"
logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")
// Tell the ExecutorAllocationListener and JobProgressListener that the task has started,
// then record the task's related information
sched.dagScheduler.taskStarted(task, info)
// Build a TaskDescription: it records on which host and executor this task runs,
// plus all the jars and files that must be added to the executor's classpath
new TaskDescription(
taskId,
attemptNum,
execId,
taskName,
index,
sched.sc.addedFiles,
sched.sc.addedJars,
task.localProperties,
serializedTask)
}
} else {
None
}
}
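A side note on the allowedLocality > maxLocality comparison above: TaskLocality is a Scala Enumeration whose values are declared from most to least local, so a "greater" value means a weaker locality guarantee. A tiny standalone imitation (FakeTaskLocality is made up, only to illustrate the ordering):

// Enumeration values compare by declaration order, just like TaskLocality in the Spark source
object FakeTaskLocality extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
}
import FakeTaskLocality._
assert(PROCESS_LOCAL < NODE_LOCAL) // smaller means more local
assert(ANY > RACK_LOCAL)           // ANY is the weakest locality level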

Finally, back in CoarseGrainedSchedulerBackend, the TaskDescriptions are dispatched to their executors:

if (!taskDescs.isEmpty) {
launchTasks(taskDescs)
}


// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
// Iterate over all TaskDescriptions of every executor
for (task <- tasks.flatten) {
// First serialize each task
val serializedTask = TaskDescription.encode(task)
// Check whether the serialized task exceeds the size limit
if (serializedTask.limit >= maxRpcMessageSize) {
// If it does, abort the corresponding TaskSetManager
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.rpc.message.maxSize (%d bytes). Consider increasing " +
"spark.rpc.message.maxSize or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else {
// If the size is within the limit, first look up the executor's ExecutorData metadata;
// executorDataMap was introduced earlier: an executor registers its metadata with the
// Driver when it registers itself
val executorData = executorDataMap(task.executorId)
// Subtract the cores needed per task (1 by default) from the executor's free cores,
// i.e. this task occupies one CPU
executorData.freeCores -= scheduler.CPUS_PER_TASK

logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
s"${executorData.executorHost}.")
// executorEndpoint is the reference to the RPC endpoint of the executor this task runs on
// (introduced in an earlier chapter); send it an asynchronous "launch task" message

// Note: what ultimately starts the task is the Executor, not CoarseGrainedExecutorBackend
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}
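As the error message above says, when serialized TaskDescriptions exceed the RPC limit you can either broadcast the large values or raise the limit. A hedged sketch of raising it (spark.rpc.message.maxSize is in MB; 128 is the default in Spark 2.x):

import org.apache.spark.SparkConf

// Sketch: raise the maximum RPC message size from 128 MB to 256 MB
val conf = new SparkConf()
  .set("spark.rpc.message.maxSize", "256")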


On the Driver side, each executorEndpoint reference is used to send a one-way launch-task message carrying the serialized TaskDescription. The matching executor receives it in receive, wraps the TaskDescription in a TaskRunner (which implements Java's Runnable), and hands it to a thread pool to run:

override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}

case RegisterExecutorFailed(message) =>
exitExecutor(1, "Slave registration failed: " + message)

case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
// First deserialize the TaskDescription that was sent over
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
// Start the task inside this executor's own process.
// The Executor was built when CoarseGrainedExecutorBackend's onStart registered its metadata
// with the Driver and the Driver replied with a one-way message telling it to create the Executor
executor.launchTask(this, taskDesc)
}

Finally, this is handed to Executor.launchTask:

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
// Wrap the TaskDescription and the ExecutorBackend (CoarseGrainedExecutorBackend by default)
// into a TaskRunner, which implements Runnable and is handed to the thread pool below
val tr = new TaskRunner(context, taskDescription)
// Put it into the ConcurrentHashMap that tracks all tasks currently running on this executor
runningTasks.put(taskDescription.taskId, tr)
// threadPool is a java.util.concurrent.ThreadPoolExecutor;
// execute the TaskRunner
threadPool.execute(tr)
}

TaskRunner overrides run, which ultimately calls task.run:

// Start running the task
val res = task.run(

Reminder: there are two kinds of Task, ShuffleMapTask and ResultTask.

Taking ShuffleMapTask as the example: it first calls iterator on the task's RDD to check whether the data has already been persisted; if not, it calls compute (each RDD type implements this differently). For a ShuffledRDD it asks the shuffleManager (SortShuffleManager by default) to build a reader, which ultimately calls BlockStoreShuffleReader.read to fetch the partitions from the remote executors. Finally a writer packs the computed results into buckets and writes them to disk, and with that a shuffle stage finishes. There are many more details here, which I will leave for a later chapter.