Spark Standalone Source Code Analysis (4)
2013-02-07 01:20
Continuing from the previous post, this installment analyzes the broadcast service started during SparkContext initialization. Parts of this article draw on the paper "Performance and Scalability of Broadcast in Spark".
2.2 BroadcastManager
Compared with Hadoop, Spark's strength lies in iterative computation, in particular the implementation of machine learning algorithms. Such computations often need to distribute large read-only data, such as dictionary tables or shared configuration files. If this data were bundled into the task closure, every iteration would have to ship it again, which is unnecessary. Similar to Hadoop's DistributedCache, Spark's broadcast service replicates the data to every worker node, and does so exactly once. The paper cited above analyzes the performance of Spark's default broadcast strategy (based on HDFS) in detail and compares it with Chained Streaming Broadcast, BitTorrent Broadcast, and Split Streaming Broadcast; interested readers may want to take a look. This section walks through the implementations of HttpBroadcast, TreeBroadcast, and BitTorrent broadcast in the Spark source code.
Let us first look at how broadcast is initialized and invoked.
1) Initialization and invocation of BroadcastManager
When BroadcastManager is initialized, it is passed an isMaster parameter, so the initialization of the master node differs from that of the worker nodes:

```scala
class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializable {

  private var initialized = false
  private var broadcastFactory: BroadcastFactory = null
```
Broadcast is invoked through the broadcast method on a SparkContext instance. For example, HadoopRDD uses broadcast to distribute its JobConf object (sc below is a SparkContext instance):

```scala
val confBroadcast = sc.broadcast(new SerializableWritable(conf))
```
sc.broadcast in turn constructs a concrete Broadcast instance:
```scala
/**
 * Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
 * reading it in distributed functions. The variable will be sent to each cluster only once.
 */
def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal)
```
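The call path above can be sketched as follows. This is a minimal, hypothetical model, not Spark's actual API: a manager hands out Broadcast objects through a pluggable factory and assigns each one a fresh id, mirroring how env.broadcastManager.newBroadcast is the single entry point for all broadcast implementations.

```python
# Illustrative sketch only: SparkContext.broadcast delegates to a
# BroadcastManager, which asks a pluggable factory for a new Broadcast
# instance. All class and method names here are invented for illustration.
import itertools

class Broadcast:
    def __init__(self, bid, value):
        self.id = bid
        self.value = value              # the read-only payload

class BroadcastFactory:
    def new_broadcast(self, value, bid):
        return Broadcast(bid, value)

class BroadcastManager:
    def __init__(self, factory):
        self.factory = factory
        self._ids = itertools.count()   # each broadcast gets a fresh id

    def new_broadcast(self, value):
        return self.factory.new_broadcast(value, next(self._ids))

manager = BroadcastManager(BroadcastFactory())
conf = manager.new_broadcast({"mapred.input.dir": "/data"})
assert conf.id == 0
assert conf.value["mapred.input.dir"] == "/data"
```

Swapping the factory is how Spark switches between HttpBroadcast, TreeBroadcast, and BitTorrent broadcast without changing the calling code.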
2) HttpBroadcast
spark.broadcast.HttpBroadcast broadcasts objects by running an HTTP server (wrapping a Jetty server) on the master node. When the master node initializes, it starts the server, while worker nodes read the master's serverUri from a system property:

```scala
def initialize(isMaster: Boolean) {
  synchronized {
    if (!initialized) {
      bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
      compress = System.getProperty("spark.broadcast.compress", "true").toBoolean
      if (isMaster) {
        createServer()
      }
      serverUri = System.getProperty("spark.httpBroadcast.uri")
      initialized = true
    }
  }
}
```
When sc.broadcast broadcasts a variable (this runs on the master node), it first generates a broadcast_id and hands the <id, value> pair to the BlockManager. (Note: the BlockManager does not report this block id's status to the master; here it only acts as a cache, ensuring that a broadcast variable is synchronized only once.) Because value is declared transient, it is skipped when the Broadcast object itself is serialized, which avoids copying large data objects around the cluster. On write, HttpBroadcast serializes the value into a file named after the broadcast id under a temporary directory on the master node.
```scala
private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
  extends Broadcast[T](id) with Logging with Serializable {

  def value = value_

  def blockId: String = "broadcast_" + id

  HttpBroadcast.synchronized {
    SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
  }

  if (!isLocal) {
    HttpBroadcast.write(id, value_)
  }
```
```scala
def write(id: Long, value: Any) {
  val file = new File(broadcastDir, "broadcast-" + id)
  val out: OutputStream = if (compress) {
    new LZFOutputStream(new FileOutputStream(file)) // Does its own buffering
  } else {
    new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
  }
  val ser = SparkEnv.get.serializer.newInstance()
  val serOut = ser.serializeStream(out)
  serOut.writeObject(value)
  serOut.close()
}
```

When a worker node deserializes a Broadcast instance, it begins fetching the content for that broadcast_id from the master node. Concretely, every Broadcast subclass overrides a readObject method that defines its strategy for fetching data from the master. In HttpBroadcast, the worker deserializes the broadcast object to obtain the blockId and first queries the BlockManager: if the BlockManager already holds that blockId, the value is obtained through the BlockManager; otherwise the contents of the blockId file are fetched over a socket, and the <id, value> pair is then stored in the BlockManager.
```scala
// Called by JVM when deserializing an object
private def readObject(in: ObjectInputStream) {
  in.defaultReadObject()
  HttpBroadcast.synchronized {
    SparkEnv.get.blockManager.getSingle(blockId) match {
      case Some(x) => value_ = x.asInstanceOf[T]
      case None => {
        logInfo("Started reading broadcast variable " + id)
        val start = System.nanoTime
        value_ = HttpBroadcast.read[T](id)
        SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
        val time = (System.nanoTime - start) / 1e9
        logInfo("Reading broadcast variable " + id + " took " + time + " s")
      }
    }
  }
}
```

As we can see, HttpBroadcast synchronizes through a central node, and as the cluster grows, this synchronization scheme clearly does not scale well.
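The readObject logic above is a cache-or-fetch pattern: check the local BlockManager first, and contact the master only when the block is missing. This sketch models the pattern with a dict standing in for the BlockManager and a callable standing in for the HTTP fetch; all names are illustrative, not Spark's.

```python
# Cache-or-fetch sketch of the worker-side readObject above (illustrative).
class Worker:
    def __init__(self, fetch_from_master):
        self.block_cache = {}                # stands in for the BlockManager
        self.fetch_from_master = fetch_from_master
        self.fetches = 0                     # how many times we hit the master

    def read_broadcast(self, block_id):
        if block_id in self.block_cache:     # the Some(x) branch
            return self.block_cache[block_id]
        self.fetches += 1                    # the None branch: fetch, then cache
        value = self.fetch_from_master(block_id)
        self.block_cache[block_id] = value
        return value

w = Worker(lambda bid: {"payload": bid})
a = w.read_broadcast("broadcast_0")
b = w.read_broadcast("broadcast_0")
assert a is b and w.fetches == 1             # master was contacted only once
```

This is exactly why the BlockManager is described as "only a cache" in the text: it guarantees each worker pulls a given broadcast variable at most once, no matter how many tasks deserialize the Broadcast object.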
3) TreeBroadcast
When the broadcast factory initializes TreeBroadcast, the master node starts a TrackMultipleValues service. It listens on MasterTrackerPort and handles three kinds of incoming messages: 1) REGISTER_BROADCAST_TRACKER: add the sender's host address and listen port to valueToGuideMap; 2) UNREGISTER_BROADCAST_TRACKER: blank out the sender's entry in valueToGuideMap; 3) FIND_BROADCAST_TRACKER: return the SourceInfo corresponding to the id the sender requested.

```scala
def initialize(isMaster__ : Boolean) {
  synchronized {
    ........
    if (isMaster) {
      trackMV = new TrackMultipleValues
      trackMV.setDaemon(true)
      trackMV.start()
      // Set masterHostAddress to the master's IP address for the slaves to read
      System.setProperty("spark.MultiTracker.MasterHostAddress", Utils.localIpAddress)
    }
    initialized = true
  }
}
```
```scala
if (messageType == REGISTER_BROADCAST_TRACKER) {
  // Receive Long
  val id = ois.readObject.asInstanceOf[Long]
  // Receive hostAddress and listenPort
  val gInfo = ois.readObject.asInstanceOf[SourceInfo]
  // Add to the map
  valueToGuideMap.synchronized {
    valueToGuideMap += (id -> gInfo)
  }
  // Send dummy ACK
  oos.writeObject(-1)
  oos.flush()
} else if (messageType == UNREGISTER_BROADCAST_TRACKER) {
  // Receive Long
  val id = ois.readObject.asInstanceOf[Long]
  // Remove from the map
  valueToGuideMap.synchronized {
    valueToGuideMap(id) = SourceInfo("", SourceInfo.TxOverGoToDefault)
  }
  // Send dummy ACK
  oos.writeObject(-1)
  oos.flush()
} else if (messageType == FIND_BROADCAST_TRACKER) {
  // Receive Long
  val id = ois.readObject.asInstanceOf[Long]
  var gInfo =
    if (valueToGuideMap.contains(id)) valueToGuideMap(id)
    else SourceInfo("", SourceInfo.TxNotStartedRetry)
  // Send reply back
  oos.writeObject(gInfo)
  oos.flush()
}
```
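The tracker dispatch above boils down to a map from broadcast id to source info, mutated by three message types. The sketch below models it with SourceInfo reduced to a (host, port) tuple; the sentinel tuples standing in for SourceInfo.TxOverGoToDefault and SourceInfo.TxNotStartedRetry are invented for illustration.

```python
# Illustrative model of the TrackMultipleValues message handling above.
REGISTER, UNREGISTER, FIND = range(3)
NOT_STARTED = ("", "retry")   # stands in for SourceInfo.TxNotStartedRetry
TOMBSTONE = ("", "gone")      # stands in for SourceInfo.TxOverGoToDefault

value_to_guide = {}

def handle(message_type, bid, ginfo=None):
    if message_type == REGISTER:
        value_to_guide[bid] = ginfo    # record the sender's (host, port)
        return -1                      # dummy ACK
    elif message_type == UNREGISTER:
        value_to_guide[bid] = TOMBSTONE  # blank out, don't delete the key
        return -1                      # dummy ACK
    elif message_type == FIND:
        return value_to_guide.get(bid, NOT_STARTED)

assert handle(FIND, 7) == NOT_STARTED            # not registered yet: retry
handle(REGISTER, 7, ("10.0.0.1", 11111))
assert handle(FIND, 7) == ("10.0.0.1", 11111)    # guide found
handle(UNREGISTER, 7)
assert handle(FIND, 7) == TOMBSTONE              # broadcast is over
```

Note that unregistering writes a tombstone rather than deleting the key, matching the Scala code: a later FIND can then distinguish "broadcast finished" from "broadcast not started yet" and react accordingly.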
After initialization, much like HttpBroadcast, a TreeBroadcast variable is broadcast through the SparkContext's broadcast method, during which the following two services are started on the master node:
1. The GuideMultipleRequests service maintains, for every worker node in the cluster, information about the broadcast variables it holds, including the number and sizes of blocks. When a worker calls receiveBroadcast, it first contacts this service with the broadcast id to obtain a SourceInfo (the master has a policy for choosing which node holding the data to hand out), including that source's host address and listen port; the master then adds the requesting worker's SourceInfo to listOfSources.
2. The ServeMultipleRequests service is started on every node in the cluster and serves the blocks of broadcast variables. After a worker obtains a SourceInfo, it downloads the blocks from the corresponding node. Since a node may be serving blocks and downloading them at the same time, the blocks being synchronized must be guarded by locks to keep the data consistent:
```scala
private def sendObject() {
  // Wait till receiving the SourceInfo from Master
  while (totalBlocks == -1) {
    totalBlocksLock.synchronized { totalBlocksLock.wait() }
  }
  for (i <- sendFrom until sendUntil) {
    while (i == hasBlocks) {
      hasBlocksLock.synchronized { hasBlocksLock.wait() }
    }
  }
  ..........
}
```
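The handshake in sendObject, where the sender blocks until block i has actually arrived locally, is a classic condition-variable wait. A minimal model of it using Python's threading.Condition (names invented for illustration): a receiver thread appends blocks and notifies, while the sender waits for each index before forwarding it.

```python
# Illustrative model of the wait/notify handshake in sendObject above:
# the sender may only forward block i once the receiver has stored it.
import threading

blocks = []                             # blocks received so far (like arrayOfBlocks)
cond = threading.Condition()
sent = []                               # blocks forwarded downstream

def receiver():
    for i in range(3):
        with cond:
            blocks.append("block-%d" % i)
            cond.notify_all()           # like hasBlocksLock.notifyAll()

def sender():
    for i in range(3):
        with cond:
            while len(blocks) <= i:     # like "while (i == hasBlocks) wait()"
                cond.wait()
            sent.append(blocks[i])      # safe: block i is definitely present

t_send = threading.Thread(target=sender)
t_recv = threading.Thread(target=receiver)
t_send.start(); t_recv.start()
t_send.join(); t_recv.join()
assert sent == ["block-0", "block-1", "block-2"]
```

This overlap is the point of the tree scheme: a node can start serving block 0 downstream while it is still downloading block 2 from its own source, so the pipeline stays full.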
On the receiving side, receiveSingleTransmission:
```scala
for (i <- hasBlocks until totalBlocks) {
  val recvStartTime = System.currentTimeMillis
  val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
  val receptionTime = (System.currentTimeMillis - recvStartTime)
  logDebug("Received block: " + bcBlock.blockID + " from " + sourceInfo +
    " in " + receptionTime + " millis.")
  arrayOfBlocks(hasBlocks) = bcBlock
  hasBlocks += 1
  // Set to true if at least one block is received
  receptionSucceeded = true
  hasBlocksLock.synchronized {
    hasBlocksLock.notifyAll()
  }
}
```
Finally, a brief look at the master's selectSuitableSource policy. Every worker that connects to the master is added to listOfSources, and the master picks sources for downloading with FIFO priority. At first only the master holds the complete data, so the master is chosen as the source and each worker synchronizes from it; every worker that connects increments the current source's leecher count. Once a source's leecher count exceeds MaxDegree, that source stops taking new leechers and a later-joining worker is chosen as the next source, and so on. When a worker has successfully synchronized all blocks from a retired source, that source's leecher count is decremented and the worker itself is appended to listOfSources, which clearly gives it higher priority as a source.
```scala
private def selectSuitableSource(skipSourceInfo: SourceInfo): SourceInfo = {
  var maxLeechers = -1
  var selectedSource: SourceInfo = null
  listOfSources.foreach { source =>
    if ((source.hostAddress != skipSourceInfo.hostAddress ||
         source.listenPort != skipSourceInfo.listenPort) &&
        source.currentLeechers < MultiTracker.MaxDegree &&
        source.currentLeechers > maxLeechers) {
      selectedSource = source
      maxLeechers = source.currentLeechers
    }
  }
  // Update leecher count
  selectedSource.currentLeechers += 1
  return selectedSource
}
```
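The effect of this policy can be seen in a small simulation. Among sources not yet at MaxDegree, the code picks the one currently serving the most leechers, so earlier sources fill up before later joiners take over. The sketch below is a simplification (dicts instead of SourceInfo, MaxDegree fixed at 2, and every joining worker immediately appended as a potential source); it is illustrative, not the real protocol.

```python
# Simulation of the selectSuitableSource policy above (simplified).
MAX_DEGREE = 2   # assumed value of MultiTracker.MaxDegree for this sketch

def select_suitable_source(sources, skip=None):
    best = None
    for s in sources:
        if s is skip or s["leechers"] >= MAX_DEGREE:
            continue                       # full sources are passed over
        if best is None or s["leechers"] > best["leechers"]:
            best = s                       # prefer the busiest non-full source
    best["leechers"] += 1                  # update leecher count, as in the Scala code
    return best

master = {"name": "master", "leechers": 0}
sources = [master]
assigned = []
for w in range(4):                         # four workers join one after another
    src = select_suitable_source(sources)
    assigned.append(src["name"])
    sources.append({"name": "worker-%d" % w, "leechers": 0})

# The master serves the first MAX_DEGREE workers, then hands off to worker-0.
assert assigned == ["master", "master", "worker-0", "worker-0"]
```

Preferring the busiest non-full source is what makes the topology a tree of bounded fan-out: each node saturates to MaxDegree children before the next node starts serving.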