Spark调优-参数及配置
2018-01-10 23:51
483 查看
参数调优
1 资源类
11 num-executors
12 executor-memory
13 executor-cores
14 driver-memory
15 sparkdefaultparallelism
16 sparkstoragememoryFraction
17 sparkshufflememoryFraction
getMaxExecutionMemory
getMaxStorageMemory
spark-submit时若带了–num-executors参数则取此值, 不带时取自spark.executor.instances配置,若没配置则取环境变量SPARK_EXECUTOR_INSTANCES的值,若其未设置,则取默认值DEFAULT_NUMBER_EXECUTORS=2。
这个值得设置还是要看分配的队列的资源情况,太少了无法充分利用集群资源,太多了则难以分配需要的资源。
通常可将此值设置为num-executors*executor-cores的2-3倍为宜,如果与其相近的话,则对于先完成task的core则无任务可干。2-3倍数量关系的话即不至于太零散,又可是的任务执行更均衡。
参见spark.shuffle.storageFraction,同理如果RDD持久化操作少而shuffle操作多则赢提高shuffle操作的内存占比,避免shuffle执行过程中内存不够。
一些实现细节
1 资源类
11 num-executors
12 executor-memory
13 executor-cores
14 driver-memory
15 sparkdefaultparallelism
16 sparkstoragememoryFraction
17 sparkshufflememoryFraction
getMaxExecutionMemory
getMaxStorageMemory
1.参数调优
1.1 资源类
1.1.1 num-executors
该参数主要用于设置该应用总共需要多少executors来执行,Driver在向集群资源管理器申请资源时需要根据此参数决定分配的Executor个数,并尽量满足所需。在不带的情况下只会分配少量Executor。spark-submit时若带了–num-executors参数则取此值, 不带时取自spark.executor.instances配置,若没配置则取环境变量SPARK_EXECUTOR_INSTANCES的值,若其未设置,则取默认值DEFAULT_NUMBER_EXECUTORS=2。
/** * Getting the initial target number of executors depends on whether dynamic allocation is * enabled. * If not using dynamic allocation it gets the number of executors requested by the user. */ def getInitialTargetExecutorNumber( conf: SparkConf, //DEFAULT_NUMBER_EXECUTORS 值为2 numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = { if (Utils.isDynamicAllocationEnabled(conf)) { //动态分配场景下没细看 val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS) val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf) val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS) require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors, s"initial executor number $initialNumExecutors must between min executor number " + s"$minNumExecutors and max executor number $maxNumExecutors") initialNumExecutors } else { val targetNumExecutors = sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(numExecutors) // System property can override environment variable. 配置优先级高于环境变量,环境变量不存在时才用numExecutors即DEFAULT_NUMBER_EXECUTORS=2 conf.get(EXECUTOR_INSTANCES).getOrElse(targetNumExecutors) } }
这个值得设置还是要看分配的队列的资源情况,太少了无法充分利用集群资源,太多了则难以分配需要的资源。
1.1.2 executor-memory
设置每个executor的内存,对Spark作业运行的性能影响很大。一般4-8G就差不多了,当然还要看资源队列的情况。num-executor*executor-memory的大小绝不能超过队列的内存总大小。1.1.3 executor-cores
设置每个executor的cpu核数,其决定了每个executor并行执行task的能力。Executor的CPU core数量设置为2-4个即可。弹药注意,num-executor*executor-cores也不能超过分配队列中cpu核数的大小。具体的核数的设置需要根据分配队列中资源统筹考虑,取得Executor,核数,及任务数的平衡。对于多任务共享的队列,更要注意不能将资源占满。1.1.4 driver-memory
运行sparkContext的Driver所在所占用的内存,通常不必设置,设置的话1G就足够了,除非是需要使用collect之类算子经常需要将数据提取到driver中的情况。1.1.5 spark.default.parallelism
此参数用于设置每个stage经TaskScheduler进行调度时生成task的数量,此参数未设置时将会根据读到的RDD的分区生成task,即根据源数据在hdfs中的分区数确定,若此分区数较小,则处理时只有少量task在处理,前述分配的executor中的core大部分无任务可干。通常可将此值设置为num-executors*executor-cores的2-3倍为宜,如果与其相近的话,则对于先完成task的core则无任务可干。2-3倍数量关系的话即不至于太零散,又可是的任务执行更均衡。
1.1.6 spark.storage.memoryFraction
该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。即Executor中可以用来保存持久化的RDD数据默认占了60%。根据Spark应用中RDD持久化操作的多少可以灵活调整这个比例,即持久化需要的内存多久放大此比例,免得内存不够用而不能持久化或转存到磁盘中,如果持久化操作少儿shuffle的操作多则可降低这个值。
1.1.7 spark.shuffle.memoryFraction
该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时 就会极大地降低性能。参见spark.shuffle.storageFraction,同理如果RDD持久化操作少而shuffle操作多则赢提高shuffle操作的内存占比,避免shuffle执行过程中内存不够。
一些实现细节
getMaxExecutionMemory
得到Executor中可用执行内存,计算为(systemMaxMemory * memoryFraction * safetyFraction).toLong,其中
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2) val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
getMaxStorageMemory
最大能存储的内存也就总内存的0.6*0.9=54%/** * Return the total amount of memory available for the storage region, in bytes. */ private def getMaxStorageMemory(conf: SparkConf): Long = { val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory) val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9) (systemMaxMemory * memoryFraction * safetyFraction).toLong }
相关文章推荐
- spark参数配置调优
- Spark配置参数调优
- 大数据Spark “蘑菇云”行动第99课:Hive性能调优之企业级Mapper和Reducer调优深度细节解密 参数配置
- Spark配置参数调优
- spark 资源参数调优
- JVM参数调优(虚拟机参数配置)
- Spark性能相关参数配置
- jvm调优总结---jvm参数配置
- Spark性能相关参数配置
- HDP 2.2 ( Hadoop 2.6 ) 集群的内存参数配置和参数调优 (Yarn/MapReduce2)
- Spark参数有效配置
- Hadoop 学习研究(四):MapReduce shuffle过程剖详解及参数配置调优
- spark参数配置
- Spark性能优化:JVM参数调优
- Hive参数配置调优
- spark参数调优
- Hadoop与Spark常用配置参数总结
- spark submit参数及调优,任务提交脚本
- Hadoop与Spark常用配置参数总结
- Spark资源参数调优参数