您的位置:首页 > 其它

Master启动的源码详解

2017-06-17 11:30 375 查看

5.1.2 Master启动的源码详解

 
Spark中各个组件是通过脚本来启动部署的,下面以脚本为入口点开始分析Master的部署。每个组件对应提供了启动的脚本,同时也会提供停止的脚本,停止脚本比较简单,在此仅分析启动脚本。
1.      Master部署的启动脚本解析
首先看下Master的启动脚本./sbin/start-master.sh,内容如下:
1.          # 在脚本的执行节点启动Master组件
2.          # Starts the master on the machine this script is executed on.
3.           
4.          #如果没有设置环境变量SPARK_HOME,会根据脚本所在位置自动设置
5.          if [ -z "${SPARK_HOME}" ]; then
6.            export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
7.          fi
8.           
9.          # NOTE: This exact class name is matched downstream by SparkSubmit.
10.       # Any changes need to be reflected there.
11.       # Master 组件对应的类
12.       CLASS="org.apache.spark.deploy.master.Master"
13.        
14.       #脚本的帮助信息
15.       if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
16.         echo "Usage: ./sbin/start-master.sh [options]"
17.         pattern="Usage:"
18.         pattern+="\|Using Spark's default log4j profile:"
19.         pattern+="\|Registered signal handlers for"
20.        
21.       # 通过脚本spark-class执行指定的Master类,参数为--help
22.         "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
23.         exit 1
24.       fi
25.        
26.       ORIGINAL_ARGS="$@"
27.        
28.       #控制启动Master时,是否同时启动 Tachyon的Master组件
29.       START_TACHYON=false
30.        
31.       while (( "$#" )); do
32.       case $1 in
33.           --with-tachyon)
34.             if [ ! -e "${SPARK_HOME}"/tachyon/bin/tachyon ]; then
35.               echo "Error: --with-tachyon specified, but tachyon not found."
36.               exit -1
37.             fi
38.             START_TACHYON=true
39.             ;;
40.       esac
41.       shift
42.       done
43.        
44.       . "${SPARK_HOME}/sbin/spark-config.sh"
45.        
46.       . "${SPARK_HOME}/bin/load-spark-env.sh"
47.        
48.       #下面的一些参数对应的默认的配置属性
49.       if [ "$SPARK_MASTER_PORT" = "" ]; then
50.         SPARK_MASTER_PORT=7077
51.       fi
52.        
53.       // 用于MasterURL,所以当没有设置时,默认会使用hostname而不是IP地址
54.       // 该MasterURL在Worker注册或应用程序提交时使用
55.       if [ "$SPARK_MASTER_IP" = "" ]; then
56.         SPARK_MASTER_IP=`hostname`
57.       fi
58.        
59.       if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then
60.         SPARK_MASTER_WEBUI_PORT=8080
61.       fi
62.        
63.       #通过启动后台进程的脚本spark-daemon.sh来启动Master组件
64.       "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
65.         --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
66.         $ORIGINAL_ARGS
67.        
68.       #需要时同时启动Tachyon,此时Tachyon是编译在Spark内的
69.       if [ "$START_TACHYON" == "true" ]; then
70.         "${SPARK_HOME}"/tachyon/bin/tachyon bootstrap-conf $SPARK_MASTER_IP
71.         "${SPARK_HOME}"/tachyon/bin/tachyon format -s
72.         "${SPARK_HOME}"/tachyon/bin/tachyon-start.sh master
73.       fi
通过脚本的简单分析,可以看出Master组件是以后台守护进程的方式启动的,对应的后台守护进程的启动脚本spark-daemon.sh,在后台守护进程的启动脚本spark-daemon.sh内部,通过脚本spark-class来启动一个指定主类的JVM进程,相关代码如下所示:
1.          case "$mode" in
2.          #这里对应的是启动一个Spark类
3.              (class)
4.          nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null &
5.          newpid="$!"
6.                ;;
7.          #这里对应提交一个Spark 应用程序
8.              (submit)
9.          nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-submit --class $command "$@" >> "$log" 2>&1 < /dev/null &
10.       newpid="$!"
11.             ;;
12.        
13.           (*)
14.             echo "unknown mode: $mode"
15.             exit 1
16.             ;;
 
