
Spark TaskScheduler: Functionality and Source Code Analysis

2015-10-09 19:09
TaskScheduler is an abstract trait; TaskSchedulerImpl is currently the only implementation Spark provides, and it is created and initialized inside SparkContext.
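Roughly, the driver-side wiring inside SparkContext looks like this (a simplified sketch in the style of the Spark 1.x source, not the verbatim code):

// Simplified sketch of the relevant steps inside SparkContext (Spark 1.x style):
// createTaskScheduler builds a (SchedulerBackend, TaskScheduler) pair from the master URL,
// and the scheduler is started once the DAGScheduler has been created.
val (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master)
// ... DAGScheduler is constructed with a reference to taskScheduler ...
taskScheduler.start()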


private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging


TaskScheduler is effectively a proxy for SchedulerBackend: the backend does the actual resource work, while TaskScheduler handles the generic logic itself, such as the scheduling order between different jobs and re-submitting slow-running tasks on idle nodes (speculation).
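Speculation is off by default and is controlled by a few standard configuration keys; a minimal sketch of turning it on (the values shown are purely illustrative, not recommendations):

import org.apache.spark.SparkConf

// Illustrative values only; the keys are the standard speculation options.
val conf = new SparkConf()
  .set("spark.speculation", "true")            // off by default
  .set("spark.speculation.interval", "100ms")  // how often to check for slow tasks
  .set("spark.speculation.multiplier", "1.5")  // how much slower than the median counts as slow
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish first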

// SparkContext calls TaskSchedulerImpl.initialize, passing in the SchedulerBackend instance
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}


A Pool is used to schedule TaskSetManagers. Since both Pool and TaskSetManager extend the Schedulable trait, a Pool can contain TaskSetManagers as well as other Pools.
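The relationship is essentially a composite pattern; a heavily stripped-down sketch of the idea (hypothetical names, not the real Schedulable trait, which carries many more members):

import scala.collection.mutable.ArrayBuffer

// Hypothetical, stripped-down illustration of the Schedulable hierarchy described above.
trait SchedulableSketch {
  def name: String
  def leafNames: Seq[String]   // stands in for getSortedTaskSetQueue
}

class TaskSetManagerSketch(val name: String) extends SchedulableSketch {
  def leafNames: Seq[String] = Seq(name)
}

class PoolSketch(val name: String) extends SchedulableSketch {
  private val children = ArrayBuffer.empty[SchedulableSketch]
  def addSchedulable(s: SchedulableSketch): Unit = children += s   // a TaskSetManager or another Pool
  def leafNames: Seq[String] = children.flatMap(_.leafNames)
}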


By default Spark uses FIFO (First In, First Out) scheduling; a FAIR mode is also available. FIFO mode has a single pool, while FAIR mode can have multiple pools. A Pool itself also operates in either FIFO or FAIR mode; the two modes are built by FIFOSchedulableBuilder and FairSchedulableBuilder respectively.
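The mode is selected with the standard spark.scheduler.mode setting (FIFO is the default). A minimal example, with a made-up pool name:

import org.apache.spark.{SparkConf, SparkContext}

// "spark.scheduler.mode" accepts FIFO (default) or FAIR.
val conf = new SparkConf()
  .setAppName("fair-scheduling-example")
  .set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
// In FAIR mode, extra pools can be described in an XML file referenced by
// "spark.scheduler.allocation.file", and a job can target a pool via the
// "spark.scheduler.pool" local property (the pool name here is illustrative):
sc.setLocalProperty("spark.scheduler.pool", "production")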


SparkContext decides which SchedulerBackend to use based on the master parameter. Taking Spark Standalone mode as an example, the backend is SparkDeploySchedulerBackend, which extends CoarseGrainedSchedulerBackend.
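The Standalone branch of SparkContext.createTaskScheduler looks roughly like this (condensed from the 1.x source; SPARK_REGEX matches "spark://host:port" master URLs):

// Condensed sketch of the Standalone case in SparkContext.createTaskScheduler
case SPARK_REGEX(sparkUrl) =>
  val scheduler = new TaskSchedulerImpl(sc)
  val masterUrls = sparkUrl.split(",").map("spark://" + _)
  val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
  scheduler.initialize(backend)
  (backend, scheduler)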

private[spark] class SparkDeploySchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
  with AppClientListener
  with Logging {

// SparkContext calls TaskSchedulerImpl.start
override def start() {
  backend.start()

  // If speculation is enabled, start a thread that periodically re-submits slow-running
  // tasks on idle resources
  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds,
        SPECULATION_INTERVAL_MS milliseconds) {
      Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
    }(sc.env.actorSystem.dispatcher)
  }
}


The SparkDeploySchedulerBackend.start method creates an AppClient, which is mainly used by the Driver to exchange messages with the Master over Akka and to register the Spark Application.

// SparkDeploySchedulerBackend.start()
override def start() {
  super.start()
  ...
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts

  // Wrap CoarseGrainedExecutorBackend into a Command
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
    command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)

  // Create the AppClient and register the application with the Master
  client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
  client.start()
  waitForRegistration()
}


Now to the main topic: how tasks are scheduled and run.

At the end of the previous article, Spark DAGScheduler: Functionality and Source Code Analysis, DAGScheduler calls TaskSchedulerImpl.submitTasks to submit a TaskSet for execution.

// TaskSchedulerImpl.submitTasks
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  // TaskSetManager is not thread-safe, so all operations on it must be synchronized
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}


SparkDeploySchedulerBackend.reviveOffers delegates to the parent method CoarseGrainedSchedulerBackend.reviveOffers. After a round of message passing this leads to CoarseGrainedSchedulerBackend.makeOffers, and then to CoarseGrainedSchedulerBackend.launchTasks, whose argument is the value returned by TaskSchedulerImpl.resourceOffers.

CoarseGrainedSchedulerBackend manages executors in a coarse-grained way: executors are held for the entire lifetime of the Job, rather than being released when a task finishes and re-acquired when a new task arrives. The advantage of coarse-grained management is that resource accounting is relatively simple; the drawback is that executor resources can sit idle and be wasted. Fine-grained management, by contrast, supports executor reuse and preemption to improve utilization; currently only Mesos provides it.
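For illustration only, the granularity on Mesos is chosen with the spark.mesos.coarse flag (the master URL below is a placeholder; Standalone mode, the subject of this article, is always coarse-grained):

import org.apache.spark.SparkConf

// Illustrative sketch: fine-grained mode on Mesos acquires cores per task.
val mesosConf = new SparkConf()
  .setMaster("mesos://zk://zk-host:2181/mesos")   // placeholder Mesos master URL
  .set("spark.mesos.coarse", "false")             // false = fine-grained executor management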

When resources change, for example a new executor joins (adding cores), an existing executor is removed (removing cores), or an executor finishes its current task (freeing cores), CoarseGrainedSchedulerBackend is notified and its makeOffers method calls TaskScheduler.resourceOffers to run one round of assignment for the tasks in the waiting queue (the available resources are presented as WorkerOffers).
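For reference, a WorkerOffer is just a small value object describing the free resources on one executor; in this Spark version it is essentially:

// Essentially the shape of WorkerOffer (simplified): one offer per executor.
private[spark] case class WorkerOffer(executorId: String, host: String, cores: Int)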

// CoarseGrainedSchedulerBackend.makeOffers
def makeOffers(executorId: String) {
  val executorData = executorDataMap(executorId)
  launchTasks(scheduler.resourceOffers(
    Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))))
}


// TaskSchedulerImpl.resourceOffers is called by the cluster manager; the offers parameter
// describes the resources available on the workers. Based on those resources and the
// priority of the pending tasks, the method distributes tasks across the executors in a
// balanced way.
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive, record its hostname, and check whether a new executor joined
  var newExecAvail = false
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    activeExecutorIds += o.executorId
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Randomly shuffle the offers so that tasks are not always assigned to the same few workers
  val shuffledOffers = Random.shuffle(offers)
  // Build one task buffer per worker to hold the tasks assigned to it
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // TaskSetManagers sorted by priority; the ordering is determined by the Pool's
  // scheduling mode (FIFO/FAIR)
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Walk the TaskSets in scheduling order and, for each locality level from highest to
  // lowest, try to launch the best-fitting tasks on all available offers (WorkerOffers)
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}

// TaskSchedulerImpl.resourceOfferSingleTaskSet
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // Check that the executor has enough free CPU cores to run a task
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        // The actual work is done by TaskSetManager.resourceOffer
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetId(tid) = taskSet.taskSet.id
          taskIdToExecutorId(tid) = execId
          executorsByHost(host) += execId
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}


TaskSetManager.resourceOffer offers the executor the task that best matches data locality, which is where the Locality-related logic comes in.

TaskLocality is an enumeration describing the level of data locality, in order of decreasing preference: PROCESS_LOCAL (highest), NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY (lowest). PROCESS_LOCAL, NODE_LOCAL and RACK_LOCAL each have a configurable wait (delay) time, 3s by default.
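The wait times come from the standard locality configuration keys; a per-level override looks like this (the values shown simply restate the defaults):

import org.apache.spark.SparkConf

// spark.locality.wait is the fallback for all levels; each level can be overridden.
val localityConf = new SparkConf()
  .set("spark.locality.wait", "3s")          // default wait before dropping a locality level
  .set("spark.locality.wait.process", "3s")  // PROCESS_LOCAL
  .set("spark.locality.wait.node", "3s")     // NODE_LOCAL
  .set("spark.locality.wait.rack", "3s")     // RACK_LOCAL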

TaskSetManager maintains the following collections of pending tasks:

pendingTasksForExecutor
pendingTasksForHost
pendingTasksForRack
pendingTasksWithNoPrefs

When the TaskSetManager is constructed, a task whose preferredLocations is non-empty is added to the first three pending queues (keyed by executor, host and rack respectively); a task with no preferred locations goes into pendingTasksWithNoPrefs.
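A self-contained sketch of that routing, using hypothetical names and only the host-level map for brevity (the real TaskSetManager.addPendingTask also handles executor- and rack-level preferences and other cases):

import scala.collection.mutable.{ArrayBuffer, HashMap}

// Simplified illustration of how a task index is routed by its preferred locations.
val pendingTasksForHost = HashMap.empty[String, ArrayBuffer[Int]]
val pendingTasksWithNoPrefs = ArrayBuffer.empty[Int]

def route(index: Int, preferredHosts: Seq[String]): Unit =
  if (preferredHosts.isEmpty) pendingTasksWithNoPrefs += index
  else preferredHosts.foreach { h =>
    pendingTasksForHost.getOrElseUpdate(h, ArrayBuffer.empty[Int]) += index
  }

route(0, Seq("host-a"))   // host preference -> pendingTasksForHost("host-a")
route(1, Nil)             // no preference   -> pendingTasksWithNoPrefs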

// TaskSetManager.resourceOffer
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{
  if (!isZombie) {
    val curTime = clock.getTimeMillis()

    var allowedLocality = maxLocality

    if (maxLocality != TaskLocality.NO_PREF) {
      // Using the configured wait time of each locality level and the time of the last
      // successful launch at the current level, compute the most local level currently allowed
      allowedLocality = getAllowedLocalityLevel(curTime)
      // A larger value means a lower locality level
      if (allowedLocality > maxLocality) {
        // Never go below the locality level requested by the caller
        allowedLocality = maxLocality
      }
    }

    // dequeueTask returns the TaskDescription of the most local task within the allowed
    // locality range
    dequeueTask(execId, host, allowedLocality) match {
      case Some((index, taskLocality, speculative)) => {
        val task = tasks(index)
        val taskId = sched.newTaskId()
        // Do various bookkeeping
        copiesRunning(index) += 1
        val attemptNum = taskAttempts(index).size
        val info = new TaskInfo(taskId, index, attemptNum, curTime,
          execId, host, taskLocality, speculative)
        taskInfos(taskId) = info
        taskAttempts(index) = info :: taskAttempts(index)
        // Unless the task's locality is NO_PREF, update the current locality level to the
        // task's locality and refresh lastLaunchTime
        if (maxLocality != TaskLocality.NO_PREF) {
          currentLocalityIndex = getLocalityIndex(taskLocality)
          lastLaunchTime = curTime
        }
        // Serialize the task
        val startTime = clock.getTimeMillis()
        val serializedTask: ByteBuffer = try {
          Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
        } catch {
          // A serialization failure is not worth retrying
          case NonFatal(e) =>
            val msg = s"Failed to serialize task $taskId, not attempting to retry it."
            logError(msg, e)
            abort(s"$msg Exception during serialization: $e")
            throw new TaskNotSerializableException(e)
        }
        // A very large serialized task is a hint that something should be optimized
        if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
            !emittedTaskSizeWarning) {
          emittedTaskSizeWarning = true
          logWarning(s"Stage ${task.stageId} contains a task of very large size " +
            s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
            s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
        }
        addRunningTask(taskId)

        // We used to log the time it takes to serialize the task, but task size is already
        // a good proxy to task serialization time.
        // val timeTaken = clock.getTime() - startTime
        val taskName = s"task ${info.id} in stage ${taskSet.id}"
        logInfo("Starting %s (TID %d, %s, %s, %d bytes)".format(
          taskName, taskId, host, taskLocality, serializedTask.limit))

        // Tell the DAGScheduler that the task has started
        sched.dagScheduler.taskStarted(task, info)
        return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
          taskName, index, serializedTask))
      }
      case _ =>
    }
  }
  None
}


TaskSetManager.getAllowedLocalityLevel combines the wait time configured for each locality level with the time a task was last successfully launched at the current level, and returns the most local level that is currently allowed.

// TaskSetManager.getAllowedLocalityLevel
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
  // Lazily remove tasks that have already been scheduled or have finished
  def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
    var indexOffset = pendingTaskIds.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = pendingTaskIds(indexOffset)
      if (copiesRunning(index) == 0 && !successful(index)) {
        return true
      } else {
        pendingTaskIds.remove(indexOffset)
      }
    }
    false
  }
  // Walk pendingTasks, dropping entries whose tasks have all been scheduled; return true
  // if any task is still waiting to be scheduled
  def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
    val emptyKeys = new ArrayBuffer[String]
    val hasTasks = pendingTasks.exists {
      case (id: String, tasks: ArrayBuffer[Int]) =>
        if (tasksNeedToBeScheduledFrom(tasks)) {
          true
        } else {
          emptyKeys += id
          false
        }
    }
    // The key could be executorId, host or rackId
    emptyKeys.foreach(id => pendingTasks.remove(id))
    hasTasks
  }

  // currentLocalityIndex tracks which TaskLocality level we are currently at
  while (currentLocalityIndex < myLocalityLevels.length - 1) {
    val moreTasks = myLocalityLevels(currentLocalityIndex) match {
      case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
      case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
      case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
      case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
    }
    if (!moreTasks) {
      // No tasks left at the current locality level, so move down one level and reset
      // lastLaunchTime
      lastLaunchTime = curTime
      logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
        s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
      currentLocalityIndex += 1
    } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
      // If the time since the last successful launch at this level exceeds the configured
      // wait for this level, move down one level and advance lastLaunchTime
      lastLaunchTime += localityWaits(currentLocalityIndex)
      currentLocalityIndex += 1
      logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " +
        s"${localityWaits(currentLocalityIndex)}ms")
    } else {
      return myLocalityLevels(currentLocalityIndex)
    }
  }
  myLocalityLevels(currentLocalityIndex)
}


Once the most suitable tasks have been selected, CoarseGrainedSchedulerBackend.launchTasks is called to launch them.

// CoarseGrainedSchedulerBackend.launchTasks
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val serializedTask = ser.serialize(task)
    // If the serialized task exceeds the Akka frame size limit, abort the TaskSet with an
    // explanatory message
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
      scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSet.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}


After another round of message passing, what ends up being invoked on the executor side is Executor.launchTask.

// Executor.launchTask
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}


TaskRunner is the class that actually executes a task: it deserializes the Task and its RDD, runs the task and records how long the run took. It is also defined in Executor.scala.
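A heavily condensed skeleton of that flow, based on the description above (hypothetical class name; the real Executor.TaskRunner also handles metrics, accumulator updates and a number of failure cases):

// Condensed skeleton, not the verbatim Executor.TaskRunner code; the comments name the
// real helpers Spark uses at each step.
class TaskRunnerSketch(serializedTask: java.nio.ByteBuffer) extends Runnable {
  override def run(): Unit = {
    val deserializeStart = System.currentTimeMillis()
    // 1. Deserialize the Task (Task.deserializeWithDependencies + the closure serializer),
    //    downloading any missing files/jars it depends on
    val deserializeTime = System.currentTimeMillis() - deserializeStart

    val runStart = System.currentTimeMillis()
    // 2. Execute the task body: task.run(taskAttemptId, attemptNumber)
    val runTime = System.currentTimeMillis() - runStart

    // 3. Serialize the result and report it back via
    //    ExecutorBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
    println(s"deserialize: ${deserializeTime}ms, run: ${runTime}ms")  // stand-in for metrics
  }
}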