Spark Source Code Study (9): Spark On Yarn
2016-06-20 13:13
The question this article addresses: understanding how Spark runs on the YARN platform.

Modifying the configuration files
First, modify the configuration file spark-env.sh by adding two properties:

```
export HADOOP_HOME=/../hadoop..
export HADOOP_CONF_DIR=/../hadoop/etc/hadoop
```

The first is the Hadoop home directory; the second is the Hadoop configuration directory.
Another file that needs to be configured is spark-defaults.conf (shown in a screenshot in the original post). The Spark distribution ships this file with a few example configuration entries; beyond those, the following YARN-related properties can be configured:
| Property | Description | Default |
|---|---|---|
| spark.yarn.applicationMaster.waitTries | Number of times the RM waits for the Spark ApplicationMaster to start, i.e. the number of SparkContext initialization attempts. If this count is exceeded, the application fails to start. | 10 |
| spark.yarn.submit.file.replication | HDFS replication factor for the files the application uploads to HDFS. | 3 |
| spark.yarn.preserve.staging.files | If set to true, staging files are preserved rather than deleted when the job finishes. | false |
| spark.yarn.scheduler.heartbeat.interval-ms | Interval in milliseconds at which the Spark ApplicationMaster sends heartbeats to the YARN RM. | 5000 |
| spark.yarn.max.executor.failures | Maximum number of executor failures before the application is declared failed. | 2 × number of executors |
| spark.yarn.historyServer.address | Address of the Spark history server (without http://). The address is handed to the YARN RM when the application finishes, so the RM UI can link to the history server UI. | (none) |
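
As a hedged illustration, the same spark.yarn.* properties from the table can also be set programmatically on a SparkConf rather than in spark-defaults.conf. The values below are placeholders chosen for the example, not tuning recommendations:

```scala
import org.apache.spark.SparkConf

// Minimal sketch: setting a few of the spark.yarn.* properties from the table above.
// Hostnames and values are illustrative placeholders, not recommendations.
val conf = new SparkConf()
  .setAppName("SparkOnYarnConfigExample")
  .set("spark.yarn.submit.file.replication", "3")
  .set("spark.yarn.preserve.staging.files", "true")             // keep staging files for debugging
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "5000")
  .set("spark.yarn.historyServer.address", "history-host:18080") // hypothetical host
```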
Execution flow
The original post included a flow diagram of Spark on YARN at this point. Let's now walk through the concrete source code:
Client
The main method of the Client class instantiates the Client and runs it: `new Client(args, sparkConf).run()`. The run method in turn calls `val appId = runApp()`. The source of runApp() is as follows:

```scala
def runApp() = {
  validateArgs()
  init(yarnConf)
  start()
  logClusterResourceDetails()

  val newApp = super.getNewApplication()
  val appId = newApp.getApplicationId()
  verifyClusterResources(newApp)

  val appContext = createApplicationSubmissionContext(appId)
  val appStagingDir = getAppStagingDir(appId)
  val localResources = prepareLocalResources(appStagingDir)
  val env = setupLaunchEnv(localResources, appStagingDir)
  val amContainer = createContainerLaunchContext(newApp, localResources, env)

  val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
  // Memory for the ApplicationMaster.
  capability.setMemory(args.amMemory + memoryOverhead)
  amContainer.setResource(capability)

  appContext.setQueue(args.amQueue)
  appContext.setAMContainerSpec(amContainer)
  appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())

  submitApp(appContext)
  appId
}
```
1) The configuration parameters are validated first, then the Client is initialized and started.
2) A new-application request is submitted to the ResourceManager, and the cluster's memory resources are checked.
3) The application submission context is assembled: staging directory, local resources, launch environment, the AM container and its memory, and the target queue.
4) The application is formally submitted (submitApp); a minimal example application that travels through this path is sketched below.
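
To make the flow concrete, here is a minimal, self-contained sketch of a user application that would go through this Client path when submitted with `--master yarn`. The class name and the use of command-line paths are made up for illustration; this is not code from the post above:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal word-count application. When submitted to YARN, the Client described above
// packages it, requests an ApplicationMaster container from the RM, and submits the
// application context built in runApp().
object YarnWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("YarnWordCount")
    val sc = new SparkContext(conf)

    val counts = sc.textFile(args(0))      // input path passed on the command line
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.saveAsTextFile(args(1))         // output path passed on the command line
    sc.stop()
  }
}
```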
ApplicationMaster
The ApplicationMaster (AM) is responsible for running the Spark application's Driver program and for allocating the Executors it needs. It also has a main method that instantiates the AM and calls run; the source is as follows:

```scala
final def run(): Int = {
  try {
    val appAttemptId = client.getAttemptId()

    if (isClusterMode) {
      // Set the web ui port to be ephemeral for yarn so we don't conflict with
      // other spark processes running on the same box
      System.setProperty("spark.ui.port", "0")

      // Set the master and deploy mode property to match the requested mode.
      System.setProperty("spark.master", "yarn")
      System.setProperty("spark.submit.deployMode", "cluster")

      // Set this internal configuration if it is running on cluster mode, this
      // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    }

    logInfo("ApplicationAttemptId: " + appAttemptId)

    val fs = FileSystem.get(yarnConf)

    // This shutdown hook should run *after* the SparkContext is shut down.
    val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
    ShutdownHookManager.addShutdownHook(priority) { () =>
      val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
      val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts

      if (!finished) {
        // The default state of ApplicationMaster is failed if it is invoked by shut down hook.
        // This behavior is different compared to 1.x version.
        // If user application is exited ahead of time by calling System.exit(N), here mark
        // this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
        // System.exit(0) to terminate the application.
        finish(finalStatus, ApplicationMaster.EXIT_EARLY,
          "Shutdown hook called before final status was reported.")
      }

      if (!unregistered) {
        // we only want to unregister if we don't want the RM to retry
        if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
          unregister(finalStatus, finalMsg)
          cleanupStagingDir(fs)
        }
      }
    }
    // ... (the excerpt is truncated here in the original post)
```
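
As the comments in run() note, the system properties set in cluster mode are later picked up when the driver creates its SparkConf (SparkConf reads spark.* JVM system properties by default). A minimal sketch of what that looks like from inside a yarn-cluster driver; illustrative only, and the printed application id is an assumed example value:

```scala
import org.apache.spark.SparkConf

// Illustrative only: in yarn-cluster mode the AM has already set these JVM system
// properties before the user's driver code runs, so a freshly created SparkConf
// picks them up automatically.
val conf = new SparkConf()
println(conf.get("spark.master"))            // "yarn"
println(conf.get("spark.submit.deployMode")) // "cluster"
println(conf.get("spark.yarn.app.id"))       // e.g. "application_1466400000000_0001"
```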