Spark Source Code Analysis: BlockManager
2017-11-10 21:05
BlockManager is the unified interface for reading and writing blocks. An instance runs on both the master and every slave; it provides the read and write methods and, depending on the StorageLevel, delegates to the appropriate BlockStore.
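The StorageLevel-driven dispatch described above can be sketched with a simplified, hypothetical model (these are not Spark's actual classes; the real BlockManager consults MemoryStore, ExternalBlockStore, and DiskStore):

```scala
// Hypothetical, simplified model of StorageLevel-driven store selection.
case class Level(useMemory: Boolean, useOffHeap: Boolean, useDisk: Boolean)

// Mirrors the selection order seen later in doPut: memory, then off-heap, then disk.
def chooseStore(level: Level): String =
  if (level.useMemory) "MemoryStore"
  else if (level.useOffHeap) "ExternalBlockStore"
  else if (level.useDisk) "DiskStore"
  else "none"
```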
When an application starts, SparkContext creates the driver-side SparkEnv, which instantiates a BlockManager and a BlockManagerMaster; inside the BlockManagerMaster the BlockManagerMasterEndpoint is created for message passing.
When an Executor starts, it likewise creates its own SparkEnv, which instantiates a BlockManager and a BlockTransferService. During BlockManager initialization, a BlockManagerSlaveEndpoint message endpoint is set up and a reference to it is registered with the driver, so that the driver and the executors hold references to each other's communication endpoints.
// Create the BlockManagerMaster
val blockManagerMaster = new BlockManagerMaster(
  registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_ENDPOINT_NAME,
    // Create the BlockManagerMasterEndpoint
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf, isDriver)

// NB: blockManager is not valid until initialize() is called later.
// Create the BlockManager
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializer, conf, mapOutputTracker, shuffleManager, blockTransferService,
  securityManager, numUsableCores)
1 Core Attributes
String executorId: the executor id (or the driver id on the driver)
RpcEnv rpcEnv: the RPC communication environment
BlockManagerMaster master: manages and maintains block metadata for the whole application at runtime, and dispatches commands
Serializer defaultSerializer: the default serialization mechanism
Long maxMemory: the maximum memory available to this BlockManager
Int numUsableCores: the number of usable CPU cores
MapOutputTracker mapOutputTracker: tracks the output status of the map side of a shuffle
ShuffleManager shuffleManager: the shuffle manager
BlockTransferService blockTransferService: transfers data between nodes; used to fetch and upload blocks
DiskBlockManager diskBlockManager: manages on-disk blocks and their files and directories
TimeStampedHashMap[BlockId, BlockInfo] blockInfo: maintains the <blockId, blockInfo> mapping
Boolean externalBlockStoreInitialized: whether an external block store is in use
MemoryStore memoryStore: the in-memory store
DiskStore diskStore: the on-disk store
ExternalBlockStore externalBlockStore: the external store
BlockManagerId blockManagerId: the id of this BlockManager
ShuffleClient shuffleClient: the client for reading shuffle files written by other executors; it may be an external service or the standard block transfer service. If the external shuffle service is enabled an ExternalShuffleClient is created, otherwise the BlockTransferService itself is used
Boolean compressBroadcast: whether to compress broadcast data
Boolean compressShuffle: whether to compress map output files; generally recommended, but not advisable when CPU is the bottleneck
Boolean compressRdds: whether to compress serialized RDD partitions
Boolean compressShuffleSpill: whether to compress temporary files spilled on the map side
BlockManagerSlaveEndpoint slaveEndpoint: the BlockManagerSlaveEndpoint communication endpoint this BlockManager holds
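The four compress* flags above are read from Spark configuration keys. A minimal sketch of the lookup, using a plain Map in place of SparkConf (key names and defaults are from the Spark 1.x configuration docs; verify against your version):

```scala
// Assumed conf keys and defaults (Spark 1.x); a plain Map stands in for SparkConf.
val conf = Map(
  "spark.broadcast.compress"     -> true,  // compressBroadcast
  "spark.shuffle.compress"       -> true,  // compressShuffle
  "spark.rdd.compress"           -> false, // compressRdds
  "spark.shuffle.spill.compress" -> true   // compressShuffleSpill
)

// Equivalent of conf.getBoolean(key, default): fall back when the key is unset.
def getBoolean(key: String, default: Boolean): Boolean =
  conf.getOrElse(key, default)
```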
2 Key Methods
2.1 initialize: initialize the BlockManager with the given application id
# Initialize the BlockTransferService (build the RPC server, etc.); it is mainly used to transfer data across nodes
# Initialize the ShuffleClient, the client for reading shuffle files on other executors
# Build the blockManagerId
# Build the shuffleServerId
# Register the BlockManager with the BlockManagerMaster
# If the external shuffle service is enabled and this is an executor node, register with the external shuffle service
def initialize(appId: String): Unit = {
  // Initialize the BlockTransferService (builds the rpc server, etc.);
  // it is mainly used to transfer data across nodes
  blockTransferService.init(this)
  // Initialize the ShuffleClient, used to read shuffle files on other executors
  shuffleClient.init(appId)
  // Build the blockManagerId
  blockManagerId = BlockManagerId(
    executorId, blockTransferService.hostName, blockTransferService.port)
  // Build the shuffleServerId
  shuffleServerId = if (externalShuffleServiceEnabled) {
    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  } else {
    blockManagerId
  }
  // Register the BlockManager with the BlockManagerMaster, passing the
  // slaveEndpoint used to communicate with the BlockManagerMaster
  master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
  // If the external shuffle service is enabled and this is an executor node,
  // register with the external shuffle service
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }
}
2.2 reportAllBlocks: report all of this BlockManager's blocks to the master
private def reportAllBlocks(): Unit = {
  logInfo(s"Reporting ${blockInfo.size} blocks to the master.")
  for ((blockId, info) <- blockInfo) {
    // Get the current status of each block
    val status = getCurrentBlockStatus(blockId, info)
    if (!tryToReportBlockStatus(blockId, info, status)) {
      logError(s"Failed to report $blockId to master; giving up.")
      return
    }
  }
}
private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
  info.synchronized {
    info.level match {
      case null => BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
      case level =>
        val inMem = level.useMemory && memoryStore.contains(blockId)
        val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId)
        val onDisk = level.useDisk && diskStore.contains(blockId)
        val deserialized = if (inMem) level.deserialized else false
        val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1
        val storageLevel = StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication)
        val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
        val externalBlockStoreSize =
          if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L
        val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
        BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize)
    }
  }
}
2.3 reregister: re-register the BlockManager and report the status of all of its blocks
def reregister(): Unit = {
  // TODO: We might need to rate limit re-registering.
  logInfo("BlockManager re-registering with master")
  master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
  reportAllBlocks()
}
2.4 getBlockData: get a local block's data
# If it is a shuffle block, fetch it via the ShuffleBlockResolver; otherwise call doGetLocal to read it locally
# Wrap the result in a buffer, create a NioManagedBuffer, and return it
override def getBlockData(blockId: BlockId): ManagedBuffer = {
  // First check whether this is a shuffle block
  if (blockId.isShuffle) {
    // If so, fetch the block through the ShuffleBlockResolver
    shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
  } else {
    // Read the block data locally
    val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
      .asInstanceOf[Option[ByteBuffer]]
    if (blockBytesOpt.isDefined) {
      // Take the buffer and wrap it in a NioManagedBuffer
      val buffer = blockBytesOpt.get
      new NioManagedBuffer(buffer)
    } else {
      throw new BlockNotFoundException(blockId.toString)
    }
  }
}
# doGetLocal: fetch the local block data for the given blockId. If the block lives in memory it is read from memory; if it lives on disk it is read from disk; with MEMORY_AND_DISK, data read from disk is first cached in memory and then returned, so subsequent reads hit memory.
private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
  // Look up the blockInfo for this blockId
  val info = blockInfo.get(blockId).orNull
  if (info != null) {
    info.synchronized {
      // Double-check that the block has not been removed in the meantime
      if (blockInfo.get(blockId).isEmpty) {
        logWarning(s"Block $blockId had been removed")
        return None
      }
      // If another thread is still writing this block, wait until it is ready
      if (!info.waitForReady()) {
        // If we get here, the block write failed.
        logWarning(s"Block $blockId was marked as failure.")
        return None
      }
      // Get the block's storage level
      val level = info.level
      logDebug(s"Level for block $blockId is $level")
      // If the level uses memory, look the block up in the MemoryStore
      if (level.useMemory) {
        logDebug(s"Getting block $blockId from memory")
        val result = if (asBlockResult) {
          memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
        } else {
          memoryStore.getBytes(blockId)
        }
        result match {
          case Some(values) => return result
          case None => logDebug(s"Block $blockId not found in memory")
        }
      }
      // If the level uses off-heap storage, look it up in the ExternalBlockStore
      if (level.useOffHeap) {
        logDebug(s"Getting block $blockId from ExternalBlockStore")
        if (externalBlockStore.contains(blockId)) {
          val result = if (asBlockResult) {
            externalBlockStore.getValues(blockId)
              .map(new BlockResult(_, DataReadMethod.Memory, info.size))
          } else {
            externalBlockStore.getBytes(blockId)
          }
          result match {
            case Some(values) => return result
            case None => logDebug(s"Block $blockId not found in ExternalBlockStore")
          }
        }
      }
      // If the level uses disk, read the block from the DiskStore
      if (level.useDisk) {
        logDebug(s"Getting block $blockId from disk")
        val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
          case Some(b) => b
          case None =>
            throw new BlockException(
              blockId, s"Block $blockId not found on disk, though it should be")
        }
        assert(0 == bytes.position())
        // If the block should not also be stored in memory, just return it
        if (!level.useMemory) {
          // If the block shouldn't be stored in memory, we can just return it
          if (asBlockResult) {
            return Some(new BlockResult(dataDeserialize(blockId, bytes),
              DataReadMethod.Disk, info.size))
          } else {
            return Some(bytes)
          }
        } else {
          // Otherwise cache the data read from disk in memory, so the next
          // read of this block can be served from memory, which is faster
          if (!level.deserialized || !asBlockResult) {
            memoryStore.putBytes(blockId, bytes.limit, () => {
              // Putting this block in memory could cause an OOM, so the copy
              // into the ByteBuffer is performed lazily
              val copyForMemory = ByteBuffer.allocate(bytes.limit)
              copyForMemory.put(bytes)
            })
            bytes.rewind()
          }
          if (!asBlockResult) {
            return Some(bytes)
          } else {
            // The caller wants a BlockResult: deserialize the bytes
            val values = dataDeserialize(blockId, bytes)
            // If the storage level is deserialized
            if (level.deserialized) {
              // Cache the values in memory before returning them
              val putResult = memoryStore.putIterator(
                blockId, values, level, returnValues = true, allowPersistToDisk = false)
              putResult.data match {
                case Left(it) =>
                  return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
                case _ =>
                  // This only happens if we dropped the values back to disk (which is never)
                  throw new SparkException("Memory store did not return an iterator!")
              }
            } else {
              return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
            }
          }
        }
      }
    }
  } else {
    logDebug(s"Block $blockId not registered locally")
  }
  None
}
2.5 getLocal: get data from the local BlockManager
def getLocal(blockId: BlockId): Option[BlockResult] = {
  logDebug(s"Getting local block $blockId")
  doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
}
2.6 getLocalBytes: get data from the local BlockManager as serialized bytes
def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = {
  logDebug(s"Getting local block $blockId as bytes")
  if (blockId.isShuffle) {
    val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
    // TODO: This should gracefully handle case where local block is not available. Currently
    // downstream code will throw an exception.
    Option(
      shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
  } else {
    doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
  }
}
2.7 doGetRemote: fetch data from a remote node
private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
  require(blockId != null, "BlockId is null")
  // Get the BlockManagerIds that hold this block, in random order
  val locations = Random.shuffle(master.getLocations(blockId))
  // For each BlockManagerId, call blockTransferService.fetchBlockSync to pull
  // the data, and return as soon as one fetch succeeds
  for (loc <- locations) {
    logDebug(s"Getting remote block $blockId from $loc")
    val data = blockTransferService.fetchBlockSync(
      loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
    if (data != null) {
      if (asBlockResult) {
        return Some(new BlockResult(
          dataDeserialize(blockId, data), DataReadMethod.Network, data.limit()))
      } else {
        return Some(data)
      }
    }
    logDebug(s"The value of block $blockId is null")
  }
  logDebug(s"Block $blockId not found")
  None
}
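The fetch loop above follows a generic pattern: shuffle the candidate locations, then return the first successful result. A standalone sketch of that pattern (a hypothetical helper, not part of Spark):

```scala
import scala.util.Random

// Try each location in random order; return the first successful fetch, or
// None if every location fails. `fetch` stands in for fetchBlockSync.
def fetchFirst[A, B](locations: Seq[A])(fetch: A => Option[B]): Option[B] = {
  for (loc <- Random.shuffle(locations)) {
    fetch(loc) match {
      case Some(data) => return Some(data)
      case None       => // this location had no data; try the next one
    }
  }
  None
}
```

Randomizing the order spreads read load across the replicas instead of always hammering the first registered holder of a block.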
2.8 doPut: store data according to the StorageLevel
private def doPut(
    blockId: BlockId,
    data: BlockValues,
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
  // Check that the BlockId and StorageLevel are valid
  require(blockId != null, "BlockId is null")
  require(level != null && level.isValid, "StorageLevel is null or invalid")
  effectiveStorageLevel.foreach { level =>
    require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
  }
  // Collects the (blockId, status) pairs updated by this put
  val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
  // Get the BlockInfo for this put
  val putBlockInfo = {
    // Create a new BlockInfo and try to register it; putIfAbsent returns the
    // previous value if the key already existed, or None if the new value was stored
    val tinfo = new BlockInfo(level, tellMaster)
    val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
    // If an old BlockInfo exists and its write has already finished, the block
    // is already present, so return the (empty) updated-blocks list
    if (oldBlockOpt.isDefined) {
      if (oldBlockOpt.get.waitForReady()) {
        logWarning(s"Block $blockId already exists on this machine; not re-adding it")
        return updatedBlocks
      }
      oldBlockOpt.get
    } else {
      tinfo // otherwise use the freshly created BlockInfo
    }
  }
  val startTimeMs = System.currentTimeMillis
  var valuesAfterPut: Iterator[Any] = null
  var bytesAfterPut: ByteBuffer = null
  // Size of the block
  var size = 0L
  // The storage level actually used
  val putLevel = effectiveStorageLevel.getOrElse(level)
  // If replication is needed, start copying the data to other nodes
  // asynchronously before the local store; this speeds up sending the data
  val replicationFuture = data match {
    case b: ByteBufferValues if putLevel.replication > 1 =>
      // Duplicate the buffer view
      val bufferView = b.buffer.duplicate()
      Future {
        // Copy the data to other nodes
        replicate(blockId, bufferView, putLevel)
      }(futureExecutionContext)
    case _ => null
  }
  // Synchronize on the BlockInfo so no other thread can put this block
  // until `marked` has been set
  putBlockInfo.synchronized {
    logTrace("Put for block %s took %s to get into synchronized block"
      .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    var marked = false
    try {
      // returnValues: whether to return the values of the put
      // blockStore: the store to put into
      val (returnValues, blockStore: BlockStore) = {
        if (putLevel.useMemory) {
          // Memory storage: return true and the MemoryStore
          (true, memoryStore)
        } else if (putLevel.useOffHeap) {
          // Off-heap storage: return false and the ExternalBlockStore
          (false, externalBlockStore)
        } else if (putLevel.useDisk) {
          // Disk storage: return true only when the replication factor is
          // greater than 1, together with the DiskStore
          (putLevel.replication > 1, diskStore)
        } else {
          assert(putLevel == StorageLevel.NONE)
          throw new BlockException(
            blockId, s"Attempted to put block $blockId without specifying storage level!")
        }
      }
      // Dispatch on the type of the values being put
      val result = data match {
        // Iterator values: call BlockStore.putIterator
        case IteratorValues(iterator) =>
          blockStore.putIterator(blockId, iterator, putLevel, returnValues)
        // Array values: call BlockStore.putArray
        case ArrayValues(array) =>
          blockStore.putArray(blockId, array, putLevel, returnValues)
        // ByteBuffer values: call BlockStore.putBytes
        case ByteBufferValues(bytes) =>
          bytes.rewind()
          blockStore.putBytes(blockId, bytes, putLevel)
      }
      // Record the size of the result
      size = result.size
      result.data match {
        case Left(newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
        case Right(newBytes) => bytesAfterPut = newBytes
        case _ =>
      }
      // If memory storage was used, add any blocks that were dropped to disk
      // during the put to the updatedBlocks list
      if (putLevel.useMemory) {
        result.droppedBlocks.foreach { updatedBlocks += _ }
      }
      // Get the block's current status
      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
      if (putBlockStatus.storageLevel != StorageLevel.NONE) {
        // Mark the put as successful
        marked = true
        // Mark the block's write as finished on its BlockInfo
        putBlockInfo.markReady(size)
        // Report the block status to the master
        if (tellMaster) {
          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
        }
        // Record the update
        updatedBlocks += ((blockId, putBlockStatus))
      }
    } finally {
      // If we failed in putting the block to memory/disk, notify other possible readers
      // that it has failed, and then remove it from the block info map.
      if (!marked) {
        // Note that the remove must happen before markFailure otherwise another thread
        // could've inserted a new BlockInfo before we remove it.
        blockInfo.remove(blockId)
        putBlockInfo.markFailure()
        logWarning(s"Putting block $blockId failed")
      }
    }
  }
  logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
  // If the replication factor is greater than 1, wait for or perform the replication
  if (putLevel.replication > 1) {
    data match {
      case ByteBufferValues(bytes) =>
        if (replicationFuture != null) {
          Await.ready(replicationFuture, Duration.Inf)
        }
      case _ =>
        val remoteStartTime = System.currentTimeMillis
        // Serialize the block if not already done
        if (bytesAfterPut == null) {
          if (valuesAfterPut == null) {
            throw new SparkException(
              "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
          }
          bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
        }
        replicate(blockId, bytesAfterPut, putLevel)
        logDebug("Put block %s remotely took %s"
          .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
    }
  }
  // If the buffer is memory-mapped, dispose of it
  BlockManager.dispose(bytesAfterPut)
  if (putLevel.replication > 1) {
    logDebug("Putting block %s with replication took %s"
      .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
  } else {
    logDebug("Putting block %s without replication took %s"
      .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
  }
  updatedBlocks
}
2.9 getPeers: get all BlockManagerIds in the cluster other than the current one and the driver's
private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
  peerFetchLock.synchronized {
    val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
    val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
    if (cachedPeers == null || forceFetch || timeout) {
      cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
      lastPeerFetchTime = System.currentTimeMillis
      logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
    }
    cachedPeers
  }
}
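The caching above is a plain TTL cache: refetch only when forced or when the cached value is older than the TTL. A generic sketch of the same pattern (hypothetical class, with the clock injected for testability):

```scala
// TTL cache in the style of getPeers: `fetch` is only invoked when the cache
// is empty, a refresh is forced, or the cached value has expired.
class TtlCache[A](ttlMs: Long, fetch: () => A) {
  private var cached: Option[A] = None
  private var lastFetch = 0L

  def get(forceFetch: Boolean = false,
          now: Long = System.currentTimeMillis): A = synchronized {
    if (cached.isEmpty || forceFetch || now - lastFetch > ttlMs) {
      cached = Some(fetch())
      lastFetch = now
    }
    cached.get
  }
}
```

The forceFetch escape hatch matters for replication: after an upload failure the peer list may be stale (an executor died), so replicate passes forceFetch = true to get a fresh view from the master.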
2.10 replicate: replicate a block to other nodes
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
  // Maximum number of replication failures allowed
  val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
  // Number of peers this block needs to be replicated to
  val numPeersToReplicateTo = level.replication - 1
  // Candidate peers to replicate to
  val peersForReplication = new ArrayBuffer[BlockManagerId]
  // Peers the block has been replicated to
  val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
  // Peers for which replication failed
  val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
  // The storage level used on the remote node (replication 1 there)
  val tLevel = StorageLevel(
    level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
  val startTime = System.currentTimeMillis
  val random = new Random(blockId.hashCode)
  var replicationFailed = false
  var failures = 0
  var done = false
  // Get the executor BlockManagerIds other than the current one
  peersForReplication ++= getPeers(forceFetch = false)
  // Pick a random peer
  def getRandomPeer(): Option[BlockManagerId] = {
    // After a failure, refresh the candidate list from the master, then
    // remove the peers already replicated to and the peers that failed
    if (replicationFailed) {
      peersForReplication.clear()
      peersForReplication ++= getPeers(forceFetch = true)
      peersForReplication --= peersReplicatedTo
      peersForReplication --= peersFailedToReplicateTo
    }
    // If candidates remain, pick one at random
    if (!peersForReplication.isEmpty) {
      Some(peersForReplication(random.nextInt(peersForReplication.size)))
    } else {
      None
    }
  }
  // Keep looping until replication is done
  while (!done) {
    // Pick a random peer
    getRandomPeer() match {
      case Some(peer) =>
        try {
          val onePeerStartTime = System.currentTimeMillis
          data.rewind()
          logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
          // Call BlockTransferService.uploadBlockSync to upload the block synchronously
          blockTransferService.uploadBlockSync(
            peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel)
          logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms"
            .format(System.currentTimeMillis - onePeerStartTime))
          // Record this peer as replicated to, and remove it from the
          // candidates so we do not replicate to the same BlockManager twice
          peersReplicatedTo += peer
          peersForReplication -= peer
          replicationFailed = false
          // Once we have replicated to enough peers, replication is complete
          if (peersReplicatedTo.size == numPeersToReplicateTo) {
            done = true
          }
        } catch {
          case e: Exception =>
            logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
            failures += 1
            replicationFailed = true
            peersFailedToReplicateTo += peer
            if (failures > maxReplicationFailures) {
              // too many failures in replicating to peers
              done = true
            }
        }
      case None => // no peer left to replicate to
        done = true
    }
  }
  val timeTakeMs = (System.currentTimeMillis - startTime)
  logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
    s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
  if (peersReplicatedTo.size < numPeersToReplicateTo) {
    logWarning(s"Block $blockId replicated to only " +
      s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
  }
}
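Stripped of the networking, the control flow of this loop is: keep picking a random untried peer, stop once enough copies succeed or failures exceed the cap. A pure-Scala sketch of that skeleton (hypothetical helper; `upload` stands in for uploadBlockSync, and a fixed seed keeps it deterministic):

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Replicate to `copiesNeeded` peers chosen at random; give up after more
// than `maxFailures` failed uploads. Returns the peers replicated to.
def replicateTo[P](peers: Seq[P], copiesNeeded: Int, maxFailures: Int)
                  (upload: P => Boolean): Seq[P] = {
  val candidates = ArrayBuffer.empty[P]
  candidates ++= peers
  val replicated = ArrayBuffer.empty[P]
  var failures = 0
  val random = new Random(42) // fixed seed so the sketch is deterministic
  while (replicated.size < copiesNeeded && failures <= maxFailures && candidates.nonEmpty) {
    // Remove a random candidate so the same peer is never tried twice
    val peer = candidates.remove(random.nextInt(candidates.size))
    if (upload(peer)) replicated += peer else failures += 1
  }
  replicated.toSeq
}
```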
2.11 dropFromMemory: drop a block from memory, e.g. because memory is full and the block is better kept on disk
def dropFromMemory(
    blockId: BlockId,
    data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
  logInfo(s"Dropping block $blockId from memory")
  // Get the BlockInfo
  val info = blockInfo.get(blockId).orNull
  // If the block has not already been dropped
  if (info != null) {
    info.synchronized {
      if (!info.waitForReady()) {
        logWarning(s"Block $blockId was marked as failure. Nothing to drop")
        return None
      } else if (blockInfo.get(blockId).isEmpty) {
        logWarning(s"Block $blockId was already dropped.")
        return None
      }
      var blockIsUpdated = false
      val level = info.level
      // If the level uses disk and the block is not yet on disk, write it to disk
      if (level.useDisk && !diskStore.contains(blockId)) {
        logInfo(s"Writing block $blockId to disk")
        data() match {
          case Left(elements) =>
            diskStore.putArray(blockId, elements, level, returnValues = false)
          case Right(bytes) =>
            diskStore.putBytes(blockId, bytes, level)
        }
        blockIsUpdated = true
      }
      // If the block is in memory, record its size
      val droppedMemorySize =
        if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
      // Remove the block from the MemoryStore
      val blockIsRemoved = memoryStore.remove(blockId)
      if (blockIsRemoved) {
        blockIsUpdated = true
      } else {
        logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
      }
      // Re-read the block's current status
      val status = getCurrentBlockStatus(blockId, info)
      if (info.tellMaster) {
        // Report the block status to the master
        reportBlockStatus(blockId, info, status, droppedMemorySize)
      }
      // If the level does not use disk, remove the blockId from the
      // <blockId, blockInfo> map
      if (!level.useDisk) {
        // The block is completely gone from this node; forget it so we can put() it again later.
        blockInfo.remove(blockId)
      }
      if (blockIsUpdated) {
        return Some(status)
      }
    }
  }
  None
}
2.12 removeRdd: remove all blocks belonging to the given RDD
def removeRdd(rddId: Int): Int = {
  // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
  logInfo(s"Removing RDD $rddId")
  val blocksToRemove = blockInfo.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
  blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
  blocksToRemove.size
}
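The asRDDId filtering works because RDD blocks are named with the pattern `rdd_<rddId>_<splitIndex>` (as in BlockId's naming scheme). A standalone sketch of that filtering with hypothetical helper names:

```scala
// RDD block ids follow the "rdd_<rddId>_<splitIndex>" naming convention.
val RddBlockId = """rdd_(\d+)_(\d+)""".r

// Parse a block id string into (rddId, splitIndex), or None for non-RDD blocks.
def asRddId(blockId: String): Option[(Int, Int)] = blockId match {
  case RddBlockId(rddId, split) => Some((rddId.toInt, split.toInt))
  case _                        => None
}

// The linear scan performed by removeRdd: keep only blocks of the given RDD.
def blocksForRdd(blockIds: Seq[String], rddId: Int): Seq[String] =
  blockIds.filter(id => asRddId(id).exists(_._1 == rddId))
```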
2.13 removeBlock: remove a block from memory and disk
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
  logDebug(s"Removing block $blockId")
  // Look up the blockInfo for this blockId
  val info = blockInfo.get(blockId).orNull
  if (info != null) {
    info.synchronized {
      // Remove the block from memory
      val removedFromMemory = memoryStore.remove(blockId)
      // Remove the block from disk
      val removedFromDisk = diskStore.remove(blockId)
      // Remove the block from the ExternalBlockStore
      val removedFromExternalBlockStore =
        if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
      if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
        logWarning(s"Block $blockId could not be removed as it was not found in either " +
          "the disk, memory, or external block store")
      }
      // Remove the blockId from the <blockId, blockInfo> map
      blockInfo.remove(blockId)
      // Report the block status to the master if required
      if (tellMaster && info.tellMaster) {
        val status = getCurrentBlockStatus(blockId, info)
        reportBlockStatus(blockId, info, status)
      }
    }
  } else {
    // The block has already been removed; do nothing.
    logWarning(s"Asked to remove block $blockId, which does not exist")
  }
}