
Spark 2.0.x Source Code Deep Dive: Spark Submit

2017-06-09 20:44
WeChat: 519292115

Email: taosiyuan163@163.com

Please respect the original work; reposting is prohibited!

Spark is currently one of the hottest frameworks in the big data space; it efficiently supports offline batch processing, real-time computation, machine learning, and other diverse workloads. Reading the source code will deepen your understanding of the framework.

In this and later chapters I will dissect the core components of Spark 2.0.x one by one, including SparkContext, SparkEnv, RpcEnv, NettyRpc, BlockManager, OutputTracker, TaskScheduler, DAGScheduler, and more.

The Spark Submit script is the entry point when a client submits a job; its arguments cover the cluster deploy mode, the degree of parallelism, the number of cores, and so on. Below is the core code it triggers.

When we run the submit script, the shell ultimately executes the following command, which takes us to Spark's entry class at \spark\spark-master\spark-master\core\src\main\scala\org\apache\spark\deploy\SparkSubmit.scala:

exec"$SPARK_HOME"/bin/spark-class
org.apache.spark.deploy.SparkSubmit "$@"

Different actions are dispatched depending on the arguments passed in:

override def main(args: Array[String]): Unit = {
  // Parse the arguments passed in by the submit script
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  // Match the action requested by the parsed arguments
  appArgs.action match {
    // Submit the application
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    // Only triggered in standalone and Mesos cluster modes
    case SparkSubmitAction.KILL => kill(appArgs)
    // Only triggered in standalone and Mesos cluster modes
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
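
To make the parse-then-dispatch pattern concrete, here is a minimal, self-contained Scala sketch of the same idea. None of these names are Spark's; they are illustrative stand-ins for SparkSubmitArguments and SparkSubmitAction:

// Minimal sketch of the parse-then-dispatch pattern used by SparkSubmit.main.
// All names here are illustrative, not Spark's real classes.
object MiniSubmit {
  sealed trait Action
  case object Submit extends Action
  case object Kill extends Action
  case object RequestStatus extends Action

  // Map the leading CLI token to an action, defaulting to Submit
  private def parseAction(args: Array[String]): Action = args.headOption match {
    case Some("--kill")   => Kill
    case Some("--status") => RequestStatus
    case _                => Submit
  }

  def main(args: Array[String]): Unit = parseAction(args) match {
    case Submit        => println("submitting application")
    case Kill          => println("killing driver")
    case RequestStatus => println("requesting driver status")
  }
}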

submit is the crucial method; it consists of two main steps:

1. Call prepareSubmitEnvironment:

prepareSubmitEnvironment(args) is the core method that matches the entry class of the appropriate Spark run mode against the arguments you passed in. It produces the system properties, the relevant classpath, the run mode, and so on, which are used shortly afterwards to prepare Spark's runtime environment.

2. Call doRunMain:

This step takes the four-tuple describing the Spark runtime environment obtained above and calls the matching entry point, which essentially means invoking the main method of the class for that mode.

/**
 * Submit the application using the provided parameters.
 *
 * This runs in two steps. First, we prepare the launch environment by setting up
 * the appropriate classpath, system properties, and application arguments for
 * running the child main class based on the cluster manager and the deploy mode.
 * Second, we use this launch environment to invoke the main method of the child
 * main class.
 */
@tailrec
private def submit(args: SparkSubmitArguments): Unit = {
  // The core call: prepareSubmitEnvironment matches the mode against args
  // and returns a four-tuple
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            // Call runMain with the pieces of the four-tuple
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
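
The proxy-user branch above relies on Hadoop's UserGroupInformation.doAs to run the child class under another identity. A minimal standalone sketch of that pattern, assuming the Hadoop client library is on the classpath (the user name and the action body are illustrative):

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object ProxyUserSketch {
  def main(args: Array[String]): Unit = {
    // Impersonate "alice" on top of the currently logged-in user (illustrative name)
    val proxyUser = UserGroupInformation.createProxyUser("alice",
      UserGroupInformation.getCurrentUser())
    proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
      override def run(): Unit = {
        // Everything in here executes under the proxy identity
        println(s"running as: ${UserGroupInformation.getCurrentUser().getUserName}")
      }
    })
  }
}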


The prepareSubmitEnvironment method extracts the individual parameters from args and selects the corresponding deploy mode:

if (deployMode == CLIENT || isYarnCluster) {
  childMainClass = args.mainClass

if (args.isStandaloneCluster) {
  if (args.useRest) {
    childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
    childArgs += (args.primaryResource, args.mainClass)
  } else {
    // In legacy standalone cluster mode, use Client as a wrapper around the user class
    childMainClass = "org.apache.spark.deploy.Client"

if (isYarnCluster) {
  childMainClass = "org.apache.spark.deploy.yarn.Client"

if (isMesosCluster) {
  assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
  childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
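
Condensed into a single self-contained function, the selection logic looks roughly like this. This is only a sketch: the SubmitMode case class and the flattened branching are illustrative, and only the class-name strings come from the excerpts above:

// Sketch of how prepareSubmitEnvironment picks the child main class.
// SubmitMode and the branch order are simplified for illustration.
case class SubmitMode(isClientDeploy: Boolean, isStandaloneCluster: Boolean,
    isYarnCluster: Boolean, isMesosCluster: Boolean, useRest: Boolean)

def chooseChildMainClass(mode: SubmitMode, userMainClass: String): String = mode match {
  case m if m.isClientDeploy => userMainClass
  case m if m.isStandaloneCluster && m.useRest =>
    "org.apache.spark.deploy.rest.RestSubmissionClient"
  case m if m.isStandaloneCluster => "org.apache.spark.deploy.Client"
  case m if m.isYarnCluster => "org.apache.spark.deploy.yarn.Client"
  case m if m.isMesosCluster => "org.apache.spark.deploy.rest.RestSubmissionClient"
  case _ => userMainClass
}

// e.g. chooseChildMainClass(SubmitMode(false, false, true, false, false), "com.example.MyApp")
// returns "org.apache.spark.deploy.yarn.Client"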


The four-tuple returned at the end is passed into the run method inside doRunMain; within runMain, Java reflection is used to dynamically obtain the real main entry point of childMainClass.

var mainClass: Class[_] = null
try {
  mainClass = Utils.classForName(childMainClass)
} catch {
  case e: ClassNotFoundException =>


Underneath this is a Class.forName call that takes a classloader argument. In fact, plain Java facilities appear in many places in the source, including reflection for obtaining objects dynamically, the Serializer inside SparkEnv (JavaSerializer by default, though you can also use the more efficient Kryo, covered in a later chapter), and the thread pools and data storage structures of several core components.
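
As an aside, switching that default serializer to Kryo is just a configuration change. A minimal sketch, assuming a standard SparkConf setup (the app name and the registered Point class are illustrative):

import org.apache.spark.SparkConf

object KryoConfSketch {
  case class Point(x: Double, y: Double)  // illustrative user class

  def main(args: Array[String]): Unit = {
    // Replace the default JavaSerializer used by SparkEnv with Kryo
    val conf = new SparkConf()
      .setAppName("kryo-demo")  // illustrative app name
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Registering classes up front lets Kryo write compact numeric class IDs
    conf.registerKryoClasses(Array(classOf[Point]))
  }
}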

/** Preferred alternative to Class.forName(className) */
def classForName(className: String): Class[_] = {
  // scalastyle:off classforname
  // Ultimately implemented through Java's reflection mechanism
  Class.forName(className, true, getContextOrSparkClassLoader)
  // scalastyle:on classforname
}
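
getContextOrSparkClassLoader prefers the current thread's context classloader when one is set. A self-contained sketch of the same idea (the object and method names here are mine, and the fallback is simplified):

object ClassLoaderSketch {
  // Resolve through the context classloader when available,
  // falling back to the loader that loaded this code.
  def getContextOrDefaultClassLoader: ClassLoader =
    Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)

  def classForNameSketch(className: String): Class[_] =
    // 'true' asks the JVM to run the class's static initializers on load
    Class.forName(className, true, getContextOrDefaultClassLoader)
}

// Usage: ClassLoaderSketch.classForNameSketch("java.util.ArrayList")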


Finally, the corresponding main function is invoked:

val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)

try {
  mainMethod.invoke(null, childArgs.toArray)
} catch {
  case t: Throwable =>
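
Putting the lookup and the invocation together, the reflective launch amounts to the following runnable sketch (HelloApp is an illustrative stand-in for the chosen childMainClass):

// Illustrative target: a top-level object gets a static main forwarder on class "HelloApp"
object HelloApp {
  def main(args: Array[String]): Unit = println("hello, " + args.mkString(" "))
}

object ReflectiveLaunch {
  def main(args: Array[String]): Unit = {
    // Resolve the class by name, as runMain does with childMainClass
    val mainClass = Class.forName("HelloApp", true,
      Thread.currentThread().getContextClassLoader)
    // Look up main(String[]) exactly as runMain does
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    // Receiver is null because the generated main method is static
    val argv = Array("from", "reflection")
    mainMethod.invoke(null, argv)
  }
}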