
Lesson 36: Inside TaskScheduler: a walkthrough of the Spark shell run logs, TaskScheduler and SchedulerBackend, FIFO vs. FAIR, and the task-locality algorithm at runtime

2017-06-01 07:25

TaskScheduler is Spark's low-level scheduler; it is responsible for scheduling and running the Tasks themselves.
         Let's write a simple test program, use setMaster("local-cluster[1, 1, 1024]") to run Spark in local pseudo-distributed mode, and observe how the framework behaves from the run logs.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object SparkTest {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ALL)
    val conf = new SparkConf()                    // create the SparkConf object
    conf.setAppName("Wow,My First Spark App!")    // application name shown in the monitoring UI
    conf.setMaster("local-cluster[1, 1, 1024]")   // local pseudo-distributed mode: 1 worker, 1 core, 1024 MB
    conf.setSparkHome(System.getenv("SPARK_HOME"))
    val sc = new SparkContext(conf)               // SparkContext customizes the run via the SparkConf instance
    sc.parallelize(Array("100", "200"), 4).count()
    sc.stop()
  }
}
    
Running this code in IDEA, the printed log looks like the following:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/05/31 05:32:07 INFO SparkContext: Running Spark version 2.1.0
......
17/05/31 05:46:06 INFO WorkerWebUI: Bound WorkerWebUI to 0.0.0.0, and started at http://192.168.93.1:51034
17/05/31 05:46:06 INFO Worker: Connecting to master 192.168.93.1:51011...
17/05/31 05:46:06 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.168.93.1:51011...
17/05/31 05:46:06 INFO TransportClientFactory: Successfully created connection to /192.168.93.1:51011 after 38 ms (0 ms spent in bootstraps)
17/05/31 05:46:06 INFO TransportClientFactory: Successfully created connection to /192.168.93.1:51011 after 100 ms (0 ms spent in bootstraps)
17/05/31 05:46:07 INFO Master: Registering worker 192.168.93.1:51033 with 1 cores, 1024.0 MB RAM
17/05/31 05:46:07 INFO Worker: Successfully registered with master spark://192.168.93.1:51011
17/05/31 05:46:07 INFO Master: Registering app Wow,My First Spark App!
17/05/31 05:46:07 INFO Master: Registered app Wow,My First Spark App! with ID app-20170531054607-0000
17/05/31 05:46:07 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20170531054607-0000
17/05/31 05:46:07 INFO Master: Launching executor app-20170531054607-0000/0 on worker worker-20170531054606-192.168.93.1-51033
17/05/31 05:46:07 INFO Worker: Asked to launch executor app-20170531054607-0000/0 for Wow,My First Spark App!
17/05/31 05:46:07 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20170531054607-0000/0 on worker-20170531054606-192.168.93.1-51033 (192.168.93.1:51033) with 1 cores
……
17/05/31 05:46:07 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
17/05/31 05:46:07 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20170531054607-0000/0 is now RUNNING
 
The log line StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.168.93.1:51011... shows StandaloneAppClient's ClientEndpoint registering the application with the Master. The line StandaloneAppClient$ClientEndpoint: Executor added shows that an Executor has been obtained; Executors are managed through StandaloneAppClient's ClientEndpoint. The line StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 indicates that StandaloneSchedulerBackend is ready.
Here we are running in IDEA in local pseudo-distributed mode (the count action triggered the job). If we instead run the program through spark-shell and watch the logs, the output printed while spark-shell itself starts mainly comes from ClientEndpoint and StandaloneSchedulerBackend. That is because no Job has been triggered yet; we are only starting the Application itself, which essentially means instantiating SparkContext, registering the current application with the Master, and obtaining ExecutorBackend compute resources from the cluster.
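You can reproduce this split yourself in spark-shell (the master URL below is a placeholder): when the shell starts you only see the registration logs, and the DAGScheduler/TaskSetManager logs appear only after the first action runs. A minimal sketch:

// Started with, for example:  spark-shell --master spark://<master-host>:7077   (placeholder URL)
// At this point the log only shows SparkContext start-up, app registration with the Master,
// and executor acquisition (ClientEndpoint / StandaloneSchedulerBackend messages).
sc.parallelize(Array("100", "200"), 4).count()
// Only now do DAGScheduler / TaskSchedulerImpl / TaskSetManager lines appear,
// because count() is the first action and therefore triggers the first job.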
In the local pseudo-distributed IDEA run, the log printed when the job starts is as follows:
17/05/31 05:46:08 INFO DAGScheduler: Got job 0 (count at SparkTest.scala:17) with 4 output partitions
17/05/31 05:46:08 INFO DAGScheduler: Final stage: ResultStage 0 (count at SparkTest.scala:17)
17/05/31 05:46:08 INFO DAGScheduler: Parents of final stage: List()
17/05/31 05:46:08 INFO DAGScheduler: Missing parents: List()
17/05/31 05:46:08 INFO DAGScheduler: Submitting ResultStage 0 (ParallelCollectionRDD[0] at parallelize at SparkTest.scala:17), which has no missing parents
......
17/05/31 05:46:08 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 0 (ParallelCollectionRDD[0] at parallelize at SparkTest.scala:17)
count is an action, so it triggers a job. DAGScheduler then determines the final stage (ResultStage) and submits it (Submitting ResultStage). The tasks are handed to a TaskSetManager and launched; once all tasks finish, DAGScheduler marks the job as complete.

After DAGScheduler has split the job into Stages, TaskSchedulerImpl manages all tasks of the current Stage (the TaskSet) through a TaskSetManager. The TaskSetManager allocates compute resources to each Task in a locality-aware way and monitors task execution (for example retries, and speculative execution for straggler tasks).

TaskSet is a data structure holding the set of tasks that the high-level scheduler hands to the low-level scheduler. Its first member is tasks; stageId records which Stage the tasks belong to; stageAttemptId is the attempt ID; priority is the scheduling priority used by the scheduling pool when ordering TaskSets:
private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val stageAttemptId: Int,
    val priority: Int,
    val properties: Properties) {
  val id: String = stageId + "." + stageAttemptId

  override def toString: String = "TaskSet " + id
}
 
A TaskSetManager is instantiated to carry out TaskSchedulerImpl's work for one TaskSet: it receives the TaskSet (the collection of tasks), and maxTaskFailures is the number of times a failed task may be retried.
private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,
    val taskSet: TaskSet,
    val maxTaskFailures: Int,
    clock: Clock = new SystemClock()) extends Schedulable with Logging {
 
The overall low-level task-scheduling flow between TaskScheduler and SchedulerBackend is as follows:
a) TaskSchedulerImpl.submitTasks: its main job is to put the TaskSet under the management of a TaskSetManager;
DAGScheduler.scala receives the JobSubmitted message and calls handleJobSubmitted:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
 
handleJobSubmitted submits the final stage via submitStage:
private[scheduler] def handleJobSubmitted(jobId: Int,
    ......
    submitStage(finalStage)
  }
 
submitStage in turn calls submitMissingTasks to submit the tasks:
private def submitStage(stage: Stage) {
  ......
        submitMissingTasks(stage, jobId.get)
  ......
 
Looking at DAGScheduler.scala's submitMissingTasks, it calls taskScheduler.submitTasks:
private def submitMissingTasks(stage: Stage, jobId: Int) {
  ......
  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingPartitions ++= tasks.map(_.partitionId)
    logDebug("New pending partitions: " + stage.pendingPartitions)
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  ......
taskScheduler here is a trait (an interface) with no concrete implementation:
// Submit a sequence of tasks to run.
def submitTasks(taskSet: TaskSet): Unit
 
TaskSchedulerImpl is the subclass of TaskScheduler; its concrete submitTasks implementation is:
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
 
The high-level scheduler hands the collection of tasks over in a TaskSet; a task may be a ShuffleMapTask or a ResultTask. taskSet.tasks is assigned to the local variable tasks. Then, inside a synchronized block, createTaskSetManager is called to create the TaskSetManager. createTaskSetManager looks like this:
private[scheduler] def createTaskSetManager(
    taskSet: TaskSet,
    maxTaskFailures: Int): TaskSetManager = {
  new TaskSetManager(this, taskSet, maxTaskFailures)
}
 
TaskSchedulerImpl.scala's createTaskSetManager news up a TaskSetManager, passing in this (the TaskSchedulerImpl itself), the taskSet, and the maximum number of failure retries maxTaskFailures.
maxTaskFailures is supplied when the TaskSchedulerImpl is constructed.
private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging
 
TaskSchedulerImpl itself is created in SparkContext; here is the relevant SparkContext source:
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
……
private def createTaskScheduler(
......
  case SPARK_REGEX(sparkUrl) =>
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    (backend, scheduler)
 
In SparkContext.scala, createTaskScheduler creates the taskScheduler; when createTaskScheduler pattern-matches the Standalone master URL, it news up a TaskSchedulerImpl.
The TaskSchedulerImpl constructor is shown below. In Spark 2.1, by default maxTaskFailures is taken from config.MAX_TASK_FAILURES, whose default maximum number of failure retries is 4.
private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging
{
  def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
 
In config.scala, MAX_TASK_FAILURES is backed by spark.task.maxFailures, with a default of 4:
private[spark] val MAX_TASK_FAILURES =
  ConfigBuilder("spark.task.maxFailures")
    .intConf
    .createWithDefault(4)
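If the default does not fit, the retry limit can be raised from user code when building the SparkConf; a minimal sketch (the value 8 is only an example):

import org.apache.spark.SparkConf

object RetryConfDemo {
  def main(args: Array[String]): Unit = {
    // Sketch: allow each task up to 8 failures before the job as a whole is failed.
    val conf = new SparkConf()
      .setAppName("RetryConfDemo")
      .set("spark.task.maxFailures", "8")
    println(conf.get("spark.task.maxFailures"))
  }
}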
 
Back in TaskSchedulerImpl: after createTaskSetManager has created the TaskSetManager, the key line is schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties).
b) SchedulableBuilder.addTaskSetManager: the SchedulableBuilder determines the scheduling order of the TaskSetManagers, and each Task's locality-aware placement then determines which ExecutorBackend it actually runs on.
schedulableBuilder is the application-level scheduler. SchedulableBuilder is a trait that builds the scheduling tree: buildPools builds the tree's pool nodes, and addTaskSetManager adds the leaf nodes, the TaskSetManagers.
private[spark] trait SchedulableBuilder {
  def rootPool: Pool
  def buildPools(): Unit
  def addTaskSetManager(manager: Schedulable, properties: Properties): Unit
}
 
schedulableBuilder supports two scheduling modes: FIFOSchedulableBuilder and FairSchedulableBuilder.
FIFOSchedulableBuilder is first-in-first-out scheduling and FairSchedulableBuilder is fair scheduling; the policy is chosen with the spark.scheduler.mode configuration property, and the default is FIFO.
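For instance, switching an application to FAIR scheduling and pinning jobs from one thread to a named pool could look roughly like this (a sketch: the pool name "batch" is hypothetical, and pool weights/minShare would normally come from a fairscheduler.xml referenced by spark.scheduler.allocation.file):

import org.apache.spark.{SparkConf, SparkContext}

object FairModeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("FairModeDemo")
      .setMaster("local[2]")
      .set("spark.scheduler.mode", "FAIR")   // default is FIFO
    val sc = new SparkContext(conf)
    // Jobs submitted from this thread go into the (hypothetical) "batch" pool;
    // an undeclared pool is created with default settings.
    sc.setLocalProperty("spark.scheduler.pool", "batch")
    sc.parallelize(1 to 10).count()
    sc.stop()
  }
}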
 
Back in TaskSchedulerImpl.submitTasks, look at the schedulableBuilder used by schedulableBuilder.addTaskSetManager:
var schedulableBuilder: SchedulableBuilder = null
schedulableBuilder is instantiated in the initialize method: when SparkContext creates the TaskSchedulerImpl via new TaskSchedulerImpl(sc), it then calls scheduler.initialize(backend), and initialize sets up schedulableBuilder.
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}
There are two concrete scheduling modes, FIFO and FAIR, and correspondingly two SchedulableBuilders: FIFOSchedulableBuilder and FairSchedulableBuilder.
The schedulingMode used in initialize defaults to FIFO:
private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
val schedulingMode: SchedulingMode = try {
  SchedulingMode.withName(schedulingModeConf.toUpperCase)
} catch {
  case e: java.util.NoSuchElementException =>
    throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
}
 
Back in TaskSchedulerImpl.submitTasks, after schedulableBuilder.addTaskSetManager the key line is backend.reviveOffers().

c) CoarseGrainedSchedulerBackend.reviveOffers: sends ReviveOffers to the DriverEndpoint. SchedulerBackend.scala's reviveOffers() has no concrete implementation:
private[spark] trait SchedulerBackend {
  private val appId = "spark-application-" + System.currentTimeMillis
  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit
  def defaultParallelism(): Int
 
CoarseGrainedSchedulerBackend is a subclass of SchedulerBackend; its reviveOffers implementation is:
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
CoarseGrainedSchedulerBackend's reviveOffers sends the ReviveOffers message to the DriverEndpoint. ReviveOffers itself is an empty case object; it only serves to trigger the low-level resource scheduling, and it is sent whenever a Task is submitted or the compute resources change:
case object ReviveOffers extends CoarseGrainedClusterMessage
TaskScheduler is responsible for allocating compute resources to Tasks: at this point the program already holds the cluster's compute resources, and the compute-locality principle decides which ExecutorBackend a Task actually runs on.
driverEndpoint.send(ReviveOffers) sends the ReviveOffers message to the driverEndpoint rather than to StandaloneAppClient, because the driverEndpoint is the program's scheduler. When driverEndpoint's receive method pattern-matches the ReviveOffers message, it calls makeOffers():
override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    ......
  case ReviveOffers =>
    makeOffers()
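Stripped of Spark's RpcEnv machinery, the trigger pattern is easy to see in isolation. The following self-contained sketch (all names invented, not Spark's classes) shows an empty case object used purely as a signal and routed the same way ReviveOffers is routed in receive:

sealed trait DemoClusterMessage
case object DemoReviveOffers extends DemoClusterMessage      // carries no data, only intent

class DemoDriverEndpoint {
  // Same routing idea as DriverEndpoint.receive: match the trigger, then make offers.
  def receive: PartialFunction[DemoClusterMessage, Unit] = {
    case DemoReviveOffers => makeOffers()
  }
  private def makeOffers(): Unit = println("collect WorkerOffers and launch tasks")
}

object ReviveOffersPatternDemo {
  def main(args: Array[String]): Unit = {
    new DemoDriverEndpoint().receive(DemoReviveOffers)
  }
}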
d) The DriverEndpoint receives the ReviveOffers message and routes it to the concrete makeOffers method: makeOffers first prepares all the workOffers available for computation (representing the usable cores, etc., of every available ExecutorBackend):
private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
 
The ExecutorData class used here is shown below; it carries freeCores, totalCores and related information:
private[cluster] class ExecutorData(
   val executorEndpoint: RpcEndpointRef,
   val executorAddress: RpcAddress,
   override val executorHost: String,
   var freeCores: Int,
   override val totalCores: Int,
   override val logUrlMap: Map[String, String]
) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
 
makeOffers first finds the usable activeExecutors and then builds the workOffers. WorkerOffer is a case class, a data structure describing the resources a particular Executor can currently offer. Only CPU cores are considered here, not memory, because memory has already been allocated by this point:
private[spark]
case class WorkerOffer(executorId: String, host: String, cores: Int)
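To make concrete how free cores bound what can be launched in one scheduling round, here is a minimal, self-contained sketch (the Offer case class and all values are illustrative stand-ins, not Spark's private WorkerOffer):

// Hypothetical stand-in for Spark's private WorkerOffer, for illustration only.
case class Offer(executorId: String, host: String, cores: Int)

object OfferDemo {
  def main(args: Array[String]): Unit = {
    val cpusPerTask = 1                                     // corresponds to spark.task.cpus
    val offers = Seq(Offer("exec-0", "host-a", 2), Offer("exec-1", "host-b", 4))
    // Each executor can host cores / cpusPerTask task slots in this round.
    val slots = offers.map(o => o.executorId -> o.cores / cpusPerTask)
    println(slots)                                          // List((exec-0,2), (exec-1,4))
    println(slots.map(_._2).sum)                            // at most 6 tasks in this round
  }
}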
 
In makeOffers, TaskSchedulerImpl.resourceOffers allocates compute resources to each Task. Its input, offers: IndexedSeq[WorkerOffer], is a one-dimensional sequence of available compute resources (the ExecutorBackends and the cores free on each); its output is a two-dimensional Seq[Seq[TaskDescription]] describing, for every task, its data locality and which Executor it will run on.
TaskDescription includes the executorId, so by the time a TaskDescription exists it has already been decided which ExecutorBackend the Task will run on. The algorithm that makes that decision lives in TaskSetManager's resourceOffer method.
The TaskDescription source is as follows:
private[spark] class TaskDescription(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String,
    val name: String,
    val index: Int,    // Index within this task's TaskSet
    _serializedTask: ByteBuffer)
  extends Serializable {
 
resourceOffers is called by the cluster manager to offer resources on the slaves. It orders the task sets by priority and fills each node with tasks in a round-robin manner so that tasks are balanced across the cluster. Its source is as follows:
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  for (taskSet <- sortedTaskSets) {
    var launchedAnyTask = false
    var launchedTaskAtCurrentMaxLocality = false
    for (currentMaxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
          taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
        launchedAnyTask |= launchedTaskAtCurrentMaxLocality
      } while (launchedTaskAtCurrentMaxLocality)
    }
    if (!launchedAnyTask) {
      taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    }
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
Inside resourceOffers:
- Each live slave is marked and its hostname recorded, and the method tracks whether a new executor has been added, so the scheduler stays aware of the cluster's dynamic resource situation.
- offers describes which resources are currently available in the cluster. The loop checks whether hostToExecutors already contains the current host and adds the executor if not, because this is the latest offer describing what compute resources each machine has.
- getRackForHost handles rack-level data locality; by default everything is in one rack, but a production cluster may span several racks.
- The important line val shuffledOffers = Random.shuffle(offers) shuffles the available compute resources to spread the load.
- tasks maps the shuffled shuffledOffers so that each worker gets an ArrayBuffer[TaskDescription]: an Executor can hold as many TaskDescriptions as it has cores, that is, as many cores as it has, that many tasks can be assigned to it. The ArrayBuffer is a one-dimensional array whose capacity is the machine's core count. ArrayBuffer[TaskDescription](o.cores) therefore states how many Tasks can be assigned to, and run in parallel on, this ExecutorBackend. This is a different concept from the number of RDD partitions: the partition count was fixed when the RDD was created, whereas scheduling here decides which machines the Tasks land on, how many Tasks each machine gets, and how many can be launched at once.
- TaskSchedulerImpl's initialize creates the rootPool and passes in the schedulingMode. The leaves of rootPool are the TaskSetManagers, and a defined algorithm decides the scheduling order of the Stages' TaskSets.
The source where TaskSchedulerImpl's initialize creates rootPool:
……
var rootPool: Pool = null
……

def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
    }
  }
 
- The for loop then walks sortedTaskSets; if a new executor has become available, taskSet.executorAdded() notifies the TaskSet.
TaskSetManager's executorAdded method:
def recomputeLocality() {
  val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
  myLocalityLevels = computeValidLocalityLevels()
  localityWaits = myLocalityLevels.map(getLocalityWait)
  currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
}

def executorAdded() {
  recomputeLocality()
}
Data-locality priority, from highest to lowest, is: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY. PROCESS_LOCAL means the data is in the same executor process as the task, NODE_LOCAL means it is on the same machine, and NO_PREF means the task has no locality preference, so it is tried before falling back to the rack level.
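How long the scheduler waits at each level before degrading to the next is governed by the spark.locality.wait family of properties; a minimal sketch (the 10s/5s values are only examples; the stock default wait is 3s):

import org.apache.spark.SparkConf

object LocalityWaitDemo {
  def main(args: Array[String]): Unit = {
    // Sketch: be more patient before giving up PROCESS_LOCAL / NODE_LOCAL placement.
    val conf = new SparkConf()
      .setAppName("LocalityWaitDemo")
      .set("spark.locality.wait", "10s")           // base wait, default 3s
      .set("spark.locality.wait.process", "10s")   // wait before dropping below PROCESS_LOCAL
      .set("spark.locality.wait.node", "10s")      // wait before dropping below NODE_LOCAL
      .set("spark.locality.wait.rack", "5s")       // wait before dropping below RACK_LOCAL
    println(conf.get("spark.locality.wait"))
  }
}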
- The resourceOffers code that pursues the highest possible locality level is shown below:
for (taskSet <- sortedTaskSets) {
  var launchedAnyTask = false
  var launchedTaskAtCurrentMaxLocality = false
  for (currentMaxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
        taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
      launchedAnyTask |= launchedTaskAtCurrentMaxLocality
    } while (launchedTaskAtCurrentMaxLocality)
  }
  if (!launchedAnyTask) {
    taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
  }
}
 
For every taskSet in sortedTaskSets, all tasks are first offered at the highest level in myLocalityLevels. myLocalityLevels is the list of valid data-locality levels for the TaskSet, walked through in the order PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY, and it is obtained from computeValidLocalityLevels.
The source of computeValidLocalityLevels is as follows:
var myLocalityLevels: Array[TaskLocality] = computeValidLocalityLevels()
......
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
  import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
  val levels = new ArrayBuffer[TaskLocality.TaskLocality]
  if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
      pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
    levels += PROCESS_LOCAL
  }
  if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
      pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
    levels += NODE_LOCAL
  }
  if (!pendingTasksWithNoPrefs.isEmpty) {
    levels += NO_PREF
  }
  if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
      pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
    levels += RACK_LOCAL
  }
  levels += ANY
  logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
  levels.toArray
}
 
The source of resourceOfferSingleTaskSet is as follows:
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToRunningTaskIds(execId).add(tid)
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}
 
In resourceOfferSingleTaskSet, CPUS_PER_TASK is the number of CPUs requested per Task; by default each Task computes with a single thread. In TaskSchedulerImpl.scala, CPUS_PER_TASK is defined as:
// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
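If a task spawns its own internal threads, spark.task.cpus can be raised so that the scheduler reserves more cores per task; a sketch (the value 2 and the master string are only examples):

import org.apache.spark.SparkConf

object CpusPerTaskDemo {
  def main(args: Array[String]): Unit = {
    // Sketch: with 4 cores per executor and spark.task.cpus = 2,
    // resourceOfferSingleTaskSet can place at most 2 concurrent tasks on that executor.
    val conf = new SparkConf()
      .setAppName("CpusPerTaskDemo")
      .setMaster("local-cluster[1, 4, 2048]")
      .set("spark.task.cpus", "2")
    println(conf.get("spark.task.cpus"))
  }
}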
 
resourceOfferSingleTaskSet calls taskSet.resourceOffer, and it is TaskSetManager's resourceOffer that finally determines, for each Task, which ExecutorBackend it runs on and at which Locality Level:
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{
……
  sched.dagScheduler.taskStarted(task, info)
  new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
    taskName, index, serializedTask)
Everything above serves one purpose: determining the LocalityLevel, the locality hierarchy. DAGScheduler tells us which machine a task would like to run on: it considers preferredLocation from the data's perspective, which it can determine at the RDD level. TaskScheduler, in contrast, considers locality from the perspective of the concrete computation of a Task; it is the lower-level, more concrete scheduler. So locality has two layers: 1. data locality, and 2. compute locality.
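Data-level preferred locations can even be supplied by hand when building an RDD; the sketch below (hostnames host-a/host-b are hypothetical) uses SparkContext.makeRDD's location-preference overload, which is exactly the information DAGScheduler exposes as preferredLocations and TaskSetManager later tries to honor:

import org.apache.spark.{SparkConf, SparkContext}

object PreferredLocationDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("PreferredLocationDemo").setMaster("local[2]"))
    // Each element carries the hostnames it would prefer to be computed on.
    val rdd = sc.makeRDD(Seq(
      ("record-1", Seq("host-a")),
      ("record-2", Seq("host-b"))
    ))
    println(rdd.preferredLocations(rdd.partitions(0)))   // e.g. List(host-a)
    sc.stop()
  }
}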
To summarize: scheduler.resourceOffers determines which ExecutorBackend each Task actually runs on. How exactly does resourceOffers decide this?
i.   It reshuffles all compute resources with Random.shuffle to seek a balanced computational load;
ii.  It declares, for each ExecutorBackend, an ArrayBuffer of TaskDescription sized by its number of cores;
iii. If new ExecutorBackends have been allocated to our Job, executorAdded is called to obtain the latest complete view of available compute resources;
iv.  It pursues the highest possible locality level through the following code:
for (taskSet <- sortedTaskSets) {
  var launchedAnyTask = false
  var launchedTaskAtCurrentMaxLocality = false
  for (currentMaxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
        taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
      launchedAnyTask |= launchedTaskAtCurrentMaxLocality
    } while (launchedTaskAtCurrentMaxLocality)
  }
  if (!launchedAnyTask) {
    taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
  }
}
 
v.   TaskSetManager's resourceOffer is finally called to determine, for each Task, the exact ExecutorBackend and Locality Level it runs with.
 
Back in CoarseGrainedSchedulerBackend.scala's launchTasks method:
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= maxRpcMessageSize) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
            "spark.rpc.message.maxSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK

      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")

      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
 
f) launchTasks sends the tasks to the ExecutorBackend for execution.
launchTasks first serializes each task; the serialized Task must not be too large, and if it exceeds maxRpcMessageSize an error message is produced.
maxRpcMessageSize is defined through RpcUtils.scala; spark.rpc.message.maxSize defaults to 128 MB:
private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
……
def maxMessageSizeBytes(conf: SparkConf): Int = {
  val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128)
  if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) {
    throw new IllegalArgumentException(
      s"spark.rpc.message.maxSize should not be greater than $MAX_MESSAGE_SIZE_IN_MB MB")
  }
  maxSizeInMB * 1024 * 1024
}
}
The limit that applies when a Task is shipped is therefore 128 MB: if a serialized task reaches or exceeds maxRpcMessageSize, the TaskSet is aborted; if it is smaller, CoarseGrainedSchedulerBackend launches it on the target ExecutorBackend. In CoarseGrainedSchedulerBackend.scala's launchTasks, executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) hands the serialized Task to the ExecutorBackend that will run it by sending it a LaunchTask message.
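The abort message above suggests broadcast variables for large values; the sketch below (names and sizes are illustrative) shows the idea: a large driver-side object captured in a closure would be serialized into every task, while broadcasting it ships it to each executor once and keeps the serialized tasks small:

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("BroadcastDemo").setMaster("local[2]"))
    val bigLookup = (1 to 100000).map(i => i -> s"value-$i").toMap  // large driver-side object
    val bigLookupBc = sc.broadcast(bigLookup)                       // shipped once per executor
    val hits = sc.parallelize(1 to 10).map(i => bigLookupBc.value.getOrElse(i, "miss")).count()
    println(hits)
    sc.stop()
  }
}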
Next, CoarseGrainedExecutorBackend receives the LaunchTask message and starts the task via executor.launchTask:
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }