
Spark Core 2.0: MemoryManager

2017-01-18 09:40
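MemoryManager is an abstract memory manager that enforces how memory is shared between execution and storage. In Spark 2.0 it has two concrete subclasses: UnifiedMemoryManager, which is the default, and the legacy StaticMemoryManager, which is used when spark.memory.useLegacyMode is true.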

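Here, execution memory is the memory used for computation in shuffles, joins, sorts and aggregations. Storage memory is the memory used for caching and for propagating internal data across the cluster. There is one MemoryManager per JVM.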

/**
 * An abstract memory manager that enforces how memory is shared between execution and storage.
 *
 * In this context, execution memory refers to that used for computation in shuffles, joins,
 * sorts and aggregations, while storage memory refers to that used for caching and propagating
 * internal data across the cluster. There exists one MemoryManager per JVM.
 */
private[spark] abstract class MemoryManager(
    conf: SparkConf,
    numCores: Int,
    onHeapStorageMemory: Long,
    onHeapExecutionMemory: Long) extends Logging {


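The class defines four memory pools, one for each combination of storage or execution with on-heap or off-heap memory: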

@GuardedBy("this")
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
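The following toy sketch in Scala shows the four-pool layout. The names ToyPools, Pool, acquire and release are hypothetical and are not Spark's StorageMemoryPool or ExecutionMemoryPool API. The sketch assumes only that each pool tracks a capacity and a usage count, starts at size 0, and is identified by its kind and memory mode.

```scala
// Toy model of the four pools: {storage, execution} x {on-heap, off-heap}.
// All names are hypothetical; this is not Spark's MemoryPool API.
object ToyPools {
  sealed trait Mode
  case object OnHeap extends Mode
  case object OffHeap extends Mode

  // A pool only records its capacity and how much of that capacity is in use.
  final class Pool(val kind: String, val mode: Mode) {
    private var size = 0L // every pool starts at 0; capacity is added via incrementPoolSize
    private var used = 0L

    def poolSize: Long = synchronized { size }
    def memoryUsed: Long = synchronized { used }
    def memoryFree: Long = synchronized { size - used }

    def incrementPoolSize(delta: Long): Unit = synchronized {
      require(delta >= 0, s"invalid delta: $delta")
      size += delta
    }

    // Grants at most the free amount and returns how many bytes were granted.
    def acquire(numBytes: Long): Long = synchronized {
      val granted = math.min(numBytes, size - used)
      used += granted
      granted
    }

    def release(numBytes: Long): Unit = synchronized {
      require(numBytes <= used, s"cannot release $numBytes bytes, only $used in use")
      used -= numBytes
    }
  }

  // One pool per (kind, mode) pair, mirroring the four vals declared in MemoryManager.
  def create(): Map[(String, Mode), Pool] =
    (for {
      kind <- Seq("storage", "execution")
      mode <- Seq[Mode](OnHeap, OffHeap)
    } yield (kind, mode) -> new Pool(kind, mode)).toMap
}
```

Keying the pools by the (kind, mode) pair keeps on-heap and off-heap accounting fully separate. Spark follows the same principle by declaring four independent pool objects.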


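The following statements set the initial sizes of the four pools. The on-heap pools get the sizes passed to the constructor. The off-heap total comes from spark.memory.offHeap.size and is split by spark.memory.storageFraction (default 0.5): storage gets that fraction and execution gets the rest. Because spark.memory.offHeap.size defaults to 0 (off-heap memory is not enabled by default), both off-heap pools start with size 0.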

onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)

protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
protected[this] val offHeapStorageMemory =
(maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong

offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
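The off-heap split above is plain arithmetic. The hypothetical helper below reproduces it with numbers instead of SparkConf lookups, so you can check the default and non-default fractions directly. OffHeapSplit is not a Spark object.

```scala
// Hypothetical helper reproducing the off-heap split arithmetic:
// storage gets (maxOffHeapMemory * storageFraction).toLong, execution gets the remainder.
object OffHeapSplit {
  final case class Split(storage: Long, execution: Long)

  def split(maxOffHeapMemory: Long, storageFraction: Double = 0.5): Split = {
    // toLong truncates toward zero, as in the MemoryManager code.
    val storage = (maxOffHeapMemory * storageFraction).toLong
    // Execution takes whatever storage did not, so the two always sum to the total.
    Split(storage, maxOffHeapMemory - storage)
  }
}
```

With the default spark.memory.offHeap.size of 0, both off-heap pools are 0 bytes. With 1 GiB and the default fraction of 0.5, each pool gets 512 MiB. Storage is truncated and execution takes the remainder, so the two pools always add up to exactly maxOffHeapMemory.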


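maxOnHeapStorageMemory is an abstract method that returns the total memory available for storage, in bytes. In this model it equals the memory not occupied by execution, so the value can change over time, depending on the MemoryManager implementation.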

/**
 * Total available memory for storage, in bytes. This amount can vary over time, depending on
 * the MemoryManager implementation.
 * In this model, this is equivalent to the amount of memory not occupied by execution.
 */
def maxOnHeapStorageMemory: Long
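Why the storage limit "can vary over time" is easiest to see in a toy model where storage gets whatever execution is not using. ToyUnifiedManager and its methods are hypothetical and are not Spark's UnifiedMemoryManager API. The sketch also ignores storage's own usage and eviction, and only follows the rule stated in the doc comment above.

```scala
// Toy model: available storage = total on-heap memory - memory currently held by execution.
// Hypothetical class; it does not reproduce Spark's eviction or borrowing rules.
final class ToyUnifiedManager(val maxHeapMemory: Long) {
  private var executionUsed = 0L

  // Recomputed on every call, so the result changes as execution acquires and releases memory.
  def maxOnHeapStorageMemory: Long = synchronized { maxHeapMemory - executionUsed }

  // Succeeds only if the request fits in the remaining heap.
  def acquireExecution(numBytes: Long): Boolean = synchronized {
    if (executionUsed + numBytes <= maxHeapMemory) {
      executionUsed += numBytes
      true
    } else {
      false
    }
  }

  def releaseExecution(numBytes: Long): Unit = synchronized {
    executionUsed = math.max(0L, executionUsed - numBytes)
  }
}
```

For example, with a 1000-byte heap, storage may use all 1000 bytes at first. The limit drops to 700 while execution holds 300 bytes and returns to 1000 once execution releases them.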

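The MemoryStore, which the manager uses to evict cached blocks, can only be set after construction because of initialization ordering constraints: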

/**
 * Set the [[MemoryStore]] used by this manager to evict cached blocks.
 * This must be set after construction due to initialization ordering constraints.
 */
final def setMemoryStore(store: MemoryStore): Unit = synchronized {
  onHeapStorageMemoryPool.setMemoryStore(store)
  offHeapStorageMemoryPool.setMemoryStore(store)
}


In BlockManager, the MemoryStore is created and passed to the MemoryManager as follows:

private[spark] val memoryStore =
  new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)
private[spark] val diskStore = new DiskStore(conf, diskBlockManager)
memoryManager.setMemoryStore(memoryStore)
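The ordering constraint is a construction cycle. MemoryStore takes the MemoryManager as a constructor argument, while the manager's storage pools need the store in order to evict blocks. The minimal sketch below breaks such a cycle with a setter. ToyManager and ToyStore are hypothetical classes, not Spark code.

```scala
// Hypothetical classes illustrating two-phase wiring of a circular dependency.
// The store receives the manager at construction time...
final class ToyStore(val manager: ToyManager) {
  private var evicted = List.empty[String]
  def evict(blockId: String): Unit = synchronized { evicted = blockId :: evicted }
  def evictedBlocks: List[String] = synchronized { evicted.reverse }
}

// ...so the manager can only learn about the store afterwards, through a setter.
final class ToyManager {
  private var store: Option[ToyStore] = None

  // Plays the role of setMemoryStore: called once, after both objects exist.
  def setStore(s: ToyStore): Unit = synchronized { store = Some(s) }

  def evictBlock(blockId: String): Unit = synchronized {
    store match {
      case Some(s) => s.evict(blockId)
      case None => throw new IllegalStateException("store has not been set yet")
    }
  }
}
```

Using the manager before the setter runs fails fast, which is why BlockManager calls setMemoryStore right after constructing the MemoryStore.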


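tungstenMemoryMode defaults to MemoryMode.ON_HEAP. It switches to MemoryMode.OFF_HEAP only when spark.memory.offHeap.enabled is true. In that case spark.memory.offHeap.size must be greater than 0 and the platform must support unaligned Unsafe access; otherwise a require check fails.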

/**
 * Tracks whether Tungsten memory will be allocated on the JVM heap or off-heap using
 * sun.misc.Unsafe.
 */
final val tungstenMemoryMode: MemoryMode = {
  if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
    require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
      "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
    require(Platform.unaligned(),
      "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
    MemoryMode.OFF_HEAP
  } else {
    MemoryMode.ON_HEAP
  }
}
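The mode selection can be restated as a pure function so that each branch is easy to exercise. This is a sketch under stated assumptions. The configuration is a plain Map rather than SparkConf, and sizes are given as byte counts, whereas SparkConf.getSizeAsBytes also accepts strings such as "1g". The flag unalignedSupported is a hypothetical stand-in for Platform.unaligned().

```scala
// Hypothetical pure-function restatement of the tungstenMemoryMode logic.
object TungstenModeSketch {
  sealed trait Mode
  case object OnHeap extends Mode
  case object OffHeap extends Mode

  def select(conf: Map[String, String], unalignedSupported: Boolean): Mode = {
    // Missing key means "false", matching the default of spark.memory.offHeap.enabled.
    val enabled = conf.get("spark.memory.offHeap.enabled").exists(_.toBoolean)
    if (enabled) {
      // Assumes the size is already a byte count; a missing key means 0.
      val size = conf.get("spark.memory.offHeap.size").map(_.toLong).getOrElse(0L)
      require(size > 0,
        "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
      require(unalignedSupported,
        "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
      OffHeap
    } else {
      OnHeap
    }
  }
}
```

Enabling off-heap memory without a positive size, or on a platform without unaligned access, throws an IllegalArgumentException from require instead of silently falling back to on-heap memory.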


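Default page size: pageSize = maxTungstenMemory / cores / 16, where maxTungstenMemory is the size of the execution pool that matches tungstenMemoryMode. The result is rounded up to the next power of 2 and then clamped to the range [1 MB, 64 MB]. This default applies only when spark.buffer.pageSize is not set explicitly.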

/**
 * The default page size, in bytes.
 *
 * If user didn't explicitly set "spark.buffer.pageSize", we figure out the default value
 * by looking at the number of cores available to the process, and the total amount of memory,
 * and then divide it by a factor of safety.
 */
val pageSizeBytes: Long = {
  val minPageSize = 1L * 1024 * 1024   // 1MB
  val maxPageSize = 64L * minPageSize  // 64MB
  val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
  // Because of rounding to next power of 2, we may have safetyFactor as 8 in worst case
  val safetyFactor = 16
  val maxTungstenMemory: Long = tungstenMemoryMode match {
    case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.poolSize
    case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.poolSize
  }
  val size = ByteArrayMethods.nextPowerOf2(maxTungstenMemory / cores / safetyFactor)
  val default = math.min(maxPageSize, math.max(minPageSize, size))
  conf.getSizeAsBytes("spark.buffer.pageSize", default)
}
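To try the page-size formula with concrete numbers, here is a standalone re-derivation. PageSizeSketch is hypothetical. Its nextPowerOf2 rounds up to a power of two using java.lang.Long.highestOneBit, which is the role ByteArrayMethods.nextPowerOf2 plays above. The explicitPageSize parameter stands in for a user-set spark.buffer.pageSize.

```scala
// Hypothetical re-derivation of the default page size computation.
object PageSizeSketch {
  val MinPageSize: Long = 1L * 1024 * 1024 // 1 MB
  val MaxPageSize: Long = 64L * MinPageSize // 64 MB
  val SafetyFactor = 16

  // Smallest power of two >= num for num > 0; powers of two are returned unchanged.
  def nextPowerOf2(num: Long): Long = {
    val highBit = java.lang.Long.highestOneBit(num)
    if (highBit == num) num else highBit << 1
  }

  def defaultPageSize(maxTungstenMemory: Long, numCores: Int,
                      explicitPageSize: Option[Long] = None): Long = {
    // Falls back to the JVM's processor count when numCores is not positive.
    val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
    val size = nextPowerOf2(maxTungstenMemory / cores / SafetyFactor)
    val default = math.min(MaxPageSize, math.max(MinPageSize, size))
    // An explicit setting always wins over the computed default.
    explicitPageSize.getOrElse(default)
  }
}
```

For example, a 4 GiB execution pool on 8 cores gives 4 GiB / 8 / 16 = 32 MB. That is already a power of two inside [1 MB, 64 MB], so pages are 32 MB. A 100 MB pool on 4 cores gives 1,638,400 bytes, which rounds up to 2 MB. A 10 MB pool on 8 cores gives 81,920 bytes, which rounds up to 128 KB and is then raised to the 1 MB minimum.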

/**
 * Allocates memory for use by Unsafe/Tungsten code.
 */
private[memory] final val tungstenMemoryAllocator: MemoryAllocator = {
  tungstenMemoryMode match {
    case MemoryMode.ON_HEAP => MemoryAllocator.HEAP
    case MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE
  }
}