Spark 2.1 TaskSetManager, Schedulable and Pool
2017-05-19 11:01
Schedulable
```
/**
 * An interface for schedulable entities.
 * There are two types of Schedulable entities (Pools and TaskSetManagers).
 */
private[spark] trait Schedulable {
  var parent: Pool
  // child queues
  def schedulableQueue: ConcurrentLinkedQueue[Schedulable]
  def schedulingMode: SchedulingMode
  def weight: Int
  def minShare: Int
  def runningTasks: Int
  def priority: Int
  def stageId: Int
  def name: String

  def addSchedulable(schedulable: Schedulable): Unit
  def removeSchedulable(schedulable: Schedulable): Unit
  def getSchedulableByName(name: String): Schedulable
  def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
  def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
  def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
}
```
SchedulingMode
```
/**
 * "FAIR" and "FIFO" determine which policy is used
 * to order tasks amongst a Schedulable's sub-queues.
 * "NONE" is used when a Schedulable has no sub-queues.
 */
object SchedulingMode extends Enumeration {
  type SchedulingMode = Value

  val FAIR, FIFO, NONE = Value
}
```
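In practice, the mode of the root pool is picked from configuration. Below is a minimal sketch (not from the post) of the two standard knobs involved: `spark.scheduler.mode` selects FAIR or FIFO for the root pool, and the `spark.scheduler.pool` local property routes the jobs of a thread into a named pool; the app name, master, and pool name here are made up for illustration.

```
import org.apache.spark.{SparkConf, SparkContext}

// Select FAIR ordering for the root pool's children.
val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")   // hypothetical app name
  .setMaster("local[4]")
  .set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

// Jobs submitted from this thread go into the "etl" pool; a pool not declared in
// fairscheduler.xml is created on the fly with default weight and minShare.
sc.setLocalProperty("spark.scheduler.pool", "etl")
sc.parallelize(1 to 1000).map(_ * 2).count()

// Clearing the property sends later jobs back to the default pool.
sc.setLocalProperty("spark.scheduler.pool", null)
```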
Pool
```
/**
 * A Schedulable entity that represents a collection of Pools or TaskSetManagers.
 */
private[spark] class Pool(
    val poolName: String,
    val schedulingMode: SchedulingMode,
    initMinShare: Int,
    initWeight: Int)
  extends Schedulable with Logging {
```
fields
```
  val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
  val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
  var weight = initWeight
  var minShare = initMinShare
  var runningTasks = 0
  var priority = 0

  // A pool's stage id is used to break the tie in scheduling.
  var stageId = -1
  var name = poolName
  var parent: Pool = null
```
taskSetSchedulingAlgorithm
```
  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
    schedulingMode match {
      case SchedulingMode.FAIR =>
        new FairSchedulingAlgorithm()
      case SchedulingMode.FIFO =>
        new FIFOSchedulingAlgorithm()
      case _ =>
        val msg = s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
        throw new IllegalArgumentException(msg)
    }
  }
```
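The FIFO algorithm orders children by priority (essentially the job id) and breaks ties with the stage id; the FAIR algorithm additionally looks at each child's minShare and weight. The snippet below is a stripped-down, standalone sketch of the FIFO rule only, over a hypothetical `Entry` type; it is not the Spark `FIFOSchedulingAlgorithm` source.

```
// Illustrative only: FIFO-style ordering -- lower priority (earlier job) first,
// lower stage id first when priorities tie.
case class Entry(name: String, priority: Int, stageId: Int)

def fifoLessThan(s1: Entry, s2: Entry): Boolean = {
  val res = math.signum(s1.priority - s2.priority)
  if (res != 0) res < 0 else s1.stageId < s2.stageId
}

val entries = Seq(Entry("b", 2, 7), Entry("a", 1, 5), Entry("c", 1, 3))
println(entries.sortWith(fifoLessThan).map(_.name)) // List(c, a, b)
```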
addSchedulable
```
  override def addSchedulable(schedulable: Schedulable) {
    require(schedulable != null)
    schedulableQueue.add(schedulable)
    schedulableNameToSchedulable.put(schedulable.name, schedulable)
    schedulable.parent = this
  }
```
removeSchedulable
```
  override def removeSchedulable(schedulable: Schedulable) {
    schedulableQueue.remove(schedulable)
    schedulableNameToSchedulable.remove(schedulable.name)
  }
```
getSchedulableByName
```
  override def getSchedulableByName(schedulableName: String): Schedulable = {
    if (schedulableNameToSchedulable.containsKey(schedulableName)) {
      return schedulableNameToSchedulable.get(schedulableName)
    }
    for (schedulable <- schedulableQueue.asScala) {
      val sched = schedulable.getSchedulableByName(schedulableName)
      if (sched != null) {
        return sched
      }
    }
    null
  }
```
executorLost
```
  override def executorLost(executorId: String, host: String, reason: ExecutorLossReason) {
    schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason))
  }
```
checkSpeculatableTasks
```
  override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
    var shouldRevive = false
    for (schedulable <- schedulableQueue.asScala) {
      shouldRevive |= schedulable.checkSpeculatableTasks(minTimeToSpeculation)
    }
    shouldRevive
  }
```
getSortedTaskSetQueue
```
  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue =
      schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
    }
    sortedTaskSetQueue
  }
```
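Each pool sorts its direct children with its own comparator and then concatenates the children's flattened queues; leaf TaskSetManagers simply return themselves. The toy sketch below illustrates that depth-first flattening over a made-up `Node` tree (the names `Leaf`/`Branch` are hypothetical, not Spark classes, and the sibling ordering here is a placeholder for the pool's comparator).

```
import scala.collection.mutable.ArrayBuffer

sealed trait Node { def name: String }
case class Leaf(name: String, priority: Int) extends Node        // stands in for a TaskSetManager
case class Branch(name: String, children: Seq[Node]) extends Node // stands in for a Pool

def flatten(node: Node): ArrayBuffer[Leaf] = node match {
  case l: Leaf => ArrayBuffer(l)
  case Branch(_, children) =>
    // Order siblings first (Spark would use the pool's scheduling algorithm here),
    // then descend into each one.
    val sorted = children.sortBy {
      case Leaf(_, p)  => p
      case _: Branch   => Int.MaxValue
    }
    sorted.foldLeft(ArrayBuffer.empty[Leaf])((acc, child) => acc ++= flatten(child))
}

val root = Branch("root", Seq(Branch("poolB", Seq(Leaf("tsm3", 3))), Leaf("tsm1", 1)))
println(flatten(root).map(_.name)) // ArrayBuffer(tsm1, tsm3)
```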
increaseRunningTasks
```
  def increaseRunningTasks(taskNum: Int) {
    runningTasks += taskNum
    if (parent != null) {
      parent.increaseRunningTasks(taskNum)
    }
  }
```
decreaseRunningTasks
```
  def decreaseRunningTasks(taskNum: Int) {
    runningTasks -= taskNum
    if (parent != null) {
      parent.decreaseRunningTasks(taskNum)
    }
  }
}
```
TaskSetManager
```
/**
 * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
 * each task, retries tasks if they fail (up to a limited number of times), and
 * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
 * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
 * and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
 *
 * THREADING: This class is designed to only be called from code with a lock on the
 * TaskScheduler (e.g. its event handlers). It should not be called from other threads.
 *
 * @param sched           the TaskSchedulerImpl associated with the TaskSetManager
 * @param taskSet         the TaskSet to manage scheduling for
 * @param maxTaskFailures if any particular task fails this number of times, the entire
 *                        task set will be aborted
 */
private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,
    val taskSet: TaskSet,
    val maxTaskFailures: Int,
    clock: Clock = new SystemClock())
  extends Schedulable with Logging {
```
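The `maxTaskFailures` constructor argument ultimately comes from configuration. A minimal sketch, assuming the standard `spark.task.maxFailures` setting (default 4 in cluster modes; plain `local[N]` uses 1 unless you use the `local[N, maxFailures]` master string):

```
import org.apache.spark.SparkConf

// After this many failed attempts of any single task, the TaskSetManager
// aborts the whole task set (see handleFailedTask further down).
val conf = new SparkConf()
  .set("spark.task.maxFailures", "8")
// pass conf to SparkContext / SparkSession as usual
```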
fields
```
  private val conf = sched.sc.conf

  // Quantile of tasks at which to start speculation
  val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
  val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)

  // Limit of bytes for total size of results (default is 1GB)
  val maxResultSize = Utils.getMaxResultSize(conf)

  // Serializer for closures and tasks.
  val env = SparkEnv.get
  val ser = env.closureSerializer.newInstance()

  val tasks = taskSet.tasks
  val numTasks = tasks.length
  val copiesRunning = new Array[Int](numTasks)
  val successful = new Array[Boolean](numTasks)
  private val numFailures = new Array[Int](numTasks)
  val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
  var tasksSuccessful = 0

  var weight = 1
  var minShare = 0
  var priority = taskSet.priority
  var stageId = taskSet.stageId
  val name = "TaskSet_" + taskSet.id
  var parent: Pool = null
  var totalResultSize = 0L
  var calculatedTasks = 0

  val runningTasksSet = new HashSet[Long]

  override def runningTasks: Int = runningTasksSet.size

  // True once no more tasks should be launched for this task set manager. TaskSetManagers enter
  // the zombie state once at least one attempt of each task has completed successfully, or if the
  // task set is aborted (for example, because it was killed). TaskSetManagers remain in the zombie
  // state until all tasks have finished running; we keep TaskSetManagers that are in the zombie
  // state in order to continue to track and account for the running tasks.
  // TODO: We should kill any running task attempts when the task set manager becomes a zombie.
  var isZombie = false

  // Set of pending tasks for each executor. These collections are actually
  // treated as stacks, in which new tasks are added to the end of the
  // ArrayBuffer and removed from the end. This makes it faster to detect
  // tasks that repeatedly fail because whenever a task failed, it is put
  // back at the head of the stack. These collections may contain duplicates
  // for two reasons:
  // (1): Tasks are only removed lazily; when a task is launched, it remains
  // in all the pending lists except the one that it was launched from.
  // (2): Tasks may be re-added to these lists multiple times as a result
  // of failures.
  // Duplicates are handled in dequeueTaskFromList, which ensures that a
  // task hasn't already started running before launching it.
  private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]

  // Set of pending tasks for each host. Similar to pendingTasksForExecutor,
  // but at host level.
  private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]

  // Set of pending tasks for each rack -- similar to the above.
  private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]

  // Set containing pending tasks with no locality preferences.
  var pendingTasksWithNoPrefs = new ArrayBuffer[Int]

  // Set containing all pending tasks (also used as a stack, as above).
  val allPendingTasks = new ArrayBuffer[Int]

  // Tasks that can be speculated. Since these will be a small fraction of total
  // tasks, we'll just hold them in a HashSet.
  val speculatableTasks = new HashSet[Int]

  // Task index, start and finish time for each task attempt (indexed by task ID)
  val taskInfos = new HashMap[Long, TaskInfo]

  // How frequently to reprint duplicate exceptions in full, in milliseconds
  val EXCEPTION_PRINT_INTERVAL = conf.getLong("spark.logging.exceptionPrintInterval", 10000)

  // Map of recent exceptions (identified by string representation and top stack frame) to
  // duplicate count (how many times the same exception has appeared) and time the full exception
  // was printed. This should ideally be an LRU map that can drop old exceptions automatically.
  val recentExceptions = HashMap[String, (Int, Long)]()

  // Figure out the current map output tracker epoch and set it on all tasks
  val epoch = sched.mapOutputTracker.getEpoch
  logDebug("Epoch for " + taskSet + ": " + epoch)
  for (t <- tasks) {
    t.epoch = epoch
  }

  // Add all our tasks to the pending lists. We do this in reverse order
  // of task index so that tasks with low indices get launched first.
  for (i <- (0 until numTasks).reverse) {
    addPendingTask(i)
  }

  // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
  var myLocalityLevels = computeValidLocalityLevels()
  var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level

  // Delay scheduling variables: we keep track of our current locality level and the time we
  // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
  // We then move down if we manage to launch a "more local" task.
  var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels
  var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level

  override def schedulableQueue: ConcurrentLinkedQueue[Schedulable] = null

  override def schedulingMode: SchedulingMode = SchedulingMode.NONE

  var emittedTaskSizeWarning = false
```
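The two speculation fields above feed the arithmetic in `checkSpeculatableTasks` further down. Below is a worked sketch of that math with the default quantile (0.75) and multiplier (1.5); the numbers and the simple middle-element median are illustrative only, not the method itself.

```
// With 100 tasks and quantile 0.75, speculation is only considered once 75 have finished.
val numTasks = 100
val speculationQuantile = 0.75
val speculationMultiplier = 1.5
val minFinishedForSpeculation = (speculationQuantile * numTasks).floor.toInt // 75

// A still-running task becomes a speculation candidate once it has run longer than
// multiplier * median(successful durations), with a small lower bound passed in by the
// scheduler (minTimeToSpeculation, 100 ms here).
val successfulDurationsMs = Array(2000L, 2500L, 3000L, 4000L, 30000L).sorted
val medianMs = successfulDurationsMs(successfulDurationsMs.length / 2)   // 3000 ms
val thresholdMs = math.max(speculationMultiplier * medianMs, 100)        // 4500.0 ms
println(s"speculate tasks still running after $thresholdMs ms")
```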
addPendingTask
```
  /** Add a task to all the pending-task lists that it should be on. */
  private def addPendingTask(index: Int) {
    for (loc <- tasks(index).preferredLocations) {
      loc match {
        case e: ExecutorCacheTaskLocation =>
          pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
        case e: HDFSCacheTaskLocation =>
          val exe = sched.getExecutorsAliveOnHost(loc.host)
          exe match {
            case Some(set) =>
              for (e <- set) {
                pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
              }
              logInfo(s"Pending task $index has a cached location at ${e.host} " +
                ", where there are executors " + set.mkString(","))
            case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
              ", but there are no executors alive there.")
          }
        case _ =>
      }
      pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
      for (rack <- sched.getRackForHost(loc.host)) {
        pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
      }
    }

    if (tasks(index).preferredLocations == Nil) {
      pendingTasksWithNoPrefs += index
    }

    allPendingTasks += index // No point scanning this whole list to find the old task there
  }
```
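In other words, one task index is bucketed once per preferred location into the host-level list and, via a host-to-rack lookup, into the rack-level list (plus the executor-level list for cached locations). The following is a hypothetical, stripped-down version of that bucketing over plain strings, with the executor and HDFS-cache cases omitted; `rackOf` and the host names are made up.

```
import scala.collection.mutable.{ArrayBuffer, HashMap}

val pendingForHost = new HashMap[String, ArrayBuffer[Int]]
val pendingForRack = new HashMap[String, ArrayBuffer[Int]]
val rackOf = Map("host1" -> "rackA", "host2" -> "rackA", "host3" -> "rackB")

def addPending(index: Int, preferredHosts: Seq[String]): Unit = {
  for (host <- preferredHosts) {
    pendingForHost.getOrElseUpdate(host, new ArrayBuffer) += index
    for (rack <- rackOf.get(host)) {
      pendingForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
    }
  }
}

addPending(0, Seq("host1"))
addPending(1, Seq("host2", "host3"))
println(pendingForHost) // host1 -> [0], host2 -> [1], host3 -> [1]
println(pendingForRack) // rackA -> [0, 1], rackB -> [1]
```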
getPendingTasksForExecutor
```
  /**
   * Return the pending tasks list for a given executor ID, or an empty list if
   * there is no map entry for that executor
   */
  private def getPendingTasksForExecutor(executorId: String): ArrayBuffer[Int] = {
    pendingTasksForExecutor.getOrElse(executorId, ArrayBuffer())
  }
```
getPendingTasksForHost
```
  /**
   * Return the pending tasks list for a given host, or an empty list if
   * there is no map entry for that host
   */
  private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
    pendingTasksForHost.getOrElse(host, ArrayBuffer())
  }
```
getPendingTasksForRack
```
  /**
   * Return the pending rack-local task list for a given rack, or an empty list if
   * there is no map entry for that rack
   */
  private def getPendingTasksForRack(rack: String): ArrayBuffer[Int] = {
    pendingTasksForRack.getOrElse(rack, ArrayBuffer())
  }
```
dequeueTaskFromList
```
  /**
   * Dequeue a pending task from the given list and return its index.
   * Return None if the list is empty.
   * This method also cleans up any tasks in the list that have already
   * been launched, since we want that to happen lazily.
   */
  private def dequeueTaskFromList(
      execId: String,
      host: String,
      list: ArrayBuffer[Int]): Option[Int] = {
    var indexOffset = list.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = list(indexOffset)
      if (!isTaskBlacklistedOnExecOrNode(index, execId, host)) {
        // This should almost always be list.trimEnd(1) to remove tail
        list.remove(indexOffset)
        if (copiesRunning(index) == 0 && !successful(index)) {
          return Some(index)
        }
      }
    }
    None
  }
```
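The pending lists are used as stacks with lazy cleanup: the scan starts from the tail, stale entries (already launched or finished) are dropped as they are encountered, and the first genuinely pending index wins. A hypothetical standalone sketch of that pattern, with the blacklist check omitted and `stillPending` standing in for the `copiesRunning`/`successful` test:

```
import scala.collection.mutable.ArrayBuffer

def dequeueFrom(list: ArrayBuffer[Int], stillPending: Int => Boolean): Option[Int] = {
  var offset = list.size
  while (offset > 0) {
    offset -= 1
    val index = list(offset)
    list.remove(offset)        // lazy cleanup: scanned entries are removed on the way
    if (stillPending(index)) {
      return Some(index)
    }
  }
  None
}

val pending = ArrayBuffer(3, 5, 5, 7)          // duplicates are allowed, as in Spark
val launched = scala.collection.mutable.Set(7)
println(dequeueFrom(pending, i => !launched(i))) // Some(5): 7 was already launched
println(pending)                                  // ArrayBuffer(3, 5)
```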
dequeueSpeculativeTask, dequeueTask, resourceOffer and the task lifecycle methods
```
  /** Check whether a task is currently running an attempt on a given host */
  private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
    taskAttempts(taskIndex).exists(_.host == host)
  }

  private def isTaskBlacklistedOnExecOrNode(index: Int, execId: String, host: String): Boolean = {
    taskSetBlacklistHelperOpt.exists { blacklist =>
      blacklist.isNodeBlacklistedForTask(host, index) ||
        blacklist.isExecutorBlacklistedForTask(execId, index)
    }
  }

  /**
   * Return a speculative task for a given executor if any are available. The task should not have
   * an attempt running on this host, in case the host is slow. In addition, the task should meet
   * the given locality constraint.
   */
  // Labeled as protected to allow tests to override providing speculative tasks if necessary
  protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value)] = {
    speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set

    def canRunOnHost(index: Int): Boolean = {
      !hasAttemptOnHost(index, host) &&
        !isTaskBlacklistedOnExecOrNode(index, execId, host)
    }

    if (!speculatableTasks.isEmpty) {
      // Check for process-local tasks; note that tasks can be process-local
      // on multiple nodes when we replicate cached blocks, as in Spark Streaming
      for (index <- speculatableTasks if canRunOnHost(index)) {
        val prefs = tasks(index).preferredLocations
        val executors = prefs.flatMap(_ match {
          case e: ExecutorCacheTaskLocation => Some(e.executorId)
          case _ => None
        });
        if (executors.contains(execId)) {
          speculatableTasks -= index
          return Some((index, TaskLocality.PROCESS_LOCAL))
        }
      }

      // Check for node-local tasks
      if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
        for (index <- speculatableTasks if canRunOnHost(index)) {
          val locations = tasks(index).preferredLocations.map(_.host)
          if (locations.contains(host)) {
            speculatableTasks -= index
            return Some((index, TaskLocality.NODE_LOCAL))
          }
        }
      }

      // Check for no-preference tasks
      if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) {
        for (index <- speculatableTasks if canRunOnHost(index)) {
          val locations = tasks(index).preferredLocations
          if (locations.size == 0) {
            speculatableTasks -= index
            return Some((index, TaskLocality.PROCESS_LOCAL))
          }
        }
      }

      // Check for rack-local tasks
      if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
        for (rack <- sched.getRackForHost(host)) {
          for (index <- speculatableTasks if canRunOnHost(index)) {
            val racks = tasks(index).preferredLocations.map(_.host).flatMap(sched.getRackForHost)
            if (racks.contains(rack)) {
              speculatableTasks -= index
              return Some((index, TaskLocality.RACK_LOCAL))
            }
          }
        }
      }

      // Check for non-local tasks
      if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
        for (index <- speculatableTasks if canRunOnHost(index)) {
          speculatableTasks -= index
          return Some((index, TaskLocality.ANY))
        }
      }
    }

    None
  }

  /**
   * Dequeue a pending task for a given node and return its index and locality level.
   * Only search for tasks matching the given locality constraint.
   *
   * @return An option containing (task index within the task set, locality, is speculative?)
   */
  private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value, Boolean)] = {
    for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
      for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL, false))
      }
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
      // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
      for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
        return Some((index, TaskLocality.PROCESS_LOCAL, false))
      }
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
      } {
        return Some((index, TaskLocality.RACK_LOCAL, false))
      }
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
      for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
        return Some((index, TaskLocality.ANY, false))
      }
    }

    // find a speculative task if all others tasks have been scheduled
    dequeueSpeculativeTask(execId, host, maxLocality).map {
      case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
  }

  /**
   * Respond to an offer of a single executor from the scheduler by finding a task
   *
   * NOTE: this function is either called with a maxLocality which
   * would be adjusted by delay scheduling algorithm or it will be with a special
   * NO_PREF locality which will be not modified
   *
   * @param execId the executor Id of the offered resource
   * @param host the host Id of the offered resource
   * @param maxLocality the maximum locality we want to schedule the tasks at
   */
  @throws[TaskNotSerializableException]
  def resourceOffer(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription] = {
    val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
      blacklist.isNodeBlacklistedForTaskSet(host) ||
        blacklist.isExecutorBlacklistedForTaskSet(execId)
    }
    if (!isZombie && !offerBlacklisted) {
      val curTime = clock.getTimeMillis()

      var allowedLocality = maxLocality

      if (maxLocality != TaskLocality.NO_PREF) {
        allowedLocality = getAllowedLocalityLevel(curTime)
        if (allowedLocality > maxLocality) {
          // We're not allowed to search for farther-away tasks
          allowedLocality = maxLocality
        }
      }

      dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
        // Found a task; do some bookkeeping and return a task description
        val task = tasks(index)
        val taskId = sched.newTaskId()
        // Do various bookkeeping
        copiesRunning(index) += 1
        val attemptNum = taskAttempts(index).size
        val info = new TaskInfo(taskId, index, attemptNum, curTime,
          execId, host, taskLocality, speculative)
        taskInfos(taskId) = info
        taskAttempts(index) = info :: taskAttempts(index)
        // Update our locality level for delay scheduling
        // NO_PREF will not affect the variables related to delay scheduling
        if (maxLocality != TaskLocality.NO_PREF) {
          currentLocalityIndex = getLocalityIndex(taskLocality)
          lastLaunchTime = curTime
        }
        // Serialize and return the task
        val startTime = clock.getTimeMillis()
        val serializedTask: ByteBuffer = try {
          Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
        } catch {
          // If the task cannot be serialized, then there's no point to re-attempt the task,
          // as it will always fail. So just abort the whole task-set.
          case NonFatal(e) =>
            val msg = s"Failed to serialize task $taskId, not attempting to retry it."
            logError(msg, e)
            abort(s"$msg Exception during serialization: $e")
            throw new TaskNotSerializableException(e)
        }
        if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
          !emittedTaskSizeWarning) {
          emittedTaskSizeWarning = true
          logWarning(s"Stage ${task.stageId} contains a task of very large size " +
            s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
            s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
        }
        addRunningTask(taskId)

        // We used to log the time it takes to serialize the task, but task size is already
        // a good proxy to task serialization time.
        // val timeTaken = clock.getTime() - startTime
        val taskName = s"task ${info.id} in stage ${taskSet.id}"
        logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
          s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")

        sched.dagScheduler.taskStarted(task, info)
        new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
          taskName, index, serializedTask)
      }
    } else {
      None
    }
  }

  private def maybeFinishTaskSet() {
    if (isZombie && runningTasks == 0) {
      sched.taskSetFinished(this)
    }
  }

  /**
   * Get the level we can launch tasks according to delay scheduling, based on current wait time.
   */
  private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
    // Remove the scheduled or finished tasks lazily
    def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
      var indexOffset = pendingTaskIds.size
      while (indexOffset > 0) {
        indexOffset -= 1
        val index = pendingTaskIds(indexOffset)
        if (copiesRunning(index) == 0 && !successful(index)) {
          return true
        } else {
          pendingTaskIds.remove(indexOffset)
        }
      }
      false
    }
    // Walk through the list of tasks that can be scheduled at each location and returns true
    // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
    // already been scheduled.
    def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
      val emptyKeys = new ArrayBuffer[String]
      val hasTasks = pendingTasks.exists {
        case (id: String, tasks: ArrayBuffer[Int]) =>
          if (tasksNeedToBeScheduledFrom(tasks)) {
            true
          } else {
            emptyKeys += id
            false
          }
      }
      // The key could be executorId, host or rackId
      emptyKeys.foreach(id => pendingTasks.remove(id))
      hasTasks
    }

    while (currentLocalityIndex < myLocalityLevels.length - 1) {
      val moreTasks = myLocalityLevels(currentLocalityIndex) match {
        case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
        case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
        case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
        case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
      }
      if (!moreTasks) {
        // This is a performance optimization: if there are no more tasks that can
        // be scheduled at a particular locality level, there is no point in waiting
        // for the locality wait timeout (SPARK-4939).
        lastLaunchTime = curTime
        logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
          s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
        currentLocalityIndex += 1
      } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
        // Jump to the next locality level, and reset lastLaunchTime so that the next locality
        // wait timer doesn't immediately expire
        lastLaunchTime += localityWaits(currentLocalityIndex)
        logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
          s"${localityWaits(currentLocalityIndex)}ms")
        currentLocalityIndex += 1
      } else {
        return myLocalityLevels(currentLocalityIndex)
      }
    }
    myLocalityLevels(currentLocalityIndex)
  }

  /**
   * Find the index in myLocalityLevels for a given locality. This is also designed to work with
   * localities that are not in myLocalityLevels (in case we somehow get those) by returning the
   * next-biggest level we have. Uses the fact that the last value in myLocalityLevels is ANY.
   */
  def getLocalityIndex(locality: TaskLocality.TaskLocality): Int = {
    var index = 0
    while (locality > myLocalityLevels(index)) {
      index += 1
    }
    index
  }

  /**
   * Check whether the given task set has been blacklisted to the point that it can't run anywhere.
   *
   * It is possible that this taskset has become impossible to schedule *anywhere* due to the
   * blacklist. The most common scenario would be if there are fewer executors than
   * spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
   * will hang.
   *
   * There's a tradeoff here: we could make sure all tasks in the task set are schedulable, but
   * that would add extra time to each iteration of the scheduling loop. Here, we take the
   * approach of making sure at least one of the unscheduled tasks is schedulable. This means we
   * may not detect the hang as quickly as we could have, but we'll always detect the hang
   * eventually, and the method is faster in the typical case. In the worst case, this method can
   * take O(maxTaskFailures + numTasks) time, but it will be faster when there haven't been any
   * task failures (this is because the method picks one unscheduled task, and then iterates
   * through each executor until it finds one that the task isn't blacklisted on).
   */
  private[scheduler] def abortIfCompletelyBlacklisted(
      hostToExecutors: HashMap[String, HashSet[String]]): Unit = {
    taskSetBlacklistHelperOpt.foreach { taskSetBlacklist =>
      // Only look for unschedulable tasks when at least one executor has registered. Otherwise,
      // task sets will be (unnecessarily) aborted in cases when no executors have registered yet.
      if (hostToExecutors.nonEmpty) {
        // find any task that needs to be scheduled
        val pendingTask: Option[Int] = {
          // usually this will just take the last pending task, but because of the lazy removal
          // from each list, we may need to go deeper in the list. We poll from the end because
          // failed tasks are put back at the end of allPendingTasks, so we're more likely to find
          // an unschedulable task this way.
          val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet =>
            copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)
          }
          if (indexOffset == -1) {
            None
          } else {
            Some(allPendingTasks(indexOffset))
          }
        }

        pendingTask.foreach { indexInTaskSet =>
          // try to find some executor this task can run on. Its possible that some *other*
          // task isn't schedulable anywhere, but we will discover that in some later call,
          // when that unschedulable task is the last task remaining.
          val blacklistedEverywhere = hostToExecutors.forall { case (host, execsOnHost) =>
            // Check if the task can run on the node
            val nodeBlacklisted =
              taskSetBlacklist.isNodeBlacklistedForTaskSet(host) ||
                taskSetBlacklist.isNodeBlacklistedForTask(host, indexInTaskSet)
            if (nodeBlacklisted) {
              true
            } else {
              // Check if the task can run on any of the executors
              execsOnHost.forall { exec =>
                taskSetBlacklist.isExecutorBlacklistedForTaskSet(exec) ||
                  taskSetBlacklist.isExecutorBlacklistedForTask(exec, indexInTaskSet)
              }
            }
          }
          if (blacklistedEverywhere) {
            val partition = tasks(indexInTaskSet).partitionId
            abort(s"Aborting $taskSet because task $indexInTaskSet (partition $partition) " +
              s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior " +
              s"can be configured via spark.blacklist.*.")
          }
        }
      }
    }
  }

  /**
   * Marks the task as getting result and notifies the DAG Scheduler
   */
  def handleTaskGettingResult(tid: Long): Unit = {
    val info = taskInfos(tid)
    info.markGettingResult()
    sched.dagScheduler.taskGettingResult(info)
  }

  /**
   * Check whether has enough quota to fetch the result with `size` bytes
   */
  def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
    totalResultSize += size
    calculatedTasks += 1
    if (maxResultSize > 0 && totalResultSize > maxResultSize) {
      val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
        s"(${Utils.bytesToString(totalResultSize)}) is bigger than spark.driver.maxResultSize " +
        s"(${Utils.bytesToString(maxResultSize)})"
      logError(msg)
      abort(msg)
      false
    } else {
      true
    }
  }

  /**
   * Marks a task as successful and notifies the DAGScheduler that the task has ended.
   */
  def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
    val info = taskInfos(tid)
    val index = info.index
    info.markFinished(TaskState.FINISHED)
    removeRunningTask(tid)
    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
    // Note: "result.value()" only deserializes the value when it's called at the first time, so
    // here "result.value()" just returns the value and won't block other threads.
    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
    // Kill any other attempts for the same task (since those are unnecessary now that one
    // attempt completed successfully).
    for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
      logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
        s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
        s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
      sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true)
    }
    if (!successful(index)) {
      tasksSuccessful += 1
      logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
        s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
        s" ($tasksSuccessful/$numTasks)")
      // Mark successful and stop if all the tasks have succeeded.
      successful(index) = true
      if (tasksSuccessful == numTasks) {
        isZombie = true
      }
    } else {
      logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
        " because task " + index + " has already completed successfully")
    }
    maybeFinishTaskSet()
  }

  /**
   * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
   * DAG Scheduler.
   */
  def handleFailedTask(tid: Long, state: TaskState, reason: TaskFailedReason) {
    val info = taskInfos(tid)
    if (info.failed || info.killed) {
      return
    }
    removeRunningTask(tid)
    info.markFinished(state)
    val index = info.index
    copiesRunning(index) -= 1
    var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
    val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}," +
      s" executor ${info.executorId}): ${reason.toErrorString}"
    val failureException: Option[Throwable] = reason match {
      case fetchFailed: FetchFailed =>
        logWarning(failureReason)
        if (!successful(index)) {
          successful(index) = true
          tasksSuccessful += 1
        }
        isZombie = true
        None

      case ef: ExceptionFailure =>
        // ExceptionFailure's might have accumulator updates
        accumUpdates = ef.accums
        if (ef.className == classOf[NotSerializableException].getName) {
          // If the task result wasn't serializable, there's no point in trying to re-execute it.
          logError("Task %s in stage %s (TID %d) had a not serializable result: %s; not retrying"
            .format(info.id, taskSet.id, tid, ef.description))
          abort("Task %s in stage %s (TID %d) had a not serializable result: %s".format(
            info.id, taskSet.id, tid, ef.description))
          return
        }
        val key = ef.description
        val now = clock.getTimeMillis()
        val (printFull, dupCount) = {
          if (recentExceptions.contains(key)) {
            val (dupCount, printTime) = recentExceptions(key)
            if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
              recentExceptions(key) = (0, now)
              (true, 0)
            } else {
              recentExceptions(key) = (dupCount + 1, printTime)
              (false, dupCount + 1)
            }
          } else {
            recentExceptions(key) = (0, now)
            (true, 0)
          }
        }
        if (printFull) {
          logWarning(failureReason)
        } else {
          logInfo(
            s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid) on ${info.host}, executor" +
              s" ${info.executorId}: ${ef.className} (${ef.description}) [duplicate $dupCount]")
        }
        ef.exception

      case e: ExecutorLostFailure if !e.exitCausedByApp =>
        logInfo(s"Task $tid failed because while it was being computed, its executor " +
          "exited for a reason unrelated to the task. Not counting this failure towards the " +
          "maximum number of failures for the task.")
        None

      case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others
        logWarning(failureReason)
        None
    }

    sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)

    if (successful(index)) {
      logInfo(
        s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
          "but another instance of the task has already succeeded, " +
          "so not re-queuing the task to be re-executed.")
    } else {
      addPendingTask(index)
    }

    if (!isZombie && reason.countTowardsTaskFailures) {
      taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
        info.host, info.executorId, index))
      assert (null != failureReason)
      numFailures(index) += 1
      if (numFailures(index) >= maxTaskFailures) {
        logError("Task %d in stage %s failed %d times; aborting job".format(
          index, taskSet.id, maxTaskFailures))
        abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:"
          .format(index, taskSet.id, maxTaskFailures, failureReason), failureException)
        return
      }
    }
    maybeFinishTaskSet()
  }

  def abort(message: String, exception: Option[Throwable] = None): Unit = sched.synchronized {
    // TODO: Kill running tasks if we were not terminated due to a Mesos error
    sched.dagScheduler.taskSetFailed(taskSet, message, exception)
    isZombie = true
    maybeFinishTaskSet()
  }

  /** If the given task ID is not in the set of running tasks, adds it.
   *
   * Used to keep track of the number of running tasks, for enforcing scheduling policies.
   */
  def addRunningTask(tid: Long) {
    if (runningTasksSet.add(tid) && parent != null) {
      parent.increaseRunningTasks(1)
    }
  }

  /** If the given task ID is in the set of running tasks, removes it. */
  def removeRunningTask(tid: Long) {
    if (runningTasksSet.remove(tid) && parent != null) {
      parent.decreaseRunningTasks(1)
    }
  }

  override def getSchedulableByName(name: String): Schedulable = {
    null
  }

  override def addSchedulable(schedulable: Schedulable) {}

  override def removeSchedulable(schedulable: Schedulable) {}

  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]()
    sortedTaskSetQueue += this
    sortedTaskSetQueue
  }

  /** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */
  override def executorLost(execId: String, host: String, reason: ExecutorLossReason) {
    // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
    // and we are not using an external shuffle server which could serve the shuffle outputs.
    // The reason is the next stage wouldn't be able to fetch the data from this dead executor
    // so we would need to rerun these tasks on other executors.
    if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled) {
      for ((tid, info) <- taskInfos if info.executorId == execId) {
        val index = taskInfos(tid).index
        if (successful(index)) {
          successful(index) = false
          copiesRunning(index) -= 1
          tasksSuccessful -= 1
          addPendingTask(index)
          // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
          // stage finishes when a total of tasks.size tasks finish.
          sched.dagScheduler.taskEnded(
            tasks(index), Resubmitted, null, Seq.empty, info)
        }
      }
    }
    for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
      val exitCausedByApp: Boolean = reason match {
        case exited: ExecutorExited => exited.exitCausedByApp
        case ExecutorKilled => false
        case _ => true
      }
      handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId,
        exitCausedByApp, Some(reason.toString)))
    }
    // recalculate valid locality levels and waits when executor is lost
    recomputeLocality()
  }

  /**
   * Check for tasks to be speculated and return true if there are any. This is called
   * periodically by the TaskScheduler.
   *
   * TODO: To make this scale to large jobs, we need to maintain a list of running tasks, so that
   * we don't scan the whole task set. It might also help to make this sorted by launch time.
   */
  override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
    // Can't speculate if we only have one task, and no need to speculate if the task set is a
    // zombie.
    if (isZombie || numTasks == 1) {
      return false
    }
    var foundTasks = false
    val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
    logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
      val time = clock.getTimeMillis()
      val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
      Arrays.sort(durations)
      val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1))
      val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
      // TODO: Threshold should also look at standard deviation of task durations and have a lower
      // bound based on that.
      logDebug("Task length threshold for speculation: " + threshold)
      for ((tid, info) <- taskInfos) {
        val index = info.index
        if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
          !speculatableTasks.contains(index)) {
          logInfo(
            "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
              .format(index, taskSet.id, info.host, threshold))
          speculatableTasks += index
          foundTasks = true
        }
      }
    }
    foundTasks
  }

  private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
    val defaultWait = conf.get("spark.locality.wait", "3s")
    val localityWaitKey = level match {
      case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
      case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
      case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
      case _ => null
    }

    if (localityWaitKey != null) {
      conf.getTimeAsMs(localityWaitKey, defaultWait)
    } else {
      0L
    }
  }

  /**
   * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
   * added to queues using addPendingTask.
   */
  private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
    import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
    val levels = new ArrayBuffer[TaskLocality.TaskLocality]
    if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
        pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
      levels += PROCESS_LOCAL
    }
    if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
        pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
      levels += NODE_LOCAL
    }
    if (!pendingTasksWithNoPrefs.isEmpty) {
      levels += NO_PREF
    }
    if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
        pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
      levels += RACK_LOCAL
    }
    levels += ANY
    logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
    levels.toArray
  }

  def recomputeLocality() {
    val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
    myLocalityLevels = computeValidLocalityLevels()
    localityWaits = myLocalityLevels.map(getLocalityWait)
    currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
  }

  def executorAdded() {
    recomputeLocality()
  }
```
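The waits used by delay scheduling come from `spark.locality.wait` and its per-level overrides, which is exactly what `getLocalityWait` and `computeValidLocalityLevels` read above. A minimal configuration sketch (the specific values are illustrative):

```
import org.apache.spark.SparkConf

// "spark.locality.wait" (default 3s) is the fallback for every level; the per-level
// keys override it. A wait of 0 removes that level from myLocalityLevels entirely,
// so delay scheduling skips it.
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // default wait before relaxing locality
  .set("spark.locality.wait.process", "2s")  // PROCESS_LOCAL -> NODE_LOCAL after 2s
  .set("spark.locality.wait.node", "3s")     // NODE_LOCAL -> RACK_LOCAL after 3s
  .set("spark.locality.wait.rack", "0")      // 0 disables the RACK_LOCAL level
```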