通过脚本的分析,可以知道最终执行的是Master类(对应的代码为前面的CLASS="org.apache.spark.deploy.master.Master"),对应的入口点则是Master伴生对象中的main方法,下面以该方法作为入口点进一步解析Master部署框架。
在部署Master组件时,最简单的方式是直接启动脚本,不带任何选项参数,命令如下所示:
1.          ./sbin/start-master.sh
如需设置选项参数,可以查看帮助信息,根据自己的需要进行设置。
2.      Master的源码解析
首先查看Master伴生对象中的main方法。
Master.scala源码:
1.           def main(argStrings: Array[String]) {
2.             Utils.initDaemon(log)
3.             val conf = new SparkConf
4.           // 构建参数解析的实例
5.             val args = new MasterArguments(argStrings,conf)
6.            // 启动RPC通信环境以及Master的RPC通信终端
7.             val (rpcEnv, _, _) =startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
8.             rpcEnv.awaitTermination()
9.           }
 
和其他类,如SparkSubmit一样,Master类的入口点处也包含了对应的参数类MasterArguments, MasterArguments类包括Spark属性配置相关的一些解析。
MasterArguments.scala源码如下:
1.          private[master] class MasterArguments(args:Array[String], conf: SparkConf) extends Logging {
2.           var host = Utils.localHostName()
3.           var port = 7077
4.           var webUiPort = 8080
5.           var propertiesFile: String = null
6.          
7.          // 读取启动脚本中设置的环境变量
8.           if(System.getenv("SPARK_MASTER_IP") != null) {
9.             logWarning("SPARK_MASTER_IP isdeprecated, please use SPARK_MASTER_HOST")
10.          host =System.getenv("SPARK_MASTER_IP")
11.        }
12.       
13.        if(System.getenv("SPARK_MASTER_HOST") != null) {
14.          host =System.getenv("SPARK_MASTER_HOST")
15.        }
16.        if(System.getenv("SPARK_MASTER_PORT") != null) {
17.          port =System.getenv("SPARK_MASTER_PORT").toInt
18.        }
19.        if(System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
20.          webUiPort =System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
21.        }
22.       // 命令行选项参数的解析
23.        parse(args.toList)
24.       
25.        // This mutates the SparkConf, so allaccesses to it must be made after this line
26.        propertiesFile =Utils.loadDefaultSparkProperties(conf, propertiesFile)
27.       
28.        if(conf.contains("spark.master.ui.port")) {
29.          webUiPort =conf.get("spark.master.ui.port").toInt
30.        }
31.      ......
 
MasterArguments中的printUsageAndExit方法,对应的就是命令行中的帮助信息。
MasterArguments.scala源码:
1.           private defprintUsageAndExit(exitCode: Int) {
2.             // scalastyle:off println
3.             System.err.println(
4.               "Usage: Master [options]\n" +
5.               "\n" +
6.               "Options:\n" +
7.               " -i HOST, --ip HOST     Hostname tolisten on (deprecated, please use --host or -h) \n" +
8.               " -h HOST, --host HOST   Hostname tolisten on\n" +
9.               " -p PORT, --port PORT   Port tolisten on (default: 7077)\n" +
10.            " --webui-port PORT      Port forweb UI (default: 8080)\n" +
11.            " --properties-file FILE Path to a custom Spark properties file.\n" +
12.            "                         Default isconf/spark-defaults.conf.")
13.          // scalastyle:on println
14.          System.exit(exitCode)
15.        }
 
在解析完Master的参数之后,调用startRpcEnvAndEndpoin方法启动RPC通信环境以及Master的RPC通信终端。
Master.scala的startRpcEnvAndEndpoint源码如下:
1.           
2.          /**
3.            * 启动Master并返回一个三元组:
4.            *  (1) The Master RpcEnv
5.            *  (2) The web UI bound port
6.            *  (3) The REST server bound port, if any
7.            */
8.          
9.         def startRpcEnvAndEndpoint(
10.            host: String,
11.            port: Int,
12.            webUiPort: Int,
13.            conf: SparkConf): (RpcEnv, Int,Option[Int]) = {
14.          val securityMgr = new SecurityManager(conf)
15.         // 构建RPC通信环境
16.          val rpcEnv = RpcEnv.create(SYSTEM_NAME,host, port, conf, securityMgr)
17.         //构建RPC通信终端,实例化Master
18.          val masterEndpoint =rpcEnv.setupEndpoint(ENDPOINT_NAME,
19.            new Master(rpcEnv, rpcEnv.address,webUiPort, securityMgr, conf))
20.       
21.       // 向Master的通信终端发送请求,获取绑定的端口号
22.       // 包含Master的web ui监听端口号和REST的监听端口号
23.       
24.          val portsResponse =masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
25.          (rpcEnv, portsResponse.webUIPort,portsResponse.restPort)
26.        }
27.      }
 
