spark.streaming.concurrentJobs参数分析
2016-04-14 16:30
357 查看
最近,在spark streaming 调优时,发现个增加job并行度的参数
使用concurrentJobs参数初始化jobExecutor线程池,也就是这个参数直接影响了job executor线程池中的线程数目。
用来保存不同的时间点生成的JobSet,而JobSet中包含多个Job;
JobSet submit逻辑:
不难看出jobExecutor的容量决定了池子中同时可以被处理的JobHandler线程数,JobHandler是job的执行线程,因此决定了可以被同时被提交的Job数目。
- spark-default中修改
全局性修改,所有的streaming job都会受到影响。
- 提交streaming job是 –conf 参数添加(推荐)
在提交job时,可以使用–conf 参数为该job添加个性化的配置。例如:
设置该streaming job的job executor 线程池大小为5,在资源充足的情况下可以同时执行5个batch job。
- 代码设置
在代码中通过sparkConf设置:
或者
当调整为公平调度时,job可以共享计算资源,而job的提交仍然是有时间顺序的(虽然时间间隔很小),容易造成task在executor间分配的倾斜,拉长job的整体执行时间。
当使用fifo调度方式,先到的job优先获得计算资源,当executor数目不足时,job会等待executor被释放,task数目反而不易倾斜。
在实际使用时,如果executor数目足够,建议使用FIFO模式,如在concurrentJob为默认配置时,executor分配数目为m,则当concurrentJobs配置为n时,executor建议分配为 n*m。
spark.streaming.concurrentJobs,spark 默认值为1,当增加为2时(在spark-default中配置),如遇到处理速度慢 streaming application UI 中会有两个Active Jobs(默认值时为1),也就是在同一时刻可以执行两个批次的streaming job,下文分析这个参数是如何影响streaming 的执行的。
参数引入
在spark streaming 的JobScheduler line 47,读取了该参数:private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) private val jobExecutor = ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
使用concurrentJobs参数初始化jobExecutor线程池,也就是这个参数直接影响了job executor线程池中的线程数目。
job executor
job executor 线程池用来execute JobHandler线程;在jobSchedule中有个job容器jobSets:private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
用来保存不同的时间点生成的JobSet,而JobSet中包含多个Job;
JobSet submit逻辑:
def submitJobSet(jobSet: JobSet) { if (jobSet.jobs.isEmpty) { logInfo("No jobs added for time " + jobSet.time) } else { listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo)) jobSets.put(jobSet.time, jobSet) jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) logInfo("Added jobs for time " + jobSet.time) } }
不难看出jobExecutor的容量决定了池子中同时可以被处理的JobHandler线程数,JobHandler是job的执行线程,因此决定了可以被同时被提交的Job数目。
使用方法
可以通过集中方法为streaming job配置此参数。- spark-default中修改
全局性修改,所有的streaming job都会受到影响。
- 提交streaming job是 –conf 参数添加(推荐)
在提交job时,可以使用–conf 参数为该job添加个性化的配置。例如:
bin/spark-submit --master yarn --conf spark.streaming.concurrentJobs=5
设置该streaming job的job executor 线程池大小为5,在资源充足的情况下可以同时执行5个batch job。
- 代码设置
在代码中通过sparkConf设置:
sparkConf.set("spark.streaming.concurrentJobs", "5");
或者
System.setProperty("spark.streaming.concurrentJobs", "5");
scheduler mode的使用建议
在配置多个concurrentJob时,多个批次job被同时提交到集群中,也就需要更多的计算资源;当没有更多的计算资源(Executor)被分配个该streaming job时,可将schedul 调整为FAIR(公平调度)来达到被提交的多个job可公平的共享计算资源。当调整为公平调度时,job可以共享计算资源,而job的提交仍然是有时间顺序的(虽然时间间隔很小),容易造成task在executor间分配的倾斜,拉长job的整体执行时间。
当使用fifo调度方式,先到的job优先获得计算资源,当executor数目不足时,job会等待executor被释放,task数目反而不易倾斜。
在实际使用时,如果executor数目足够,建议使用FIFO模式,如在concurrentJob为默认配置时,executor分配数目为m,则当concurrentJobs配置为n时,executor建议分配为 n*m。
相关文章推荐
- 添加删除用户
- Asp.net Session保存到Redis: 使用 RedisSessionStateProvider
- Angluar中Controller之间通信方法
- 8种交换
- vb14
- 范范(10)
- 二叉排序树(BST)创建,删除,查找操作
- 详解JSP中的语句对象Statement操作MySQL的使用实例
- mysql登录报错提示:ERROR 1045 (28000)的解决方法
- 通过Android源代码分析startActivity()过程(下)
- 使用virtualbox设置双网卡,桥接+内部网络
- C++中map、set、hash_map、hash_set、unordered_map、unordered_set通俗辨析
- 利用反射取得类中属性(引用及非引用数据类型)及设置该类实例化对象的属性值
- Ajax例子,views返回,html接收数据
- HTML5 五子棋 - JS/Canvas 游戏
- 树结点的简单练习
- 二叉排序树(BST)创建,删除,查找操作
- URL链接中文参数乱码的若干处理方法
- tesseract的编译安装
- 静态变量static