Spark Core 2.0 MemoryManager
2017-01-18 09:40
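MemoryManager is the memory manager that enforces how memory is shared between execution and storage. Its default implementation is the subclass UnifiedMemoryManager.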
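Here, execution memory is the memory used for computation in shuffles, joins, sorts, and aggregations, while storage memory is the memory used for caching and for propagating internal data across the cluster. There is exactly one MemoryManager per JVM.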
/**
 * An abstract memory manager that enforces how memory is shared between execution and storage.
 *
 * In this context, execution memory refers to that used for computation in shuffles, joins,
 * sorts and aggregations, while storage memory refers to that used for caching and propagating
 * internal data across the cluster. There exists one MemoryManager per JVM.
 */
private[spark] abstract class MemoryManager(
    conf: SparkConf,
    numCores: Int,
    onHeapStorageMemory: Long,
    onHeapExecutionMemory: Long) extends Logging {
The class defines four memory pools: on-heap and off-heap storage, and on-heap and off-heap execution.
@GuardedBy("this")
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
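The following statements set the initial size of the four pools. Both off-heap pools default to a size of 0, because spark.memory.offHeap.size defaults to 0 (off-heap memory is not used by default).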
onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)

protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
protected[this] val offHeapStorageMemory =
  (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong

offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
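The off-heap split above is plain arithmetic, so it can be checked in isolation. The following is a minimal sketch, not Spark code: the object name OffHeapSplitSketch and the method split are hypothetical, and the two parameters stand in for the values Spark reads from spark.memory.offHeap.size and spark.memory.storageFraction.

```scala
// A standalone sketch of the off-heap split; names here are hypothetical.
object OffHeapSplitSketch {

  /**
   * Returns (storageBytes, executionBytes) for a given off-heap budget.
   * storageFraction defaults to 0.5, mirroring the default used above.
   */
  def split(maxOffHeapMemory: Long, storageFraction: Double = 0.5): (Long, Long) = {
    // Storage gets the configured fraction, truncated to a whole number of bytes.
    val storage = (maxOffHeapMemory * storageFraction).toLong
    // Execution gets the remainder, so the two pools always sum to the budget.
    val execution = maxOffHeapMemory - storage
    (storage, execution)
  }

  def main(args: Array[String]): Unit = {
    val oneGiB = 1024L * 1024 * 1024
    println(split(0L))     // default budget of 0: both pools stay empty
    println(split(oneGiB)) // 1 GiB split evenly between storage and execution
  }
}
```

Because execution is computed as the remainder rather than as a second multiplication, no byte of the budget is lost to rounding.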
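maxOnHeapStorageMemory is a method that returns the total memory available for storage, in bytes. In this model that is the memory not occupied by execution, so the value can change over time.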
/**
 * Total available memory for storage, in bytes. This amount can vary over time, depending on
 * the MemoryManager implementation.
 * In this model, this is equivalent to the amount of memory not occupied by execution.
 */
def maxOnHeapStorageMemory: Long
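To make "memory not occupied by execution" concrete, here is a toy model of how such a ceiling moves as execution acquires and releases memory. It is only a sketch under stated assumptions: ToyExecutionPool and ToyUnifiedManager are hypothetical stand-ins, not Spark classes, and the single maxHeapMemory budget is an assumption of this example.

```scala
// Toy model of a storage ceiling that shrinks as execution memory is used.
// All class names are hypothetical; this is not the Spark implementation.
object StorageCeilingSketch {

  /** Hypothetical execution pool that only tracks the bytes currently in use. */
  final class ToyExecutionPool {
    private var used = 0L
    def memoryUsed: Long = synchronized { used }
    def acquire(bytes: Long): Unit = synchronized { used += bytes }
    def release(bytes: Long): Unit = synchronized { used = math.max(0L, used - bytes) }
  }

  /** Hypothetical manager with one shared budget for execution and storage. */
  final class ToyUnifiedManager(maxHeapMemory: Long) {
    val executionPool = new ToyExecutionPool

    /** Storage may use whatever execution is not occupying right now. */
    def maxOnHeapStorageMemory: Long = synchronized {
      maxHeapMemory - executionPool.memoryUsed
    }
  }

  def main(args: Array[String]): Unit = {
    val manager = new ToyUnifiedManager(1000L)
    println(manager.maxOnHeapStorageMemory) // full budget is available to storage
    manager.executionPool.acquire(300L)
    println(manager.maxOnHeapStorageMemory) // ceiling drops by the bytes execution took
  }
}
```

This is why the value is exposed as a method rather than a constant: callers have to re-read it each time they need the current ceiling.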
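The MemoryStore can only be set after the MemoryManager has been constructed, because of initialization ordering constraints.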
/**
 * Set the [[MemoryStore]] used by this manager to evict cached blocks.
 * This must be set after construction due to initialization ordering constraints.
 */
final def setMemoryStore(store: MemoryStore): Unit = synchronized {
  onHeapStorageMemoryPool.setMemoryStore(store)
  offHeapStorageMemoryPool.setMemoryStore(store)
}
BlockManager sets the MemoryStore as follows:
private[spark] val memoryStore =
  new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)
private[spark] val diskStore = new DiskStore(conf, diskBlockManager)
memoryManager.setMemoryStore(memoryStore)
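The ordering constraint comes from a circular dependency: the MemoryStore takes the memory manager as a constructor argument, while the manager needs the store in order to evict cached blocks. A setter called after construction breaks that cycle. The sketch below illustrates the pattern with hypothetical toy classes (ToyMemoryManager, ToyMemoryStore); it says nothing about the real classes beyond what the code above shows.

```scala
// Sketch of breaking a constructor cycle with a post-construction setter.
// ToyMemoryManager and ToyMemoryStore are hypothetical, not Spark classes.
object InitOrderSketch {

  /** The store needs the manager at construction time. */
  final class ToyMemoryStore(val manager: ToyMemoryManager)

  /** The manager learns about the store only after both objects exist. */
  final class ToyMemoryManager {
    private var store: Option[ToyMemoryStore] = None

    def setMemoryStore(s: ToyMemoryStore): Unit = synchronized { store = Some(s) }

    /** Eviction is only possible once a store has been registered. */
    def canEvict: Boolean = synchronized { store.isDefined }
  }

  /** Wires the two objects in the only order that works. */
  def wire(): (ToyMemoryManager, ToyMemoryStore) = {
    val manager = new ToyMemoryManager       // 1. manager first, without a store
    val store = new ToyMemoryStore(manager)  // 2. store receives the manager
    manager.setMemoryStore(store)            // 3. close the loop via the setter
    (manager, store)
  }
}
```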
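tungstenMemoryMode defaults to MemoryMode.ON_HEAP. It switches to OFF_HEAP only when spark.memory.offHeap.enabled is true, in which case spark.memory.offHeap.size must be greater than 0.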
/**
 * Tracks whether Tungsten memory will be allocated on the JVM heap or off-heap using
 * sun.misc.Unsafe.
 */
final val tungstenMemoryMode: MemoryMode = {
  if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
    require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
      "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
    require(Platform.unaligned(),
      "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
    MemoryMode.OFF_HEAP
  } else {
    MemoryMode.ON_HEAP
  }
}
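The decision logic can be exercised without a Spark dependency. This is a simplified sketch: the Mode type and chooseMode are hypothetical names, the two arguments stand in for the spark.memory.offHeap.enabled and spark.memory.offHeap.size settings, and the Platform.unaligned() check from the code above is deliberately left out.

```scala
// Simplified model of the on-heap / off-heap decision; names are hypothetical
// and the unaligned-access check from the original code is omitted.
object TungstenModeSketch {
  sealed trait Mode
  case object OnHeap extends Mode
  case object OffHeap extends Mode

  def chooseMode(offHeapEnabled: Boolean, offHeapSizeBytes: Long): Mode =
    if (offHeapEnabled) {
      // Enabling off-heap without giving it a size is a configuration error.
      require(offHeapSizeBytes > 0,
        "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
      OffHeap
    } else {
      // The size is ignored entirely while off-heap is disabled.
      OnHeap
    }

  def main(args: Array[String]): Unit = {
    println(chooseMode(offHeapEnabled = false, offHeapSizeBytes = 0L))
    println(chooseMode(offHeapEnabled = true, offHeapSizeBytes = 1024L * 1024 * 1024))
  }
}
```

Calling chooseMode(true, 0L) throws an IllegalArgumentException, which is how require reports a failed precondition.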
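Default page size: pageSize = maxTungstenMemory / cores / 16, rounded up to the next power of two and then clamped to a minimum of 1 MB and a maximum of 64 MB. An explicit spark.buffer.pageSize setting overrides this default.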
/**
 * The default page size, in bytes.
 *
 * If user didn't explicitly set "spark.buffer.pageSize", we figure out the default value
 * by looking at the number of cores available to the process, and the total amount of memory,
 * and then divide it by a factor of safety.
 */
val pageSizeBytes: Long = {
  val minPageSize = 1L * 1024 * 1024   // 1MB
  val maxPageSize = 64L * minPageSize  // 64MB
  val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
  // Because of rounding to next power of 2, we may have safetyFactor as 8 in worst case
  val safetyFactor = 16
  val maxTungstenMemory: Long = tungstenMemoryMode match {
    case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.poolSize
    case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.poolSize
  }
  val size = ByteArrayMethods.nextPowerOf2(maxTungstenMemory / cores / safetyFactor)
  val default = math.min(maxPageSize, math.max(minPageSize, size))
  conf.getSizeAsBytes("spark.buffer.pageSize", default)
}
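A few worked values make the formula easier to follow. The sketch below reimplements the calculation outside Spark. PageSizeSketch and defaultPageSize are hypothetical names, and nextPowerOf2 here is a local helper written for this example on the assumption that ByteArrayMethods.nextPowerOf2 rounds up to the nearest power of two; the spark.buffer.pageSize override is not modeled.

```scala
// Standalone re-creation of the default page size arithmetic.
// Names are hypothetical; the config override is intentionally not modeled.
object PageSizeSketch {
  private val MinPageSize = 1L * 1024 * 1024  // 1 MB
  private val MaxPageSize = 64L * MinPageSize // 64 MB
  private val SafetyFactor = 16

  /** Local helper: smallest power of two that is >= num (0 stays 0). */
  def nextPowerOf2(num: Long): Long = {
    val highBit = java.lang.Long.highestOneBit(num)
    if (highBit == num) num else highBit << 1
  }

  /** Per-core share divided by the safety factor, rounded up, then clamped. */
  def defaultPageSize(maxTungstenMemory: Long, cores: Int): Long = {
    val size = nextPowerOf2(maxTungstenMemory / cores / SafetyFactor)
    math.min(MaxPageSize, math.max(MinPageSize, size))
  }

  def main(args: Array[String]): Unit = {
    val gib = 1024L * 1024 * 1024
    // 4 GiB / 8 cores / 16 = 32 MiB, already a power of two
    println(defaultPageSize(4 * gib, 8))
    // 3 GiB / 8 cores / 16 = 24 MiB, rounded up to 32 MiB
    println(defaultPageSize(3 * gib, 8))
    // 64 MiB / 8 cores / 16 = 512 KiB, raised to the 1 MiB minimum
    println(defaultPageSize(64L * 1024 * 1024, 8))
    // 64 GiB / 1 core / 16 = 4 GiB, capped at the 64 MiB maximum
    println(defaultPageSize(64 * gib, 1))
  }
}
```

The second case shows why the source comment mentions an effective safety factor of 8: rounding 24 MiB up to 32 MiB means each core's share holds fewer than 16 pages.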
/**
 * Allocates memory for use by Unsafe/Tungsten code.
 */
private[memory] final val tungstenMemoryAllocator: MemoryAllocator = {
  tungstenMemoryMode match {
    case MemoryMode.ON_HEAP => MemoryAllocator.HEAP
    case MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE
  }
}