Spark Source Code Study (2): Spark Submit
2016-06-11 21:27
Goal of this article:
By reading the source code behind Spark job submission, gain a deeper understanding of how a job is submitted.
![](http://img.blog.csdn.net/20160611203125042)
![](http://img.blog.csdn.net/20160611204325625)
The basic job-submission flow
First, locate the source of the submit process: it lives at spark.deploy.SparkSubmit in the project tree. SparkSubmit runs as its own process; start with its main method:
```scala
def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
```
Submission goes through the submit() method, whose source is:
```scala
private def submit(args: SparkSubmitArguments): Unit = {
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            // scalastyle:off println
            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            // scalastyle:on println
            exitFn(1)
          } else {
            throw e
          }
      }
    } else {
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }
  // (the remainder of submit(), which eventually invokes doRunMain(), is not shown here)
}
```
When everything checks out, submit() eventually executes runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose), which runs with configuration such as the cluster mode and runtime environment already resolved. Here we follow the Client path used in standalone cluster mode. The job-submission sequence diagram shown earlier outlines the overall flow.
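To make the mode selection concrete, here is a small runnable sketch. It is not Spark's actual code: SubmitModeSketch and its method are invented names, and only the two cases discussed in this article are modelled.

```scala
// Simplified sketch (not Spark's real logic) of how the "child main class"
// launched by runMain depends on the master URL and deploy mode.
object SubmitModeSketch {
  def childMainClass(master: String, deployMode: String, userMainClass: String): String =
    (master, deployMode) match {
      case (m, "cluster") if m.startsWith("spark://") =>
        // Standalone cluster mode: the Client endpoint submits the driver to the Master
        "org.apache.spark.deploy.Client"
      case _ =>
        // Client mode: the user's main class runs directly in the submitting JVM
        userMainClass
    }
}
```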
Client
The Client's startup logic is in its onStart method:
```scala
override def onStart(): Unit = {
  driverArgs.cmd match {
    case "launch" =>
      // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
      // truncate filesystem paths similar to what YARN does. For now, we just require
      // people call `addJar` assuming the jar is in the same directory.
      val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

      val classPathConf = "spark.driver.extraClassPath"
      val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

      val libraryPathConf = "spark.driver.extraLibraryPath"
      val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

      val extraJavaOptsConf = "spark.driver.extraJavaOptions"
      val extraJavaOpts = sys.props.get(extraJavaOptsConf)
        .map(Utils.splitCommandString).getOrElse(Seq.empty)

      val sparkJavaOpts = Utils.sparkJavaOpts(conf)
      val javaOpts = sparkJavaOpts ++ extraJavaOpts
      val command = new Command(mainClass,
        Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
        sys.env, classPathEntries, libraryPathEntries, javaOpts)

      val driverDescription = new DriverDescription(
        driverArgs.jarUrl,
        driverArgs.memory,
        driverArgs.cores,
        driverArgs.supervise,
        command)
      asyncSendToMasterAndForwardReply[SubmitDriverResponse](
        RequestSubmitDriver(driverDescription))

    case "kill" =>
      val driverId = driverArgs.driverId
      asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
  }
}
```
The Client is an RPC endpoint (an actor in older Spark versions). To submit a job, it first packages up a DriverDescription, including the jar file URL, memory, cores, and supervise flag, and then sends a RequestSubmitDriver message to the Master.
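The shape of that hand-off can be sketched with simplified stand-in types. CommandSketch, DriverDescriptionSketch, and the other names below are illustrative only, not the real Spark classes:

```scala
// Simplified models of the messages the Client builds before contacting the Master.
case class CommandSketch(mainClass: String, arguments: Seq[String])
case class DriverDescriptionSketch(
    jarUrl: String, memoryMb: Int, cores: Int, supervise: Boolean, command: CommandSketch)
case class RequestSubmitDriverSketch(description: DriverDescriptionSketch)

object ClientSketch {
  // Mirrors the shape of the onStart "launch" branch above: the driver's command
  // points at DriverWrapper, with the user's main class passed as an argument.
  def buildSubmitRequest(jarUrl: String, memoryMb: Int, cores: Int,
                         userMainClass: String): RequestSubmitDriverSketch = {
    val wrapper = "org.apache.spark.deploy.worker.DriverWrapper"
    val cmd = CommandSketch(wrapper, Seq("{{WORKER_URL}}", "{{USER_JAR}}", userMainClass))
    RequestSubmitDriverSketch(
      DriverDescriptionSketch(jarUrl, memoryMb, cores, supervise = false, cmd))
  }
}
```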
Master
The Master handles the RequestSubmitDriver message as follows:
```scala
case RequestSubmitDriver(description) =>
  if (state != RecoveryState.ALIVE) {
    val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
      "Can only accept driver submissions in ALIVE state."
    context.reply(SubmitDriverResponse(self, false, None, msg))
  } else {
    logInfo("Driver submitted " + description.command.mainClass)
    val driver = createDriver(description)
    persistenceEngine.addDriver(driver)
    waitingDrivers += driver
    drivers.add(driver)
    schedule()

    // TODO: It might be good to instead have the submission client poll the master to determine
    // the current status of the driver. For now it's simply "fire and forget".

    context.reply(SubmitDriverResponse(self, true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}"))
  }
```
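The ALIVE-state gate can be reduced to a tiny runnable model. MasterSubmitSketch and its members are invented names for illustration, not Spark's real types:

```scala
// Reduced model of the RequestSubmitDriver branch: reject submissions unless the
// Master is ALIVE; otherwise queue the driver for schedule() and acknowledge it.
object MasterSubmitSketch {
  sealed trait RecoveryStateSketch
  case object Alive extends RecoveryStateSketch
  case object Standby extends RecoveryStateSketch

  // stand-in for the Master's waitingDrivers list
  val waitingDrivers = scala.collection.mutable.ArrayBuffer[String]()

  // Returns (success, reply message), queueing the driver on success.
  def handleSubmit(state: RecoveryStateSketch, driverId: String): (Boolean, String) =
    if (state != Alive) {
      (false, "Can only accept driver submissions in ALIVE state.")
    } else {
      waitingDrivers += driverId
      (true, s"Driver successfully submitted as $driverId")
    }
}
```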
The key method here is schedule:
```scala
/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}
```
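The round-robin placement loop can be replayed in isolation. The sketch below is a simplified, deterministic model (no shuffling; WorkerRes and DriverReq are stand-in types, not Spark's) of how waiting drivers are matched to workers with enough free memory and cores:

```scala
// Stand-ins for WorkerInfo and a waiting driver's resource request.
case class WorkerRes(id: String, var memFree: Int, var coresFree: Int)
case class DriverReq(id: String, mem: Int, cores: Int)

object ScheduleSketch {
  // Round-robin over workers, launching each driver on the first worker that
  // fits; a driver that fits nowhere stays unplaced. Returns driverId -> workerId.
  def placeDrivers(workers: Seq[WorkerRes], waiting: Seq[DriverReq]): Map[String, String] = {
    var curPos = 0
    val placed = scala.collection.mutable.Map[String, String]()
    for (driver <- waiting) {
      var launched = false
      var visited = 0
      while (visited < workers.size && !launched) {
        val w = workers(curPos)
        visited += 1
        if (w.memFree >= driver.mem && w.coresFree >= driver.cores) {
          w.memFree -= driver.mem        // reserve the driver's resources
          w.coresFree -= driver.cores
          placed(driver.id) = w.id
          launched = true
        }
        curPos = (curPos + 1) % workers.size
      }
    }
    placed.toMap
  }
}
```

Because the cursor keeps advancing between drivers, consecutive drivers land on different workers when resources allow, which is the load-spreading behaviour the comment in schedule() describes.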
In the source above, the two methods to focus on are launchDriver and launchExecutor.
launchDriver
launchDriver asks a worker to start the driver:
```scala
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}
```
launchExecutor
launchExecutor tells a worker to start an executor and notifies the application's driver:
```scala
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
```
Worker
The Master sends LaunchDriver and LaunchExecutor messages to the Worker. Next, trace how the Worker handles each of these two messages.
LaunchDriver
```scala
case LaunchDriver(driverId, driverDesc) =>
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  drivers(driverId) = driver
  driver.start()

  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
```
This creates and starts a DriverRunner. While running, it creates a working directory, downloads the user jar, and loads the launch parameters; when the driver terminates it sends worker ! DriverStateChanged(driverId, state, finalException) to the Worker. The Worker forwards the DriverStateChanged message to the Master, and on receiving it the Master removes the driver.
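That DriverStateChanged round trip can be modelled with two tiny stand-in endpoints. MasterSketch and WorkerRef below are illustrative classes, not Spark's real RPC endpoints:

```scala
// Minimal model of the state-change path: DriverRunner -> Worker -> Master,
// where the Master drops a driver that has reached a terminal state.
sealed trait DriverStateSketch
case object Running extends DriverStateSketch
case object Finished extends DriverStateSketch

class MasterSketch {
  val drivers = scala.collection.mutable.Set[String]()
  def receive(driverId: String, state: DriverStateSketch): Unit =
    if (state == Finished) drivers -= driverId // remove the completed driver
}

class WorkerRef(master: MasterSketch) {
  // The Worker simply forwards the state change on to the Master.
  def receive(driverId: String, state: DriverStateSketch): Unit =
    master.receive(driverId, state)
}
```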
LaunchExecutor
The Worker creates an ExecutorRunner thread, and the ExecutorRunner starts the ExecutorBackend process.
```scala
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // Create local dirs for the executor. These are passed to the executor via the
      // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
      // application finishes.
      val appLocalDirs = appDirectories.getOrElse(appId,
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          val appDir = Utils.createDirectory(dir, namePrefix = "executor")
          Utils.chmod700(appDir)
          appDir.getAbsolutePath()
        }.toSeq)
      appDirectories(appId) = appLocalDirs
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs,
        ExecutorState.RUNNING)
      executors(appId + "/" + execId) = manager
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {
      case e: Exception =>
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None))
    }
  }
```
The real launch logic lives in ExecutorRunner's fetchAndRunExecutor method.
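In outline, fetchAndRunExecutor builds the executor's launch command and runs it as a child process from a dedicated thread. Below is a minimal sketch of that pattern; RunnerSketch is an invented class, and the command used in the usage note is a stand-in rather than the real ExecutorBackend invocation:

```scala
// Minimal model of a runner: a thread that launches a child process from a
// command line and records its exit code when it terminates.
class RunnerSketch(command: Seq[String]) {
  @volatile var exitCode: Option[Int] = None

  private val thread = new Thread(new Runnable {
    override def run(): Unit = {
      val process = new ProcessBuilder(command: _*).start() // launch child process
      exitCode = Some(process.waitFor())                    // block until it exits
    }
  })

  def start(): Unit = thread.start()
  def join(): Unit = thread.join()
}
```

For example, `new RunnerSketch(Seq("echo", "ok"))` launches a trivial child process in place of the executor; the real ExecutorRunner additionally substitutes variables into the command, redirects stdout/stderr to files, and reports ExecutorStateChanged to the Worker.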
To wrap up, here is a brief summary of the job-submission flow.
1) After startup, the client runs the user program directly and brings up the Driver-side components, such as the DAGScheduler and BlockManagerMaster.
2) The client's Driver registers with the Master.
3) The Master asks Workers to start Executors.
4) Each Worker creates an ExecutorRunner thread, and the ExecutorRunner starts the ExecutorBackend process. Once started, the ExecutorBackend registers with the Driver's SchedulerBackend.
5) The Driver's DAGScheduler parses the job into Stages; each Stage's Tasks are dispatched to Executors through the TaskScheduler. The job finishes once all stages complete.