Lesson 36: Inside TaskScheduler: A Walkthrough of Spark shell Case Logs, TaskScheduler and SchedulerBackend, FIFO vs. FAIR, and the Runtime Task Locality Algorithm
2017-06-01 07:25
TaskScheduler is Spark's low-level scheduler; it is responsible for scheduling and running the Tasks themselves.
We write a simple test program, with setMaster("local-cluster[1, 1, 1024]") putting Spark into local pseudo-distributed mode, and observe how the Spark framework behaves through the run logs.
object SparkTest {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ALL)
    val conf = new SparkConf() // create the SparkConf object
    conf.setAppName("Wow, My First Spark App!") // set the application name, visible in the monitoring UI
    conf.setMaster("local-cluster[1, 1, 1024]")
    conf.setSparkHome(System.getenv("SPARK_HOME"))
    val sc = new SparkContext(conf) // create the SparkContext; the SparkConf instance customizes Spark's runtime parameters and configuration
    sc.parallelize(Array("100", "200"), 4).count()
    sc.stop()
  }
}
Running the code in IDEA prints the following log:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/05/31 05:32:07 INFO SparkContext: Running Spark version 2.1.0
......
17/05/31 05:46:06 INFO WorkerWebUI: Bound WorkerWebUI to 0.0.0.0, and started at http://192.168.93.1:51034
17/05/31 05:46:06 INFO Worker: Connecting to master 192.168.93.1:51011...
17/05/31 05:46:06 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.168.93.1:51011...
17/05/31 05:46:06 INFO TransportClientFactory: Successfully created connection to /192.168.93.1:51011 after 38 ms (0 ms spent in bootstraps)
17/05/31 05:46:06 INFO TransportClientFactory: Successfully created connection to /192.168.93.1:51011 after 100 ms (0 ms spent in bootstraps)
17/05/31 05:46:07 INFO Master: Registering worker 192.168.93.1:51033 with 1 cores, 1024.0 MB RAM
17/05/31 05:46:07 INFO Worker: Successfully registered with master spark://192.168.93.1:51011
17/05/31 05:46:07 INFO Master: Registering app Wow, My First Spark App!
17/05/31 05:46:07 INFO Master: Registered app Wow, My First Spark App! with ID app-20170531054607-0000
17/05/31 05:46:07 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20170531054607-0000
17/05/31 05:46:07 INFO Master: Launching executor app-20170531054607-0000/0 on worker worker-20170531054606-192.168.93.1-51033
17/05/31 05:46:07 INFO Worker: Asked to launch executor app-20170531054607-0000/0 for Wow, My First Spark App!
17/05/31 05:46:07 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20170531054607-0000/0 on worker-20170531054606-192.168.93.1-51033 (192.168.93.1:51033) with 1 cores
……
17/05/31 05:46:07 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
17/05/31 05:46:07 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20170531054607-0000/0 is now RUNNING
The log line StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.168.93.1:51011 shows StandaloneAppClient's ClientEndpoint registering with the Master. The StandaloneAppClient$ClientEndpoint: Executor added lines show Executors being obtained; Executors are managed through StandaloneAppClient's ClientEndpoint. The line StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 shows that the StandaloneSchedulerBackend is ready.
Here we ran in IDEA in local pseudo-distributed mode, and the count action triggered a job. If you instead watch the logs from spark-shell, the terminal output at shell startup mainly involves ClientEndpoint and StandaloneSchedulerBackend, because no Job has been triggered yet: starting the Application itself only instantiates the SparkContext, registers the application with the Master, and obtains ExecutorBackend compute resources from the cluster.
The job-launch log from the IDEA local pseudo-distributed run:
17/05/31 05:46:08 INFO DAGScheduler: Got job 0 (count at SparkTest.scala:17) with 4 output partitions
17/05/31 05:46:08 INFO DAGScheduler: Final stage: ResultStage 0 (count at SparkTest.scala:17)
17/05/31 05:46:08 INFO DAGScheduler: Parents of final stage: List()
17/05/31 05:46:08 INFO DAGScheduler: Missing parents: List()
17/05/31 05:46:08 INFO DAGScheduler: Submitting ResultStage 0 (ParallelCollectionRDD[0] at parallelize at SparkTest.scala:17), which has no missing parents
......
17/05/31 05:46:08 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 0 (ParallelCollectionRDD[0] at parallelize at SparkTest.scala:17)
The count action triggered the job. The DAGScheduler then determined the final stage, a ResultStage, and submitted it (Submitting ResultStage); the tasks were handed to a TaskSetManager and launched, and once they finished the DAGScheduler marked the job complete.
After the DAGScheduler has carved out the Stages, the TaskSetManager inside TaskSchedulerImpl manages the TaskSet of all tasks of the Stage to be run: based on locality awareness it assigns compute resources to each Task and monitors task execution (retries, speculative execution of stragglers, and so on).
TaskSet is a data structure holding the batch of tasks the high-level scheduler hands to the low-level scheduler. Its first member is tasks; stageId records which Stage the tasks belong to; stageAttemptId is the attempt ID; and priority is used when the scheduling pool orders TaskSets:
private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val stageAttemptId: Int,
    val priority: Int,
    val properties: Properties) {
  val id: String = stageId + "." + stageAttemptId

  override def toString: String = "TaskSet " + id
}
When a TaskSetManager is instantiated it takes over this work for TaskSchedulerImpl: it receives the TaskSet of tasks, and maxTaskFailures is the number of allowed task-failure retries.
private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,
    val taskSet: TaskSet,
    val maxTaskFailures: Int,
    clock: Clock = new SystemClock()) extends Schedulable with Logging {
The overall low-level task-scheduling flow between TaskScheduler and SchedulerBackend is as follows:
a) TaskSchedulerImpl.submitTasks: its main job is to put the TaskSet under a TaskSetManager for management.
DAGScheduler.scala receives the JobSubmitted message and calls handleJobSubmitted:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
handleJobSubmitted submits the final stage via submitStage:
private[scheduler] def handleJobSubmitted(jobId: Int,
  ......
  submitStage(finalStage)
}
submitStage calls submitMissingTasks to submit the tasks:
private def submitStage(stage: Stage) {
  ......
  submitMissingTasks(stage, jobId.get)
  ......
Now look at DAGScheduler.scala's submitMissingTasks, which calls taskScheduler.submitTasks:
private def submitMissingTasks(stage: Stage, jobId: Int) {
  ......
  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingPartitions ++= tasks.map(_.partitionId)
    logDebug("New pending partitions: " + stage.pendingPartitions)
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  ......
taskScheduler is typed against a trait here, so there is no concrete implementation at this point:
// Submit a sequence of tasks to run.
def submitTasks(taskSet: TaskSet): Unit
The subclass of TaskScheduler is TaskSchedulerImpl, whose submitTasks is the concrete implementation:
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
The high-level scheduler hands the collection of tasks over in a TaskSet; the tasks may be ShuffleMapTasks or ResultTasks. taskSet.tasks is assigned to the local variable tasks; then, inside a synchronized block, createTaskSetManager is called to build the TaskSetManager. The createTaskSetManager code:
private[scheduler] def createTaskSetManager(
    taskSet: TaskSet,
    maxTaskFailures: Int): TaskSetManager = {
  new TaskSetManager(this, taskSet, maxTaskFailures)
}
TaskSchedulerImpl.scala's createTaskSetManager news up a TaskSetManager, passing in this (the TaskSchedulerImpl itself), the task set taskSet, and the maximum retry count maxTaskFailures.
maxTaskFailures is supplied when the TaskSchedulerImpl is constructed:
private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging
TaskSchedulerImpl itself is created in SparkContext; the relevant SparkContext source:
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
……
private def createTaskScheduler(
  ......
  case SPARK_REGEX(sparkUrl) =>
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    (backend, scheduler)
In SparkContext.scala, createTaskScheduler builds the taskScheduler; when the master URL pattern-matches the Standalone mode, a new TaskSchedulerImpl is created.
The TaskSchedulerImpl constructor is shown below. In Spark 2.1 the auxiliary constructor reads config.MAX_TASK_FAILURES from the configuration by default; the maximum failure-retry count defaults to 4.
private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging
{
  def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
In config.scala, MAX_TASK_FAILURES is backed by spark.task.maxFailures, with a default of 4:
private[spark] val MAX_TASK_FAILURES =
  ConfigBuilder("spark.task.maxFailures")
    .intConf
    .createWithDefault(4)
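As an aside, this retry budget is tunable without touching the scheduler. A minimal sketch using the spark.task.maxFailures property shown above (the app name and value are illustrative):

import org.apache.spark.SparkConf

// Allow each task up to 8 failures before the job is aborted
// (the default, as shown above, is 4).
val conf = new SparkConf()
  .setAppName("RetryDemo") // illustrative name
  .set("spark.task.maxFailures", "8")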
Back in TaskSchedulerImpl: once createTaskSetManager has created the TaskSetManager, the key line is schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties).
b) SchedulableBuilder.addTaskSetManager: the SchedulableBuilder determines the scheduling order of TaskSetManagers; each Task is then mapped to a concrete ExecutorBackend according to the TaskSetManager's locality awareness.
schedulableBuilder is the application-level scheduler. SchedulableBuilder is a trait that builds the scheduling tree: buildPools creates the internal pool nodes, and addTaskSetManager attaches the TaskSetManagers as leaf nodes.
private[spark] trait SchedulableBuilder {
  def rootPool: Pool
  def buildPools(): Unit
  def addTaskSetManager(manager: Schedulable, properties: Properties): Unit
}
schedulableBuilder comes in two flavors: FIFOSchedulableBuilder and FairSchedulableBuilder.
FIFOSchedulableBuilder implements first-in-first-out scheduling; FairSchedulableBuilder implements fair scheduling. The policy is selected with the spark.scheduler.mode configuration property (for example in spark-defaults.conf or on the SparkConf); the default is FIFO.
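A minimal sketch of opting into FAIR mode; spark.scheduler.mode, spark.scheduler.allocation.file, and spark.scheduler.pool are real Spark properties, while the pool name and file path below are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("FairModeDemo")
  .set("spark.scheduler.mode", "FAIR") // default is FIFO
  // Optional: define named pools (weight, minShare) in an allocation file.
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")

val sc = new SparkContext(conf)
// Jobs submitted from this thread are scheduled in the named pool:
sc.setLocalProperty("spark.scheduler.pool", "reporting")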
Back in TaskSchedulerImpl.submitTasks, look at the schedulableBuilder that addTaskSetManager is called on:
var schedulableBuilder: SchedulableBuilder = null
schedulableBuilder is instantiated when SparkContext creates the TaskSchedulerImpl via new TaskSchedulerImpl(sc) and then calls scheduler.initialize(backend):
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}
So there are two concrete scheduling modes, FIFO and FAIR, with a SchedulableBuilder for each: FIFOSchedulableBuilder and FairSchedulableBuilder.
The schedulingMode consumed by initialize defaults to FIFO:
private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
val schedulingMode: SchedulingMode = try {
  SchedulingMode.withName(schedulingModeConf.toUpperCase)
} catch {
  case e: java.util.NoSuchElementException =>
    throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
}
Back in TaskSchedulerImpl.submitTasks: after schedulableBuilder.addTaskSetManager, the next key line is backend.reviveOffers().
c) CoarseGrainedSchedulerBackend.reviveOffers: sends ReviveOffers to the DriverEndpoint. In SchedulerBackend.scala, reviveOffers() has no implementation:
private[spark] trait SchedulerBackend {
  private val appId = "spark-application-" + System.currentTimeMillis
  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit
  def defaultParallelism(): Int
CoarseGrainedSchedulerBackend is a subclass of SchedulerBackend; its reviveOffers:
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
CoarseGrainedSchedulerBackend's reviveOffers sends the ReviveOffers message to the DriverEndpoint. ReviveOffers itself is an empty case object; it merely triggers the underlying resource scheduling, and is sent whenever Tasks are submitted or compute resources change:
case object ReviveOffers extends CoarseGrainedClusterMessage
TaskScheduler must now allocate compute resources to Tasks: the program already holds the cluster's compute resources, and compute-locality rules determine which ExecutorBackend each Task runs in.
driverEndpoint.send(ReviveOffers) sends ReviveOffers to driverEndpoint rather than to StandaloneAppClient, because driverEndpoint is the program's scheduler. When driverEndpoint's receive method matches ReviveOffers, it calls makeOffers():
override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    ......
  case ReviveOffers =>
    makeOffers()
d) The DriverEndpoint receives ReviveOffers and routes it to makeOffers. makeOffers first assembles all the workOffers available for computation (representing the usable cores on every live ExecutorBackend):
private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
The ExecutorData class it reads from carries freeCores, totalCores, and related information:
private[cluster] class ExecutorData(
    val executorEndpoint: RpcEndpointRef,
    val executorAddress: RpcAddress,
    override val executorHost: String,
    var freeCores: Int,
    override val totalCores: Int,
    override val logUrlMap: Map[String, String]
) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
makeOffers first selects the usable activeExecutors and then builds the workOffers. WorkerOffer is a case class describing the resources a specific Executor can offer. Only CPU cores are considered; memory is not, because memory was already allocated earlier.
private[spark]
case class WorkerOffer(executorId: String, host: String, cores: Int)
e) In makeOffers, TaskSchedulerImpl.resourceOffers allocates concrete compute resources to each Task. Its input, offers: IndexedSeq[WorkerOffer], is a one-dimensional array of the available resources (the ExecutorBackends and their free cores); its output, Seq[Seq[TaskDescription]], is a two-dimensional array capturing each task's data locality and the Executor it will run on.
A TaskDescription includes the executorId: by the time a TaskDescription exists, the ExecutorBackend the Task will run on has already been decided. The algorithm that makes that decision lives in TaskSetManager's resourceOffer method.
The TaskDescription source:
private[spark] class TaskDescription(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String,
    val name: String,
    val index: Int, // Index within this task's TaskSet
    _serializedTask: ByteBuffer)
  extends Serializable {
resourceOffers is called by the cluster manager to offer resources on the slaves; it takes the task sets in priority order and fills each node round-robin so work is balanced across the cluster. The resourceOffers source:
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  for (taskSet <- sortedTaskSets) {
    var launchedAnyTask = false
    var launchedTaskAtCurrentMaxLocality = false
    for (currentMaxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
          taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
        launchedAnyTask |= launchedTaskAtCurrentMaxLocality
      } while (launchedTaskAtCurrentMaxLocality)
    }
    if (!launchedAnyTask) {
      taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    }
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
Inside resourceOffers:
- Each live slave is marked and its hostname recorded, and the method tracks whether a new executor was added: this keeps the scheduler aware of the cluster's changing resources.
- offers describes the cluster's currently available resources. The loop walks offers; if hostToExecutors does not yet contain a host, its Executor is added, since this is the freshest view of which compute resources each machine has.
- getRackForHost handles rack-level data locality; by default everything sits in a single rack, but production clusters may span several racks.
- The important line val shuffledOffers = Random.shuffle(offers) scatters the available compute resources so tasks do not always land on the same workers.
- tasks maps the shuffled shuffledOffers, giving each worker an ArrayBuffer[TaskDescription]: an Executor can hold as many TaskDescriptions as it has cores, i.e., the core count bounds how many tasks can be assigned. The ArrayBuffer is a one-dimensional array whose length is derived from the machine's CPU count. ArrayBuffer[TaskDescription](o.cores) states how many Tasks the ExecutorBackend can be assigned and run in parallel. Note this is distinct from the number of RDD partitions: scheduling does not decide the task count, which was fixed when the RDD's partitions were created; it only decides which machines the Tasks land on, how many go to each machine, and how many launch per round. A small self-contained sketch of this arithmetic follows.
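The sketch below mimics the shuffle-and-size bookkeeping above but is not Spark's actual code; WorkerOffer mirrors Spark's case class, and everything else (hosts, core counts, the String stand-in for TaskDescription) is illustrative:

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

case class WorkerOffer(executorId: String, host: String, cores: Int)

val offers = IndexedSeq(
  WorkerOffer("0", "host-a", 4),
  WorkerOffer("1", "host-b", 2))
val cpusPerTask = 1 // corresponds to spark.task.cpus

// Shuffle the offers so tasks do not always land on the same workers, then
// size one buffer per executor by its core count (String stands in for TaskDescription).
val shuffledOffers = Random.shuffle(offers)
val taskBuffers = shuffledOffers.map(o => new ArrayBuffer[String](o.cores))
val availableCpus = shuffledOffers.map(_.cores).toArray

println(s"Task slots this round: ${availableCpus.map(_ / cpusPerTask).sum}") // prints 6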
- TaskSchedulerImpl's initialize creates the rootPool and passes in the schedulingMode. The rootPool's leaf nodes are TaskSetManagers, and the pool's policy computes the priority order in which the Stages' TaskSets are scheduled.
The rootPool creation in TaskSchedulerImpl.initialize:
……
var rootPool: Pool = null
……

def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
    }
  }
- The for loop then walks sortedTaskSets; if a new executor has become available, taskSet.executorAdded() notifies each taskSet.
TaskSetManager's executorAdded method:
def recomputeLocality() {
  val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
  myLocalityLevels = computeValidLocalityLevels()
  localityWaits = myLocalityLevels.map(getLocalityWait)
  currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
}

def executorAdded() {
  recomputeLocality()
}
Data-locality priority runs from highest to lowest: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY. PROCESS_LOCAL means the data is in the same executor process; NODE_LOCAL means it is on the same machine, and since one machine can host several executor processes, process locality ranks above node locality; NO_PREF means the task has no locality preference.
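Before falling back from a higher locality level to a lower one, the scheduler waits a configurable interval. A sketch of the real spark.locality.wait knobs (the values shown are illustrative; 3s is the documented default):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "3s")         // base wait shared by the levels below
  .set("spark.locality.wait.process", "3s") // wait before giving up PROCESS_LOCAL
  .set("spark.locality.wait.node", "3s")    // wait before giving up NODE_LOCAL
  .set("spark.locality.wait.rack", "3s")    // wait before giving up RACK_LOCAL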
- The resourceOffers code that pursues the highest possible locality level:
for (taskSet <- sortedTaskSets) {
  var launchedAnyTask = false
  var launchedTaskAtCurrentMaxLocality = false
  for (currentMaxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
        taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
      launchedAnyTask |= launchedTaskAtCurrentMaxLocality
    } while (launchedTaskAtCurrentMaxLocality)
  }
  if (!launchedAnyTask) {
    taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
  }
}
The loop walks sortedTaskSets; for each taskSet, all tasks are offered in order of the priorities in myLocalityLevels, which computes the valid data-locality levels, cycling through PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY. myLocalityLevels is produced by computeValidLocalityLevels.
The computeValidLocalityLevels source:
var myLocalityLevels: Array[TaskLocality] = computeValidLocalityLevels()
......
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
  import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
  val levels = new ArrayBuffer[TaskLocality.TaskLocality]
  if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
      pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
    levels += PROCESS_LOCAL
  }
  if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
      pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
    levels += NODE_LOCAL
  }
  if (!pendingTasksWithNoPrefs.isEmpty) {
    levels += NO_PREF
  }
  if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
      pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
    levels += RACK_LOCAL
  }
  levels += ANY
  logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
  levels.toArray
}
The resourceOfferSingleTaskSet source:
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToRunningTaskIds(execId).add(tid)
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}
In resourceOfferSingleTaskSet, CPUS_PER_TASK is the number of cores each Task occupies; by default a Task computes on a single core. CPUS_PER_TASK in TaskSchedulerImpl.scala:
// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
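For CPU-heavy tasks (for example, ones that spawn internal threads), more cores can be reserved per task; a one-line sketch with an illustrative value:

import org.apache.spark.SparkConf

// Each task now consumes 2 cores in the availableCpus bookkeeping above.
val conf = new SparkConf().set("spark.task.cpus", "2")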
taskSet.resourceOffer inside resourceOfferSingleTaskSet calls TaskSetManager's resourceOffer, which finally fixes the concrete ExecutorBackend, and the concrete locality level, for each Task:
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{
  ……
  sched.dagScheduler.taskStarted(task, info)
  new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
    taskName, index, serializedTask)
Everything above serves one purpose: establishing the locality level. The DAGScheduler tells us which machine a task should run on from the data's perspective: it reasons about preferredLocations at the RDD level, which is all it needs. The TaskScheduler reasons about compute locality for the concrete Task and is the more specific, lower-level scheduler. Locality therefore has two layers: data locality and compute locality.
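To make the DAGScheduler-side data-locality input concrete: SparkContext.makeRDD has an overload that attaches preferred locations to each partition. The hostnames below are placeholders, and sc is the SparkContext from the example at the top:

// Each element is (value, preferred hosts for that element's partition).
val located = sc.makeRDD(Seq(
  (1, Seq("host-a")),
  (2, Seq("host-b"))))

// The DAGScheduler consults these via the RDD's preferredLocations:
located.partitions.foreach(p => println(located.preferredLocations(p)))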
To summarize, scheduler.resourceOffers decides which ExecutorBackend each Task runs on. How exactly does it decide?
i. Random.shuffle reshuffles all compute resources to balance the computational load;
ii. an ArrayBuffer of TaskDescription is declared per ExecutorBackend, sized by its core count;
iii. if new ExecutorBackends have been assigned to our Job, executorAdded is called to obtain the latest complete view of available compute resources;
iv. the following code pursues the highest possible locality level:
for (taskSet <- sortedTaskSets) {
  var launchedAnyTask = false
  var launchedTaskAtCurrentMaxLocality = false
  for (currentMaxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
        taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
      launchedAnyTask |= launchedTaskAtCurrentMaxLocality
    } while (launchedTaskAtCurrentMaxLocality)
  }
  if (!launchedAnyTask) {
    taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
  }
}
v. the call to TaskSetManager's resourceOffer finally fixes the concrete ExecutorBackend, and the concrete locality level, for each Task.
Back to CoarseGrainedSchedulerBackend.scala's launchTasks:
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= maxRpcMessageSize) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
            "spark.rpc.message.maxSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK

      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")

      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
f) launchTasks sends the tasks to the ExecutorBackend for execution.
launchTasks first serializes each task, and the serialized Task must not be too large: if it reaches maxRpcMessageSize, an error is reported.
maxRpcMessageSize is defined via RpcUtils.scala; spark.rpc.message.maxSize defaults to 128 MB:
private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
……
def maxMessageSizeBytes(conf: SparkConf): Int = {
  val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128)
  if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) {
    throw new IllegalArgumentException(
      s"spark.rpc.message.maxSize should not be greater than $MAX_MESSAGE_SIZE_IN_MB MB")
  }
  maxSizeInMB * 1024 * 1024
}
So maxSizeInMB, the limit applied when a Task is shipped, is 128 MB: a serialized task at or above the limit causes its TaskSet to be aborted with the error message above, while a smaller one is launched onto its ExecutorBackend by CoarseGrainedSchedulerBackend. In launchTasks, executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) hands the serialized Task to the ExecutorBackend that will run it, as a LaunchTask message.
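A sketch of the broadcast workaround that the error message itself suggests: ship a large lookup structure to each executor once instead of inside every serialized task. loadTable is a hypothetical loader, and sc is the SparkContext from the example at the top:

// A big structure captured directly in a closure is serialized into every
// task and can trip the spark.rpc.message.maxSize check above.
val bigTable: Map[String, Int] = loadTable() // hypothetical loader

val bcTable = sc.broadcast(bigTable) // sent to each executor once
val counts = sc.parallelize(Seq("a", "b"))
  .map(key => bcTable.value.getOrElse(key, 0))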
Next, CoarseGrainedExecutorBackend receives the LaunchTask message and starts executor.launchTask:
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }