Spark Standalone Source Code Analysis (II)
2012-12-31 17:01
This post describes the state transitions of the master and the worker when Spark starts in standalone mode, along with a brief look at the relevant code. We begin with a state diagram:
1. Master Startup
```shell
"$bin"/spark-daemon.sh start spark.deploy.master.Master --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
```

As the startup script above shows, the Master's main entry point is in spark.deploy.master.Master:
```scala
private[spark] object Master {
  def main(argStrings: Array[String]) {
    val args = new MasterArguments(argStrings)
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
    val actor = actorSystem.actorOf(
      Props(new Master(args.ip, boundPort, args.webUiPort)),
      name = "Master")
    actorSystem.awaitTermination()
  }
}
```

First:
1. Parse the arguments to obtain the master IP, master port, and other required parameters.
2. Create the MasterActor. Every Actor instance runs its preStart method during initialization. The Master's preStart starts the web UI and, in order to monitor the state of the worker nodes connected to the master, subscribes the MasterActor to RemoteClientLifeCycleEvent, listening for all outbound-related events. When a worker node disconnects or shuts down, receive gets the corresponding event and removes that worker node together with the jobs submitted on it.
```scala
override def preStart() {
  logInfo("Starting Spark master at spark://" + ip + ":" + port)
  // Listen for remote client disconnection events, since they don't go through Akka's watch()
  context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
  startWebUi()
}
```
```scala
case RemoteClientDisconnected(transport, address) => {
  // The disconnected client could've been either a worker or a job; remove whichever it was
  addressToWorker.get(address).foreach(removeWorker)
  addressToJob.get(address).foreach(removeJob)
}
case RemoteClientShutdown(transport, address) => {
  // The disconnected client could've been either a worker or a job; remove whichever it was
  addressToWorker.get(address).foreach(removeWorker)
  addressToJob.get(address).foreach(removeJob)
}
```
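The cleanup performed by those two cases can be sketched without Akka. The sketch below is illustrative only: WorkerInfo, JobInfo, MasterState, and clientDisconnected are simplified stand-ins (not Spark's own classes), modeling the addressToWorker/addressToJob lookup tables keyed by the remote client's address.

```scala
import scala.collection.mutable

// Simplified stand-ins for the Master's bookkeeping records.
case class WorkerInfo(id: String, host: String)
case class JobInfo(id: String)

class MasterState {
  // The Master keeps maps from a remote client's address to whatever
  // that client registered: a worker node or a submitted job.
  val addressToWorker = mutable.HashMap[String, WorkerInfo]()
  val addressToJob    = mutable.HashMap[String, JobInfo]()

  // Invoked for both RemoteClientDisconnected and RemoteClientShutdown:
  // the client could have been either a worker or a job, so both maps
  // are consulted and whichever entry exists is dropped.
  def clientDisconnected(address: String): Unit = {
    addressToWorker.remove(address)  // remove the worker registered from this address, if any
    addressToJob.remove(address)     // remove the job submitted from this address, if any
  }
}
```

After a disconnect event for an address, any worker or job registered from that address no longer appears in the Master's state, which is exactly why a lost worker also takes its submitted jobs with it.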
2. Worker Startup
```shell
"$bin"/spark-daemon.sh start spark.deploy.worker.Worker $1
```

The Worker's main entry point is in spark.deploy.worker.Worker, which initializes the WorkerActor. Its connect-to-master method does the following:
1. Check whether the master URL is valid.
2. Obtain an ActorRef for the MasterActor and send it a RegisterWorker message; on receiving it, the MasterActor adds the worker node to the workers HashSet it maintains.
```scala
def connectToMaster() {
  masterUrl match {
    case MASTER_REGEX(masterHost, masterPort) => {
      logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
      val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
      try {
        master = context.actorFor(akkaUrl)
        master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
        context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
        context.watch(master) // Doesn't work with remote actors, but useful for testing
      } catch {
        case e: Exception =>
          logError("Failed to connect to master", e)
          System.exit(1)
      }
    }
    case _ =>
      logError("Invalid master URL: " + masterUrl)
      System.exit(1)
  }
}
```

3. After sending the registration message, the WorkerActor, like the MasterActor, subscribes to RemoteClientLifeCycleEvent. In its receive method, it responds to the corresponding events by calling masterDisconnected, which kills the executors on that node and exits:
```scala
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
  masterDisconnected()
```
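The registration handshake and the worker's reaction to losing the master can be modeled end to end without Akka. In the sketch below, RegisterWorker mirrors the message sent in the Spark source (with fewer fields), while SimpleMaster and SimpleWorker are hypothetical stand-ins: the master records each registration in its workers HashSet, and masterDisconnected clears the executors and stops the worker (the real worker calls System.exit; here we just flip a flag).

```scala
import scala.collection.mutable

// A trimmed-down version of the registration message from the Spark source.
case class RegisterWorker(workerId: String, host: String, port: Int,
                          cores: Int, memory: Int)

class SimpleMaster {
  // The "workers" set the Master maintains; each registration lands here.
  val workers = mutable.HashSet[RegisterWorker]()
  def receive(msg: RegisterWorker): Unit = workers += msg
}

class SimpleWorker(id: String, master: SimpleMaster) {
  val executors = mutable.HashSet[String]()
  var running = true

  // Step 2 above: send RegisterWorker to the master.
  def connectToMaster(): Unit =
    master.receive(RegisterWorker(id, "localhost", 7078, cores = 4, memory = 1024))

  // Analogue of masterDisconnected(): kill this node's executors and shut down.
  def masterDisconnected(): Unit = {
    executors.clear()
    running = false
  }
}
```

Running the handshake once leaves exactly one entry in the master's workers set; a subsequent master loss empties the worker's executor set and stops it, which is the behavior the receive case above triggers.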
Addendum: Akka's Remote System Events
The book Akka Concurrency describes Akka's remote nodes: in Akka, nodes are peers of one another; to each other they are simultaneously clients and servers, as shown in the figure. Taking the plane node's point of view, all outbound events are client events and all inbound events are server events; the same holds from the airport node's point of view.
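That peer symmetry can be captured in a few lines. The sketch below is our own illustration: the event names echo Akka's RemoteClientLifeCycleEvent and RemoteServerLifeCycleEvent hierarchies, but the Direction classification and directionOf function are hypothetical, showing how one node labels the same wire-level happenings as client (outbound) or server (inbound) events depending on which side opened the connection.

```scala
// Events as seen from one node's perspective (names echo Akka's hierarchies).
sealed trait RemoteEvent
case class RemoteClientDisconnected(peer: String) extends RemoteEvent    // a connection we opened dropped
case class RemoteServerClientConnected(peer: String) extends RemoteEvent // a peer connected to us

sealed trait Direction
case object Outbound extends Direction // we act as the client
case object Inbound  extends Direction // we act as the server

// Client events belong to our outbound side; server events to our inbound side.
def directionOf(e: RemoteEvent): Direction = e match {
  case _: RemoteClientDisconnected    => Outbound
  case _: RemoteServerClientConnected => Inbound
}
```

Swap the vantage point from plane to airport and the same connection's events flip direction, which is the sense in which each node is both client and server.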