Spark Source Code Analysis: BlockManagerMaster
2017-11-10 21:05
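BlockManagerMaster is mainly responsible for managing and maintaining block metadata for the whole lifetime of the application, and for sending commands to the slave nodes for execution.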
1 Core attributes
RpcEndpointRef driverEndpoint: the RPC endpoint reference used to communicate with BlockManagerMasterEndpoint
Boolean isDriver: whether this instance is running on the driver
2 Key methods
2.1 Remove an executor through the driver endpoint
def removeExecutor(execId: String) {
  // Send a RemoveExecutor message to BlockManagerMasterEndpoint
  tell(RemoveExecutor(execId))
  logInfo("Removed " + execId + " successfully in removeExecutor")
}
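The `tell` helper called above is not listed in this article. In the Spark 1.x sources it appears to be a small private method that sends the message with `askWithRetry[Boolean]` and throws an exception when the endpoint replies `false`. The following is a minimal sketch of that "ask and insist on true" pattern, not Spark code: `TellSketch`, `FakeEndpoint` and the rule that unknown executors yield `false` are all assumptions made for illustration.

```scala
// Hypothetical stand-ins for illustration; none of these names are Spark APIs.
object TellSketch {
  sealed trait Message
  final case class RemoveExecutor(execId: String) extends Message

  // Stand-in for the driver endpoint: it answers every message with a Boolean.
  // Assumption: removing an executor it does not know about yields false.
  final class FakeEndpoint(knownExecutors: Set[String]) {
    def ask(message: Message): Boolean = message match {
      case RemoveExecutor(id) => knownExecutors.contains(id)
    }
  }

  // Send a message and require a `true` reply; fail loudly otherwise.
  def tell(endpoint: FakeEndpoint, message: Message): Unit =
    if (!endpoint.ask(message)) {
      throw new IllegalStateException(
        s"Endpoint returned false for $message, expected true")
    }
}
```

With this shape, callers such as `removeExecutor` do not need to inspect a return value: either the call returns normally or an exception surfaces the failure.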
2.2 Send a RegisterBlockManager message to BlockManagerMasterEndpoint
def registerBlockManager(
    blockManagerId: BlockManagerId,
    maxMemSize: Long,
    slaveEndpoint: RpcEndpointRef): Unit = {
  logInfo("Trying to register BlockManager")
  // Send a RegisterBlockManager message to BlockManagerMasterEndpoint
  tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
  logInfo("Registered BlockManager")
}
2.3 Update block information
def updateBlockInfo(
    blockManagerId: BlockManagerId,
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long,
    externalBlockStoreSize: Long): Boolean = {
  // Send an UpdateBlockInfo message to BlockManagerMasterEndpoint and return its reply
  val res = driverEndpoint.askWithRetry[Boolean](
    UpdateBlockInfo(blockManagerId, blockId, storageLevel,
      memSize, diskSize, externalBlockStoreSize))
  logDebug(s"Updated info of block $blockId")
  res
}
2.4 Get the BlockManagerIds of all block managers that hold a given block
def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  // Send a GetLocations message to BlockManagerMasterEndpoint
  driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
}
2.5 Get the locations of multiple blocks
def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
  // Send a GetLocationsMultipleBlockIds message to BlockManagerMasterEndpoint
  driverEndpoint.askWithRetry[IndexedSeq[Seq[BlockManagerId]]](
    GetLocationsMultipleBlockIds(blockIds))
}
2.6 Check whether a block exists
def contains(blockId: BlockId): Boolean = {
  // A block exists if at least one block manager reports a location for it
  !getLocations(blockId).isEmpty
}
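Sections 2.4 to 2.6 are closely related: the multi-block lookup returns one location list per requested block, and `contains` is simply "the location list is non-empty". The sketch below illustrates that relationship with a toy in-memory registry instead of an RPC call. `LocationSketch`, `Registry` and the use of plain strings for block ids and block manager ids are assumptions for illustration only.

```scala
// Hypothetical toy model; strings stand in for BlockId and BlockManagerId.
object LocationSketch {
  // Toy registry: block id -> ids of the block managers that hold the block.
  final class Registry(locations: Map[String, Seq[String]]) {
    // Single-block lookup: an unknown block has no locations.
    def getLocations(blockId: String): Seq[String] =
      locations.getOrElse(blockId, Seq.empty)

    // Multi-block lookup: one result per requested block, in request order.
    def getLocations(blockIds: Array[String]): IndexedSeq[Seq[String]] =
      blockIds.map(id => getLocations(id)).toIndexedSeq

    // A block "exists" when at least one block manager holds it.
    def contains(blockId: String): Boolean = getLocations(blockId).nonEmpty
  }
}
```

Note that in the real class each `contains` call costs a round trip to the driver, since it is built on `getLocations`.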
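2.7 Get the BlockManagerIds of the other block managers (peers), excluding the given one
def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
  // Send a GetPeers message to BlockManagerMasterEndpoint
  driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId))
}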
2.8 Send a RemoveBlock message to BlockManagerMasterEndpoint to remove a block
def removeBlock(blockId: BlockId) {
  driverEndpoint.askWithRetry[Boolean](RemoveBlock(blockId))
}
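2.9 Remove all blocks belonging to the given RDD
def removeRdd(rddId: Int, blocking: Boolean) {
  // The endpoint replies with a future that completes when the removal is done
  val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
  future.onFailure {
    case e: Exception =>
      logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
  }(ThreadUtils.sameThread)
  // Only wait for the removal to finish when the caller asked for blocking behavior
  if (blocking) {
    timeout.awaitResult(future)
  }
}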
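2.10 Remove all blocks belonging to the given shuffle
def removeShuffle(shuffleId: Int, blocking: Boolean) {
  // The endpoint replies with a future that completes when the removal is done
  val future = driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
  future.onFailure {
    case e: Exception =>
      logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
  }(ThreadUtils.sameThread)
  // Only wait for the removal to finish when the caller asked for blocking behavior
  if (blocking) {
    timeout.awaitResult(future)
  }
}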
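`removeRdd` and `removeShuffle` share one pattern: start the removal asynchronously, log a failure without blocking, and wait for completion only when `blocking` is true. The sketch below reproduces that pattern with the standard `scala.concurrent` API alone. `RemoveSketch`, the `work` parameter and the 5 second default timeout are assumptions for illustration; the real methods obtain the future from the driver endpoint and use Spark's own RPC timeout.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical sketch; `work` stands in for the future returned by the endpoint.
object RemoveSketch {
  def remove(id: Int, blocking: Boolean, timeout: FiniteDuration = 5.seconds)(
      work: Int => Future[Seq[Int]]): Unit = {
    val future = work(id)
    // Log a failure asynchronously; this callback never blocks the caller.
    future.failed.foreach { e =>
      println(s"Failed to remove $id - ${e.getMessage}")
    }
    if (blocking) {
      // Wait for completion; a failed future rethrows its exception here.
      Await.result(future, timeout)
      ()
    }
  }
}
```

With `blocking = false` the caller returns immediately and a failure is only visible in the log, which is the trade-off the `blocking` flag exposes.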
2.11 Return the memory status of every block manager
def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
  // Each value is a pair of Longs: (maximum memory, remaining memory)
  driverEndpoint.askWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
}
2.12 Return the storage status
def getStorageStatus: Array[StorageStatus] = {
  driverEndpoint.askWithRetry[Array[StorageStatus]](GetStorageStatus)
}
2.13 Get the status of a block
def getBlockStatus(
    blockId: BlockId,
    askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {
  val msg = GetBlockStatus(blockId, askSlaves)
  // The reply maps each BlockManagerId to a future of its optional status for this block
  val response = driverEndpoint.
    askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
  val (blockManagerIds, futures) = response.unzip
  implicit val sameThread = ThreadUtils.sameThread
  val cbf =
    implicitly[
      CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
        Option[BlockStatus],
        Iterable[Option[BlockStatus]]]]
  // Wait for all futures to complete
  val blockStatus = timeout.awaitResult(
    Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread))
  if (blockStatus == null) {
    throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
  }
  // Keep only the block managers that actually reported a status
  blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
    status.map { s => (blockManagerId, s) }
  }.toMap
}
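The core of `getBlockStatus` is a three-step aggregation: split the reply into ids and futures, wait for all futures at once with `Future.sequence`, then drop every block manager whose answer was `None`. The sketch below shows just those steps on plain values. `BlockStatusSketch`, the simplified `Status` case class, string ids and the 5 second timeout are assumptions for illustration, and it relies on the default implicit collection builder rather than the explicit `CanBuildFrom` used in the Spark source.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical sketch; Spark's BlockStatus carries more fields than this Status.
object BlockStatusSketch {
  final case class Status(memSize: Long, diskSize: Long)

  def collect(response: Map[String, Future[Option[Status]]]): Map[String, Status] = {
    // Step 1: split ids and futures while keeping them in the same order.
    val (ids, futures) = response.toSeq.unzip
    // Step 2: turn Seq[Future[...]] into Future[Seq[...]] and wait once.
    val statuses = Await.result(Future.sequence(futures), 5.seconds)
    // Step 3: pair each id with its answer and drop the managers that had none.
    ids.zip(statuses).collect { case (id, Some(status)) => id -> status }.toMap
  }
}
```

Waiting on the combined future means one slow block manager delays the whole answer, and a single failed future fails the whole call.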
2.14 Check whether the given executor has cached blocks
def hasCachedBlocks(executorId: String): Boolean = {
  driverEndpoint.askWithRetry[Boolean](HasCachedBlocks(executorId))
}