Spark的standalone源码分析(三)
2013-01-17 19:33
405 查看
本文描述SparkContext实例初始化的过程中,spark后台启动的一系列的服务,以及它们之间的交互。
从spark源码中的comments中可以看到,sparkContext类是spark运行的主要集成类,它负责与spark集群的connection,并且负责数据的生成和计算以及其中的task的调度;在Hadoop系统中,集群的调度有两大类:Job调度和Task调度。Spark也类似,如上节所述,Job的调度有master节点的schedule()负责,Task的调度则有sparkcontext实例初始化时开启的ClusterScheduler负责(实际上,可以视为clusterSchedule为一个插件箱,里面可以配置各种各样的backend服务);下面分别介绍一个sparkcontext初始化中的服务;
初始化实例的时候,必须要指定master节点的URL,
SparkEnv.createFromSystemProperties会依次在master节点,开启如下服务,在scala中其实就是actor:BlockManager,BroadcastManager,CacheTracker(to be removed instead of blockmanager),MapOutputTracker,ShuffleFetcher,HttpFileServer;下面对这些服务做简要的介绍。
首先初始化的时候,各个节点的BlockManager都会初始化一个BlockManagerMaster实例:在master节点,会创建BlockManagerMasterActor,如果是在work节点,则masterActor将初始化为BlockManagerMasterActor的actor引用;
每个节点的BlockManager实例化时,首先向master节点masterActor注册blockManagerID,每一个节点的BlockManager信息,会存于master节点的blockManagerInfo;其次,每个节点会初始化一个BlockManagerWorker实例,它定义了集群中master节点和worker节点之间数据传输的网络接口(network
interface),其中由接口可以看到集群中的block数据的操作主要有两种操作:put和get;BlockManagerWorker网络接口processBlockMessage(blockMessage: BlockMessage)会根据message的类型做不同的处理;
另外,block的同步会有数据的传输,BlockManger会开启一个ConnectionManager后台服务,该服务采用java nio处理节点之间socket通信;BlockManagerWorker会将消息的处理接口注册为ConnectionManager的回调函数,从而处理相关的消息;
1. SparkContext类
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
从spark源码中的comments中可以看到,sparkContext类是spark运行的主要集成类,它负责与spark集群的connection,并且负责数据的生成和计算以及其中的task的调度;在Hadoop系统中,集群的调度有两大类:Job调度和Task调度。Spark也类似,如上节所述,Job的调度有master节点的schedule()负责,Task的调度则有sparkcontext实例初始化时开启的ClusterScheduler负责(实际上,可以视为clusterSchedule为一个插件箱,里面可以配置各种各样的backend服务);下面分别介绍一个sparkcontext初始化中的服务;
2. SparkContext初始化
下述代码在core/src/main/scala/spark/sparkcontext.scala文件;初始化实例的时候,必须要指定master节点的URL,
def this(master: String, jobName: String) = this(master, jobName, null, Nil, Map())
// Create the Spark execution environment (cache, map output tracker, etc) private[spark] val env = SparkEnv.createFromSystemProperties( System.getProperty("spark.master.host"), System.getProperty("spark.master.port").toInt, true, isLocal) SparkEnv.set(env)
SparkEnv.createFromSystemProperties会依次在master节点,开启如下服务,在scala中其实就是actor:BlockManager,BroadcastManager,CacheTracker(to be removed instead of blockmanager),MapOutputTracker,ShuffleFetcher,HttpFileServer;下面对这些服务做简要的介绍。
2.1 BlockManager
在hadoop中,数据块由HDFS服务管理,数据块的存放位置、大小信息存放在Master节点,而源数据分布在集群中的slaver节点。与Hadoop类似,spark的BlockManager也负责管理集群中的block,即数据块的信息。master节点和work节点都会生成BlockManager实例,通过masterActor进行block信息的通信;首先初始化的时候,各个节点的BlockManager都会初始化一个BlockManagerMaster实例:在master节点,会创建BlockManagerMasterActor,如果是在work节点,则masterActor将初始化为BlockManagerMasterActor的actor引用;
var masterActor: ActorRef = { if (isMaster) { val masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)), name = MASTER_AKKA_ACTOR_NAME) logInfo("Registered BlockManagerMaster Actor") masterActor } else { val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME) logInfo("Connecting to BlockManagerMaster: " + url) actorSystem.actorFor(url) } }
每个节点的BlockManager实例化时,首先向master节点masterActor注册blockManagerID,每一个节点的BlockManager信息,会存于master节点的blockManagerInfo;其次,每个节点会初始化一个BlockManagerWorker实例,它定义了集群中master节点和worker节点之间数据传输的网络接口(network
interface),其中由接口可以看到集群中的block数据的操作主要有两种操作:put和get;BlockManagerWorker网络接口processBlockMessage(blockMessage: BlockMessage)会根据message的类型做不同的处理;
private def initialize() { logInfo("Initializing BlockManager.....") master.registerBlockManager(blockManagerId, maxMemory, slaveActor) BlockManagerWorker.startBlockManagerWorker(this) if (!BlockManager.getDisableHeartBeatsForTesting) { heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) { heartBeat() } } }
def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = { blockMessage.getType match { case BlockMessage.TYPE_PUT_BLOCK => { val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel) logDebug("Received [" + pB + "]") putBlock(pB.id, pB.data, pB.level) return None } case BlockMessage.TYPE_GET_BLOCK => { val gB = new GetBlock(blockMessage.getId) logDebug("Received [" + gB + "]") val buffer = getBlock(gB.id) if (buffer == null) { return None } return Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer))) } case _ => return None } }
另外,block的同步会有数据的传输,BlockManger会开启一个ConnectionManager后台服务,该服务采用java nio处理节点之间socket通信;BlockManagerWorker会将消息的处理接口注册为ConnectionManager的回调函数,从而处理相关的消息;
blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)简言之,BlockManager负责集群中的数据同步和提取。每个节点都会创建一个BlockManager实例,worker节点和master节点之间通过masterActor通信;节点与节点之间的数据传输通过ConnectionManager的后台服务。
相关文章推荐
- Standalone模式下Spark 中通信机制的源码分析
- 深入理解Spark 2.1 Core (六):Standalone模式运行的原理与源码分析
- 深入理解Spark 2.1 Core (八):Standalone模式容错及HA的原理与源码分析
- Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析
- Spark的standalone源码分析(一)
- 深入理解Spark 2.1 Core (七):Standalone模式任务执行的原理与源码分析
- 深入理解Spark 2.1 Core (八):Standalone模式容错及HA的原理与源码分析
- Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析
- Spark的standalone源码分析(四)
- Apache Spark源码走读之7 -- Standalone部署方式分析
- Spark的standalone源码分析(五)
- 源码-spark Standalone部署模式及其容错性分析
- Spark的standalone源码分析(二)
- Apache Spark源码走读之7 -- Standalone部署方式分析
- 深入理解Spark 2.1 Core (五):Standalone模式运行的原理与源码分析
- 深入理解Spark 2.1 Core (五):Standalone模式运行的原理与源码分析
- Spark2.2 Task原理分析及源码解析
- spark-streaming-kafka-0-10源码分析
- 结合源码分析Spark中的Accuracy(准确率), Precision(精确率), 和F1-Measure
- spark源码分析之sparkcontext原理篇