startRpcEnvAndEndpoint方法中定义的ENDPOINT_NAME如下:
Master.scala源码:
1.           private[deploy] object Master extends Logging{
2.           val SYSTEM_NAME = "sparkMaster"
3.           val ENDPOINT_NAME = "Master"
4.         …….
 
startRpcEnvAndEndpoint方法中通过masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)给Master自己本身发送一个消息BoundPortsRequest,是一个case object。发送消息BoundPortsRequest给自己确保masterEndpoint正常启动起来。返回消息的类型是BoundPortsResponse,是一个case class。
MasterMessages.scala源码:
1.          private[master] object MasterMessages {
2.         ……
3.           case object BoundPortsRequest
4.           case classBoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int])
5.         }
 
Master收到消息BoundPortsRequest,发送返回消息BoundPortsResponse。
Master.scala源码:
1.          override def receiveAndReply(context:RpcCallContext): PartialFunction[Any, Unit] = {
2.         ......
3.         case BoundPortsRequest =>
4.               context.reply(BoundPortsResponse(address.port,webUi.boundPort, restServerBoundPort))
 
在BoundPortsResponse传入的参数restServerBoundPort是在Master的onStart方法中定义的:
Master.scala源码:
1.          ......
2.           private var restServerBoundPort: Option[Int]= None
3.          
4.           override def onStart(): Unit = {
5.         ......
6.            restServerBoundPort =restServer.map(_.start())
7.         …….
而在restServerBoundPort是通过restServer进行map操作启动赋值,看一下restServer。
Master.scala源码:
1.           private var restServer:Option[StandaloneRestServer] = None
2.         ......
3.            if (restServerEnabled) {
4.               val port =conf.getInt("spark.master.rest.port", 6066)
5.               restServer = Some(newStandaloneRestServer(address.host, port, conf, self, masterUrl))
6.             }
7.         ......
 
其中就new出来一个StandaloneRestServer。StandaloneRestServer服务器响应请求提交的[[RestSubmissionClient]]。将被嵌入到standalone Master中,仅用于集群模式。服务器根据不同的情况使用不同的HTTP代码进行响应:
l  200 OK -请求已成功处理
l  400错误请求:请求格式错误,未成功验证,或意外类型。
l  468未知协议版本:请求指定了此服务器不支持的协议。
l  500内部服务器错误:服务器在处理请求时引发内部异常
服务器在HTTP主体中总包含一个JSON表示的[[SubmitRestProtocolResponse]]。如果发生错误,服务器将包括一个[[ErrorResponse]]。如果构造了这个错误响应内部失败时,响应将由一个空体组成,响应体指示内部服务器错误。
StandaloneRestServer.scala源码:
1.          private[deploy] class StandaloneRestServer(
2.             host: String,
3.             requestedPort: Int,
4.             masterConf: SparkConf,
5.             masterEndpoint: RpcEndpointRef,
6.             masterUrl: String)
7.           extends RestSubmissionServer(host,requestedPort, masterConf) {
 
我们看一下RestSubmissionClient客户端。客户端提交申请[[RestSubmissionServer]]。在协议版本V1中, REST URL以表单形式出现http://[host:port]/v1/submissions/[action], [action]可以是create、kill、或状态的其中之一。每种请求类型都表示为发送到以下前缀的HTTP消息:
   (1) submit - POST to/submissions/create
   (2) kill - POST/submissions/kill/[submissionId]
   (3) status - GET/submissions/status/[submissionId]
在(1)的情况下,参数以JSON字段的形式发布到HTTP主体中。否则URL指定按客户端的预期操作。由于该协议预计将在Spark版本中保持稳定,因此现有字段不能添加或删除,但可以添加新的可选字段。如在少见的事件中向前或向后兼容性被破坏,Spark须引入一个新的协议版本(如V2)。客户机和服务器必须使用协议的同一版本进行通信。如果不匹配,服务器将用它支持的最高协议版本进行响应。此客户机的实现可以使用指定的版本使用该信息重试。
RestSubmissionClient.scala
1.          private[spark] classRestSubmissionClient(master: String) extends Logging {
2.           import RestSubmissionClient._
3.           private val supportedMasterPrefixes =Seq("spark://", "mesos://")
4.         ……
Restful把一切都看成是资源。利用Restful API可以对Spark进行监控。程序运行的每一个步骤、Task的计算步骤都可以可视化,对Spark的运行进行详细的监控。
 
 回到startRpcEnvAndEndpoint方法中,new创建了一个Master实例。Master实例化的时候会对所有的成员进行初始化。如默认的Cores个数等。  
Master.scala源码如下:
1.          private[deploy] class Master(
2.             override val rpcEnv: RpcEnv,
3.             address: RpcAddress,
4.             webUiPort: Int,
5.             val securityMgr: SecurityManager,
6.             val conf: SparkConf)
7.           extends ThreadSafeRpcEndpoint with Loggingwith LeaderElectable {
8.         …….
9.           // Default maxCores forapplications that don't specify it (i.e. pass Int.MaxValue)
10.        private val defaultCores =conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
11.        val reverseProxy =conf.getBoolean("spark.ui.reverseProxy", false)
12.        if (defaultCores < 1) {
13.          throw newSparkException("spark.deploy.defaultCores must be positive")
14.        }
15.      ……
16.       
 
 Master继承了ThreadSafeRpcEndpoint和LeaderElectable,其中继承LeaderElectable涉及到Master的HA(High Availability ,高可用性)机制。这里先关注ThreadSafeRpcEndpoint,继承该类后,Master作为一个RpcEndpoint,实例化后首先会调用onStart方法。
Master.scala源码:
1.           override def onStart(): Unit = {
2.             logInfo("Starting Spark master at" + masterUrl)
3.             logInfo(s"Running Spark version${org.apache.spark.SPARK_VERSION}")
4.             // 构建一个Master的 web ui, 查看向Master提交的应用程序等信息
5.             webUi = new MasterWebUI(this, webUiPort)
6.             webUi.bind()
7.             masterWebUiUrl = "http://" +masterPublicAddress + ":" + webUi.boundPort
8.             if (reverseProxy) {
9.               masterWebUiUrl =conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
10.            logInfo(s"Spark Master is acting asa reverse proxy. Master, Workers and " +
11.             s"Applications UIs are available at$masterWebUiUrl")
12.          }
13.        // 在一个守护线程中,启动调度机制,周期性检查Worker是否超时,当Worker节点超时后,会修改其状态或从Master中移除及其相关的操作
14.       
15.          checkForWorkerTimeOutTask =forwardMessageThread.scheduleAtFixedRate(new Runnable {
16.            override def run(): Unit =Utils.tryLogNonFatalError {
17.              self.send(CheckForWorkerTimeOut)
18.            }
19.          }, 0, WORKER_TIMEOUT_MS,TimeUnit.MILLISECONDS)
20.       
21.        // 默认情况下会启动Rest服务,可以通过该服务向Master提交各种请求
22.          if (restServerEnabled) {
23.            val port =conf.getInt("spark.master.rest.port", 6066)
24.            restServer = Some(newStandaloneRestServer(address.host, port, conf, self, masterUrl))
25.          }
26.          restServerBoundPort =restServer.map(_.start())
27.        // 度量(Metroics)相关的操作,用于监控
28.          masterMetricsSystem.registerSource(masterSource)
29.          masterMetricsSystem.start()
30.          applicationMetricsSystem.start()
31.         // 在度量系统启动后,将主程序和应用程序度量handler处理程序附加到Web UI中
32.          masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
33.          applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
34.         //Master HA相关的操作
35.          val serializer = new JavaSerializer(conf)
36.          val (persistenceEngine_,leaderElectionAgent_) = RECOVERY_MODE match {
37.            case "ZOOKEEPER" =>
38.              logInfo("Persisting recovery stateto ZooKeeper")
39.              val zkFactory =
40.                newZooKeeperRecoveryModeFactory(conf, serializer)
41.              (zkFactory.createPersistenceEngine(),zkFactory.createLeaderElectionAgent(this))
42.            case "FILESYSTEM" =>
43.              val fsFactory =
44.                newFileSystemRecoveryModeFactory(conf, serializer)
45.              (fsFactory.createPersistenceEngine(),fsFactory.createLeaderElectionAgent(this))
46.            case "CUSTOM" =>
47.              val clazz =Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
48.              val factory =clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
49.                .newInstance(conf, serializer)
50.                .asInstanceOf[StandaloneRecoveryModeFactory]
51.              (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
52.            case _ =>
53.              (new BlackHolePersistenceEngine(), newMonarchyLeaderAgent(this))
54.          }
55.          persistenceEngine = persistenceEngine_
56.          leaderElectionAgent = leaderElectionAgent_
57.        }
 
其中在Master的onStart()方法中new出来MasterWebUI,启动一个webServder。
 Master.scala源码:
1.           override def onStart(): Unit = {
2.         ……  
3.          webUi = new MasterWebUI(this, webUiPort)
4.             webUi.bind()
5.             masterWebUiUrl = "http://" +masterPublicAddress + ":" + webUi.boundPort
 
如MasterWebUI 的spark.ui.killEnabled设置为true,可以通过WebUI页面可以把Spark的进程kill掉。
MasterWebUI.scala的源码:
1.            private[master]
2.         class MasterWebUI(
3.             val master: Master,
4.             requestedPort: Int)
5.           extends WebUI(master.securityMgr,master.securityMgr.getSSLOptions("standalone"),
6.             requestedPort, master.conf, name ="MasterUI") with Logging {
7.           val masterEndpointRef = master.self
8.           val killEnabled =master.conf.getBoolean("spark.ui.killEnabled", true)
9.         …….
10.      initialize()
11.       
12.        /** Initialize all components of the server.*/
13.        def initialize() {
14.          val masterPage = new MasterPage(this)
15.          attachPage(new ApplicationPage(this))
16.          attachPage(masterPage)
17.          attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR,"/static"))
18.          attachHandler(createRedirectHandler(
19.            "/app/kill", "/",masterPage.handleAppKillRequest, httpMethods = Set("POST")))
20.          attachHandler(createRedirectHandler(
21.            "/driver/kill", "/",masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
22.        }
 
MasterWebUI中在初始化的时候new出来MasterPage,在MasterPage中通过代码去写Web页面。
MasterPage.scala源码如下:
1.          private[ui] class MasterPage(parent:MasterWebUI) extends WebUIPage("") {
2.          ......
3.           override def renderJson(request:HttpServletRequest): JValue = {
4.             JsonProtocol.writeMasterState(getMasterState)
5.           }
6.         .....
7.           val content =
8.                 <div class="row-fluid">
9.                   <div class="span12">
10.                  <ulclass="unstyled">
11.                    <li><strong>URL:</strong>{state.uri}</li>
12.                    {
13.                      state.restUri.map { uri =>
14.                        <li>
15.                          <strong>RESTURL:</strong> {uri}
16.                          <spanclass="rest-uri"> (cluster mode)</span>
17.                        </li>
18.                      }.getOrElse { Seq.empty }
19.                    }
20.                    <li><strong>AliveWorkers:</strong> {aliveWorkers.length}</li>
21.                    <li><strong>Cores inuse:</strong> {aliveWorkers.map(_.cores).sum} Total,
22.                      {aliveWorkers.map(_.coresUsed).sum}Used</li>
23.                    <li><strong>Memory inuse:</strong>
24.                      {Utils.megabytesToString(aliveWorkers.map(_.memory).sum)}Total,
25.                      {Utils.megabytesToString(aliveWorkers.map(_.memoryUsed).sum)}Used</li>
26.                    <li><strong>Applications:</strong>
27.                      {state.activeApps.length} <ahref="#running-app">Running</a>,
28.                      {state.completedApps.length}<a href="#completed-app">Completed</a> </li>
29.                    <li><strong>Drivers:</strong>
30.                      {state.activeDrivers.length}Running,
31.                      {state.completedDrivers.length}Completed </li>
32.                    <li><strong>Status:</strong>{state.status}</li>
33.                  </ul>
34.                </div>
35.              </div>
36.      ........
 
回到MasterWebUI.scala的initialize()方法,其中调用了attachPage方法,在WebUI中增加Web页面。
WebUI.scala源码:
1.             def attachPage(page: WebUIPage) {
2.             val pagePath = "/" + page.prefix
3.             val renderHandler =createServletHandler(pagePath,
4.               (request: HttpServletRequest) =>page.render(request), securityManager, conf, basePath)
5.             val renderJsonHandler =createServletHandler(pagePath.stripSuffix("/") + "/json",
6.               (request: HttpServletRequest) =>page.renderJson(request), securityManager, conf, basePath)
7.             attachHandler(renderHandler)
8.             attachHandler(renderJsonHandler)
9.             val handlers =pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
10.          handlers += renderHandler
11.        }
 
在WebUI的bind方法中启用了JettyServer。
WebUI.scala的bind源码:
1.           def bind() {
2.             assert(!serverInfo.isDefined,s"Attempted to bind $className more than once!")
3.             try {
4.               val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
5.               serverInfo = Some(startJettyServer(host,port, sslOptions, handlers, conf, name))
6.               logInfo(s"Bound $className to $host,and started at $webUrl")
7.             } catch {
8.               case e: Exception =>
9.                 logError(s"Failed to bind$className", e)
10.              System.exit(1)
11.          }
12.        }
 
JettyUtils .scala的startJettyServer尝试Jetty服务器绑定到所提供的主机名、端口。
startJettyServer源码如下:
1.         def startJettyServer(
2.               hostName: String,
3.               port: Int,
4.               sslOptions: SSLOptions,
5.               handlers: Seq[ServletContextHandler],
6.               conf: SparkConf,
7.               serverName: String = ""):ServerInfo = { 
 
 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: