您的位置:首页 > 运维架构

spark core 2.0 BlockManager dropFromMemory

2017-01-18 14:57 204 查看
从内存中丢弃一个数据块;如果该数据块的存储级别允许使用磁盘,则先把它写入磁盘。当内存存储(memory store)达到上限、需要释放空间时调用此方法。

/**
 * Evict a block from the memory store, first writing it to the disk store when the block's
 * storage level uses disk and the disk store does not already hold it. Invoked when the
 * memory store has reached its limit and space must be reclaimed.
 *
 * `data` is only invoked when the block actually has to be written to disk.
 *
 * The caller must already hold a write lock on the block; this method does not release it.
 *
 * @param blockId the block to evict
 * @param data thunk yielding the block contents, either as an array of values or as bytes
 * @return the block's new effective StorageLevel.
 */
private[storage] override def dropFromMemory[T: ClassTag](
    blockId: BlockId,
    data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
  logInfo(s"Dropping block $blockId from memory")
  val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)

  // Spill to disk first when the storage level asks for it and no copy is on disk yet.
  val spilledToDisk = info.level.useDisk && !diskStore.contains(blockId)
  if (spilledToDisk) {
    logInfo(s"Writing block $blockId to disk")
    data() match {
      case Right(buffer) =>
        diskStore.putBytes(blockId, buffer)
      case Left(values) =>
        val elementTag = info.classTag.asInstanceOf[ClassTag[T]]
        diskStore.put(blockId) { out =>
          serializerManager.dataSerializeStream(blockId, out, values.toIterator)(elementTag)
        }
    }
  }

  // Record the in-memory size before removal; it is handed to reportBlockStatus below.
  val freedBytes = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
  val removedFromMemory = memoryStore.remove(blockId)
  if (!removedFromMemory) {
    logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
  }

  val newStatus = getCurrentBlockStatus(blockId, info)
  if (info.tellMaster) {
    reportBlockStatus(blockId, newStatus, freedBytes)
  }
  // Task metrics are only touched when the block was written to disk or removed from memory.
  if (spilledToDisk || removedFromMemory) {
    addUpdatedBlockStatusToTaskMetrics(blockId, newStatus)
  }
  newStatus.storageLevel
}


/**
 * Compute the current storage status of the given block by querying the memory and disk
 * stores. The returned status carries a storage level describing where the block is
 * actually held right now, together with its in-memory and on-disk sizes.
 *
 * When `info.level` is null, `BlockStatus.empty` is returned.
 */
private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
  info.synchronized {
    Option(info.level).fold(BlockStatus.empty) { declared =>
      val residentInMemory = declared.useMemory && memoryStore.contains(blockId)
      val residentOnDisk = declared.useDisk && diskStore.contains(blockId)
      // A block that is held nowhere is reported with replication 1.
      val effectiveReplication =
        if (residentInMemory || residentOnDisk) declared.replication else 1
      val effectiveLevel = StorageLevel(
        useDisk = residentOnDisk,
        useMemory = residentInMemory,
        useOffHeap = declared.useOffHeap,
        // Only a block held in memory can count as deserialized.
        deserialized = residentInMemory && declared.deserialized,
        replication = effectiveReplication)
      val bytesInMemory = if (residentInMemory) memoryStore.getSize(blockId) else 0L
      val bytesOnDisk = if (residentOnDisk) diskStore.getSize(blockId) else 0L
      BlockStatus(effectiveLevel, bytesInMemory, bytesOnDisk)
    }
  }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: