spark core 2.0 BlockManager dropFromMemory
2017-01-18 14:57
204 查看
从内存中丢弃一个数据块,如果适用,可能把数据块写到磁盘上。当内存达到限制并且需要释放空间时调用。
/**
 * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
 * store reaches its limit and needs to free up space.
 *
 * If `data` is not put on disk, it won't be created.
 *
 * The caller of this method must hold a write lock on the block before calling this method.
 * This method does not release the write lock.
 *
 * @param blockId id of the block being dropped
 * @param data    thunk producing the block's contents, either as deserialized values or as
 *                serialized bytes; only evaluated if the block must be spilled to disk
 * @return the block's new effective StorageLevel.
 */
private[storage] override def dropFromMemory[T: ClassTag](
    blockId: BlockId,
    data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
  logInfo(s"Dropping block $blockId from memory")
  // Caller must already hold the write lock; this asserts (and fetches) the block's info.
  val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
  var blockIsUpdated = false
  val level = info.level

  // Drop to disk, if storage level requires
  // (skipped when the disk store already has a copy, to avoid rewriting the same data).
  if (level.useDisk && !diskStore.contains(blockId)) {
    logInfo(s"Writing block $blockId to disk")
    data() match {
      case Left(elements) =>
        // Deserialized values: serialize them as a stream directly into the disk file.
        diskStore.put(blockId) { fileOutputStream =>
          serializerManager.dataSerializeStream(
            blockId,
            fileOutputStream,
            elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]])
        }
      case Right(bytes) =>
        // Already-serialized bytes: write them out as-is.
        diskStore.putBytes(blockId, bytes)
    }
    blockIsUpdated = true
  }

  // Actually drop from memory store.
  // Capture the in-memory size BEFORE removal so the master can be told how much was freed.
  val droppedMemorySize =
    if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
  val blockIsRemoved = memoryStore.remove(blockId)
  if (blockIsRemoved) {
    blockIsUpdated = true
  } else {
    logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
  }

  // Recompute the block's effective status now that memory/disk contents have changed.
  val status = getCurrentBlockStatus(blockId, info)
  if (info.tellMaster) {
    reportBlockStatus(blockId, status, droppedMemorySize)
  }
  if (blockIsUpdated) {
    // Record the status change in the current task's metrics (only if something changed).
    addUpdatedBlockStatusToTaskMetrics(blockId, status)
  }
  status.storageLevel
}
/**
 * Return the updated storage status of the block with the given ID. More specifically, if
 * the block is dropped from memory and possibly added to disk, return the new storage level
 * and the updated in-memory and on-disk sizes.
 *
 * The effective level reflects where the block actually resides right now, not where its
 * declared level says it should be: memory/disk flags are cross-checked against the stores,
 * and sizes are 0 for tiers the block no longer occupies.
 */
private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
  info.synchronized {
    val declaredLevel = info.level
    if (declaredLevel == null) {
      BlockStatus.empty
    } else {
      // Where does the block actually live at this moment?
      val storedInMemory = declaredLevel.useMemory && memoryStore.contains(blockId)
      val storedOnDisk = declaredLevel.useDisk && diskStore.contains(blockId)
      val effectiveLevel = StorageLevel(
        useDisk = storedOnDisk,
        useMemory = storedInMemory,
        useOffHeap = declaredLevel.useOffHeap,
        // "deserialized" only makes sense for blocks still held in memory.
        deserialized = storedInMemory && declaredLevel.deserialized,
        // A block present on neither tier effectively has replication 1.
        replication = if (storedInMemory || storedOnDisk) declaredLevel.replication else 1)
      BlockStatus(
        effectiveLevel,
        memSize = if (storedInMemory) memoryStore.getSize(blockId) else 0L,
        diskSize = if (storedOnDisk) diskStore.getSize(blockId) else 0L)
    }
  }
}
相关文章推荐
- spark core 2.0 ChunkedByteBufferOutputStream
- spark core 2.0 TimeTrackingOutputStream
- spark core 2.0 TaskSchedulerImpl 源代码解析
- spark core 2.0 RedirectableOutputStream
- spark core 2.0 MemoryManager
- spark core 2.0 StorageMemoryPool
- spark core 2.0 LongArray
- spark core 2.0 PartitionCoalescer, PartitionGroup, DefaultPartitionCoalescer
- spark core 2.0 ExecutionMemoryPool
- spark core 2.0 BypassMergeSortShuffleWriter
- spark core 2.0 Executor
- spark core 2.0 UnifiedMemoryManager
- spark core 2.0 CheckpointState RDDCheckpointData Checkpoint LocalRDDCheckpointData
- spark core 2.0 CoarseGrainedSchedulerBackend SchedulerBackend ExecutorAllocationClient 源代码解析
- spark core 2.0 Partition and HadoopPartition
- spark core 2.0 SortShuffleManager
- spark core 2.0 Executor Heartbeat
- spark core 2.0 DiskBlockObjectWriter
- spark core 2.0 MemoryLocation
- spark core 2.0 MetricsConfig