您的位置:首页 > 其它

Spark源码学习(9)——Spark On Yarn

2016-06-20 13:13 357 查看

本文要解决的问题:

了解Spark在Yarn平台上的运行过程。

修改配置文件

首先需要修改配置文件spark-env.sh。在这个文件中需要添加两个属性:

Export HADOOP_HOME=/../hadoop..

ExportHADOOP_CONF_DIR=/../hadoop/etc/hadoop

这里,一个是要hadoop的home目录。一个是配置文件目录。

还需要配置一个就是spark-defaults.conf这个文件:



需要修改红色框内的文件。下面看下这个文件里面的内容:



在spark的源文件中给出了一些配置参数的示例。另外它还下面一个可以配置的属性

属性名说明默认值
spark.yarn.applicationMaster.waitTriesRM等待Spark AppMaster启动次数,也就是SparkContext初始化次数。超过这个数值,启动失败。10
spark.yarn.submit.file.replication应用程序上载到HDFS的文件的复制因子3
spark.yarn.preserve.staging.files设置为true,在job结束后,将stage相关的文件保留而不是删除。false
spark.yarn.scheduler.heartbeat.interval-msSpark AppMaster发送心跳信息给YARN RM的时间间隔5000
spark.yarn.max.executor.failures导致应用程序宣告失败的最大executor失败数2倍于executor数
spark.yarn.historyServer.addressSpark history server的地址(不要加http://)。这个地址会在应用程序完成后提交给YARN RM,使得将信息从RM UI连接到history server UI上。

运行流程

下面是Spark On Yarn的流程图:



下面再来看看具体的源码:

Client

在Client类中的main方法实例话Client:new Client(args, sparkConf).run()。在run方法中,又调用了val appId = runApp()方法。runApp()源码如下

def runApp() = {

validateArgs()

init(yarnConf)

start()

logClusterResourceDetails()

val newApp = super.getNewApplication()

val appId = newApp.getApplicationId()

verifyClusterResources(newApp)

val appContext = createApplicationSubmissionContext(appId)

val appStagingDir = getAppStagingDir(appId)

val localResources = prepareLocalResources(appStagingDir)

val env = setupLaunchEnv(localResources, appStagingDir)

val amContainer = createContainerLaunchContext(newApp, localResources, env)

val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]

// Memory for the ApplicationMaster.

capability.setMemory(args.amMemory + memoryOverhead)

amContainer.setResource(capability)

appContext.setQueue(args.amQueue)

appContext.setAMContainerSpec(amContainer)

appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())

submitApp(appContext)

appId

}


1)这里首先对一些参数配置的校验,然后初始化、启动Client

2)提交请求到ResouceManager,检查集群的内存情况。

3)设置一些参数,请求队列

4)正式提交APP

ApplicationManager

AM负责运行Spark Application的Driver程序,并分配执行需要的Executors。里面也有个main方法实例化AM并调用run,源码如下:

final def run(): Int = {
try {
val appAttemptId = client.getAttemptId()

if (isClusterMode) {
// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
System.setProperty("spark.ui.port", "0")

// Set the master and deploy mode property to match the requested mode.
System.setProperty("spark.master", "yarn")
System.setProperty("spark.submit.deployMode", "cluster")

// Set this internal configuration if it is running on cluster mode, this
// configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
}

logInfo("ApplicationAttemptId: " + appAttemptId)

val fs = FileSystem.get(yarnConf)

// This shutdown hook should run *after* the SparkContext is shut down.
val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
ShutdownHookManager.addShutdownHook(priority) { () =>
val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts

if (!finished) {
// The default state of ApplicationMaster is failed if it is invoked by shut down hook.
// This behavior is different compared to 1.x version.
// If user application is exited ahead of time by calling System.exit(N), here mark
// this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
// System.exit(0) to terminate the application.
finish(finalStatus,
ApplicationMaster.EXIT_EARLY,
"Shutdown hook called before final status was reported.")
}

if (!unregistered) {
// we only want to unregister if we don't want the RM to retry
if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
unregister(finalStatus, finalMsg)
cleanupStagingDir(fs)
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: