大数据:Spark Storage(一) 集群下的区块管理
2017-03-21 09:26
337 查看
Storage模块
在Spark中提及最多的是RDD,而RDD所交互的数据是通过Storage来实现和管理Storage模块整体架构
1. 存储层
在Spark里,单节点的Storage的管理是通过block来管理的,每个Block的存储可以在内存里或者在磁盘中,在BlockManager里既可以管理内存的存储,同时也管理硬盘的存储,存储的标识是通过块的ID来区分的。2. 集群下的架构
2.1 架构
在集群下Spark的Block的管理架构使用Master-Slave模式Master : 拥有所有block的具体信息(本地和Slave节点)
Slave : 通过master获取block的信息,并且汇报自己的信息
这里的Master并不是Spark集群中分配任务的Master,而是提交task的客户端Driver,这里并没有主备设计,因为Driver client是单点的,通常Driver client crash了,计算也没有结果了,在Storage 的集群管理中Master是由driver承担。
在Executor在运行task的时候,通过blockManager获取本地的block块,如果本地找不到,尝试通过master去获取远端的块
for (pid <- Random.shuffle(Seq.range(0, numBlocks))) { val pieceId = BroadcastBlockId(id, "piece" + pid) logDebug(s"Reading piece $pieceId of $broadcastId") // First try getLocalBytes because there is a chance that previous attempts to fetch the // broadcast blocks have already fetched some of the blocks. In that case, some blocks // would be available locally (on this executor). bm.getLocalBytes(pieceId) match { case Some(block) => blocks(pid) = block releaseLock(pieceId) case None => bm.getRemoteBytes(pieceId) match { case Some(b) => if (checksumEnabled) { val sum = calcChecksum(b.chunks(0)) if (sum != checksums(pid)) { throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" + s" $sum != ${checksums(pid)}") } } // We found the block from remote executors/driver's BlockManager, so put the block // in this executor's BlockManager. if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) { throw new SparkException( s"Failed to store $pieceId of $broadcastId in local BlockManager") } blocks(pid) = b case None => throw new SparkException(s"Failed to get $pieceId of $broadcastId") } } }
2.2 Executor获取块内容的位置
唯一的blockID:
broadcast_0_piece0
请求Master获取该BlockID所在的 Location,也就是BlockManagerId的集合
/** Get locations of the blockId from the driver */ def getLocations(blockId: BlockId): Seq[BlockManagerId] = { driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetLocations(blockId)) }
唯一的BlockManagerId
BlockManagerId(driver, 192.168.121.101, 55153, None)
Executor ID, executor ID, 对driver来说就是driver
Host: executor/driver IP
Port: executor/driver Port
每一个executor, 和driver 都生成唯一的BlockManagerId
2.3 Executor获取块的内容
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { logDebug(s"Getting remote block $blockId") require(blockId != null, "BlockId is null") var runningFailureCount = 0 var totalFailureCount = 0 val locations = getLocations(blockId) val maxFetchFailures = locations.size var locationIterator = locations.iterator while (locationIterator.hasNext) { val loc = locationIterator.next() logDebug(s"Getting remote block $blockId from $loc") val data = try { blockTransferService.fetchBlockSync( loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer() } catch { case NonFatal(e) => runningFailureCount += 1 totalFailureCount += 1 if (totalFailureCount >= maxFetchFailures) { // Give up trying anymore locations. Either we've tried all of the original locations, // or we've refreshed the list of locations from the master, and have still // hit failures after trying locations from the refreshed list. logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " + s"Most recent failure cause:", e) return None } logWarning(s"Failed to fetch remote block $blockId " + s"from $loc (failed attempt $runningFailureCount)", e) // If there is a large number of executors then locations list can contain a // large number of stale entries causing a large number of retries that may // take a significant amount of time. To get rid of these stale entries // we refresh the block locations after a certain number of fetch failures if (runningFailureCount >= maxFailuresBeforeLocationRefresh) { locationIterator = getLocations(blockId).iterator logDebug(s"Refreshed locations from the driver " + s"after ${runningFailureCount} fetch failures.") runningFailureCount = 0 } // This location failed, so we retry fetch from a different one by returning null here null } if (data != null) { return Some(new ChunkedByteBuffer(data)) } logDebug(s"The value of block $blockId is null") } logDebug(s"Block $blockId not found") None }
通过获取的BlockManagerId的集合列表,顺序的从列表中取出一个拥有该Block的服务器,通过
blockTransferService.fetchBlockSync( loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()同步的获取块的内容,如果该块不存在,则换下一个拥有该Block的服务器
2.4 BlockManager注册
Driver 初始化SparkContext.init 的时候,会初始化BlockManager.initializeval idFromMaster = master.registerBlockManager( id, maxMemory, slaveEndpoint)会通过master 注册BlockManager
def registerBlockManager( blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): BlockManagerId = { logInfo(s"Registering BlockManager $blockManagerId") val updatedId = driverEndpoint.askWithRetry[BlockManagerId]( RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint)) logInfo(s"Registered BlockManager $updatedId") updatedId }
在BlockManagerMaster里,我们看到了endpoint是强制的driver,也就是默认是driver 是master
无论driver,还是executor都是初始化后BlockManager,发消息给driver master进行注册,唯一不同的是driver标识自己的ID是driver,而executor是按照executor id来标识自己的
2.5 Driver Master的endpoint
前面一节已经介绍过无论driver还是executor 都会发送消息到Driver的Master,在Driver 和Executor里SparkEnv.create的时候会初始化BlockManagerMasterval blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint( BlockManagerMaster.DRIVER_ENDPOINT_NAME, new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)), conf, isDriver)注册一个lookup的endpoint
def registerOrLookupEndpoint( name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = { if (isDriver) { logInfo("Registering " + name) rpcEnv.setupEndpoint(name, endpointCreator) } else { RpcUtils.makeDriverRef(name, conf, rpcEnv) } }
代码中可以看到只有isDriver的时候才会setup一个rpc的endpoint,默认是netty的rpc环境,命名为:BlockManagerMaster
spark://BlockManagerMaster@192.168.121.101:40978所有的driver, executor都会向master 40978发消息
2.6 Master和Executor消息格式
下面的代码每个case都是master和executor的消息格式override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) => context.reply(register(blockManagerId, maxMemSize, slaveEndpoint)) case _updateBlockInfo @ UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)) listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo))) case GetLocations(blockId) => context.reply(getLocations(blockId)) case GetLocationsMultipleBlockIds(blockIds) => context.reply(getLocationsMultipleBlockIds(blockIds)) case GetPeers(blockManagerId) => context.reply(getPeers(blockManagerId)) case GetExecutorEndpointRef(executorId) => context.reply(getExecutorEndpointRef(executorId)) case GetMemoryStatus => context.reply(memoryStatus) case GetStorageStatus => context.reply(storageStatus) case GetBlockStatus(blockId, askSlaves) => context.reply(blockStatus(blockId, askSlaves)) case GetMatchingBlockIds(filter, askSlaves) => context.reply(getMatchingBlockIds(filter, askSlaves)) case RemoveRdd(rddId) => context.reply(removeRdd(rddId)) case RemoveShuffle(shuffleId) => context.reply(removeShuffle(shuffleId)) case RemoveBroadcast(broadcastId, removeFromDriver) => context.reply(removeBroadcast(broadcastId, removeFromDriver)) case RemoveBlock(blockId) => removeBlockFromWorkers(blockId) context.reply(true) case RemoveExecutor(execId) => removeExecutor(execId) context.reply(true) case StopBlockManagerMaster => context.reply(true) stop() case BlockManagerHeartbeat(blockManagerId) => context.reply(heartbeatReceived(blockManagerId)) case HasCachedBlocks(executorId) => blockManagerIdByExecutor.get(executorId) match { case Some(bm) => if (blockManagerInfo.contains(bm)) { val bmInfo = blockManagerInfo(bm) context.reply(bmInfo.cachedBlocks.nonEmpty) } else { context.reply(false) } case None => context.reply(false) } }
2.7 Master结构关系
在Master上会保存每一个executor所对应的BlockManagerID和BlockManagerInfo,而在BlockManagerInfo中保存了每个block的状态
Executor通过心跳主动汇报自己的状态,Master更新EndPoint中Executor的状态
Executor 中的block的状态更新也会汇报给Master,只是跟新Master状态,但不会通知其他的Executor
在Executor和Master交互中是Executor主动推和获取数据的,Master只是管理executor的状态,以及Block的所在的Driver、Executor的位置及其状态,负载较小,Master没有考虑可用性,通常Master节点就是提交任务的Driver的节点。
相关文章推荐
- 大数据:Spark Storage(二) 集群下的broadcast
- RH436-1数据管理、存储及集群技术概述
- solr管理配置,关于数据导入,集群复制及日志配置
- Spark修炼之道(基础篇)——Linux大数据开发基础:第八节:网络管理
- 跟上大数据的步伐:快速搭建Spark集群
- DT大数据梦工厂- 第6课 精通Spark集群搭建与测试
- MYSQL集群管理节点和数据节点的检测脚本
- 大数据集群工作流及任务管理组件对比
- DayDayUP_大数据学习课程[2]_spark1.4.1集群环境的搭建
- 大数据之Spark集群部署
- GIS+=地理信息+行业+大数据——Spark集群下SPARK SQL开发测试介绍
- Ironfan在大数据集群部署、配置管理中的应用 推荐
- Hadoop的集群数据、mapreduce管理及安全机制介绍
- solr管理配置,关于数据导入,集群复制及日志配置
- Spark修炼之道(基础篇)——Linux大数据开发基础:第七节:进程管理
- HDFS1.0源代码解析—DataNode端数据存储和管理DataStorage和FSDataset解析
- spark集群管理简述
- 第七章:在Spark集群上使用文件中的数据加载成为graph并进行操作(3)
- HCNA-Storage(一)信息数据管理
- 用maven管理spark应用程序,提交到spark on yarn 集群上运行