Spark Customization Class, Lesson 17: How Dynamic Resource Allocation and Dynamic Consumption-Rate Control Work in Spark Streaming
2016-05-29 07:20
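Spark is coarse-grained: by default it allocates resources up front and only then starts computing.
The advantage is that the resources are already in place, so a computing task can use them immediately without waiting for allocation.
The drawback is that peak and off-peak loads need different amounts of resources. If resources are sized for the peak, a large share of them is wasted during off-peak periods.
Twitter recently released Heron, which is meant to outclass Storm and is well worth watching, because Heron offers better resource allocation and lower latency. Heron is semantically compatible with Storm, so applications developed for Storm can run on Heron right away. Storm will surely become history. Heron is mainly written in C++, Java, and Python, and its API supports Java.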
In this lesson:
1. Dynamic resource allocation in Spark Streaming
2. Dynamic control of the consumption rate in Spark Streaming
SparkContext, the entry point of Spark Core:
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
}
_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    Some(new ExecutorAllocationManager(this, listenerBus, _conf))
  } else {
    None
  }
So Spark Core already supports dynamic resource allocation.
Utils.isDynamicAllocationEnabled:
/**
 * Return whether dynamic allocation is enabled in the given conf
 * Dynamic allocation and explicitly setting the number of executors are inherently
 * incompatible. In environments where dynamic allocation is turned on by default,
 * the latter should override the former (SPARK-9092).
 */
def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
  conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
    conf.getInt("spark.executor.instances", 0) == 0
}
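The interaction between the two properties can be illustrated with a small sketch. It is not Spark code: a plain Map stands in for SparkConf (an assumption made so the example runs without Spark on the classpath), and the object name DynamicAllocationCheck is hypothetical. Only the two property names and the boolean logic come from the excerpt above.

```scala
// A plain Map stands in for SparkConf here (assumption, to avoid a Spark dependency).
object DynamicAllocationCheck {
  // Same rule as the Utils.isDynamicAllocationEnabled excerpt: the flag must be on
  // AND the number of executors must not have been set explicitly.
  def isDynamicAllocationEnabled(conf: Map[String, String]): Boolean = {
    val enabled = conf.get("spark.dynamicAllocation.enabled").exists(_.toBoolean)
    val instances = conf.get("spark.executor.instances").map(_.toInt).getOrElse(0)
    enabled && instances == 0
  }

  def main(args: Array[String]): Unit = {
    val onlyDynamic = Map("spark.dynamicAllocation.enabled" -> "true")
    val both = onlyDynamic + ("spark.executor.instances" -> "4")
    println(isDynamicAllocationEnabled(onlyDynamic))               // true
    println(isDynamicAllocationEnabled(both))                      // false: explicit count wins
    println(isDynamicAllocationEnabled(Map.empty[String, String])) // false: flag defaults to off
  }
}
```

The second case is the one that triggers the warning in SparkContext: the flag is on, but an explicit executor count overrides it.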
ExecutorAllocationManager:
...
// Clock used to schedule when executors should be added and removed
private var clock: Clock = new SystemClock()
...
There is a clock. A timer built on that clock keeps scanning the executors, checking the resource situation at regular intervals.
Master.schedule:
/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) { return }
  // Drivers take strict precedence over executors
  val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
    for (driver <- waitingDrivers) {
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
      }
    }
  }
  startExecutorsOnWorkers()
}
This is the original, default path used to allocate resources.
ExecutorAllocationManager:
// Polling loop interval (ms)
private val intervalMillis: Long = 100
...
// A timestamp for each executor of when the executor should be removed, indexed by the ID
// This is set when an executor is no longer running a task, or when it first registers
private val removeTimes = new mutable.HashMap[String, Long]
...
// Clock used to schedule when executors should be added and removed
private var clock: Clock = new SystemClock()
...
// Executor that handles the scheduling task.
private val executor =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
...
removeTimes is keyed by executor ID.
executor holds a timer that keeps running schedule. The default period is intervalMillis (100 ms).
ExecutorAllocationManager.start:
/**
 * Register for scheduler callbacks to decide when to add and remove executors, and start
 * the scheduling task.
 */
def start(): Unit = {
  listenerBus.addListener(listener)
  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule()
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
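The polling pattern used by start can be tried outside Spark with only the JDK scheduler. The sketch below is an illustration under assumptions: PollingLoopSketch, runTicks, and the thread name are hypothetical, and a counter stands in for the real schedule() call. It also shows why the task body catches exceptions: scheduleAtFixedRate suppresses all later runs once one execution throws.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NonFatal

object PollingLoopSketch {
  // Runs a counting task every `intervalMillis` on one daemon thread, waits until it
  // has run `ticks` times (or `timeoutMillis` elapsed), and returns the run count.
  def runTicks(ticks: Int, intervalMillis: Long, timeoutMillis: Long): Int = {
    val counter = new AtomicInteger(0)
    val done = new CountDownLatch(ticks)
    val daemonFactory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "polling-loop-sketch")
        t.setDaemon(true) // do not keep the JVM alive just for polling
        t
      }
    }
    val executor = Executors.newSingleThreadScheduledExecutor(daemonFactory)
    val task = new Runnable {
      override def run(): Unit = {
        try {
          counter.incrementAndGet() // stand-in for the real schedule() work
          done.countDown()
        } catch {
          // Swallow ordinary failures so the periodic task keeps being rescheduled.
          case NonFatal(e) => println(s"Uncaught exception in polling task: $e")
        }
      }
    }
    executor.scheduleAtFixedRate(task, 0, intervalMillis, TimeUnit.MILLISECONDS)
    done.await(timeoutMillis, TimeUnit.MILLISECONDS)
    executor.shutdownNow()
    counter.get()
  }

  def main(args: Array[String]): Unit = {
    println(s"task ran ${runTicks(3, 100L, 5000L)} times")
  }
}
```

NonFatal does not match ControlThrowable, so control-flow throwables still propagate, which matches the intent of the two catch cases in the excerpt.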
ExecutorAllocationManager.schedule:
/**
 * This is called at a fixed interval to regulate the number of pending executor requests
 * and number of executors running.
 *
 * First, adjust our requested executors based on the add time and our current needs.
 * Then, if the remove time for an existing executor has expired, kill the executor.
 *
 * This is factored out into its own method for testing.
 */
private def schedule(): Unit = synchronized {
  val now = clock.getTimeMillis
  updateAndSyncNumExecutorsTarget(now)
  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false
      removeExecutor(executorId)
    }
    !expired
  }
}
This internal method is triggered periodically.
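The expiry sweep at the end of schedule can be illustrated in isolation. This is a simplified sketch, not the Spark implementation: IdleExecutorSweep and sweep are hypothetical names, the current time is passed in as a plain Long instead of coming from a Clock, and removing an entry from the map stands in for removeExecutor.

```scala
import scala.collection.mutable

object IdleExecutorSweep {
  // Drops every executor whose expiry time has been reached and returns the
  // removed IDs in sorted order. `now` plays the role of clock.getTimeMillis.
  def sweep(removeTimes: mutable.HashMap[String, Long], now: Long): Seq[String] = {
    // Collect first, then remove, so the map is not modified while it is iterated.
    val expired = removeTimes.iterator
      .collect { case (executorId, expireTime) if now >= expireTime => executorId }
      .toList
      .sorted
    expired.foreach(id => removeTimes.remove(id))
    expired
  }

  def main(args: Array[String]): Unit = {
    val removeTimes = mutable.HashMap("exec-1" -> 1000L, "exec-2" -> 2000L)
    println(sweep(removeTimes, 1500L)) // List(exec-1)
    println(removeTimes.keys.toList)   // List(exec-2)
  }
}
```

Passing the time in explicitly is the same idea as the replaceable clock field in ExecutorAllocationManager: the logic can be tested without waiting for real time to pass.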
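In a real production environment, you may need to customize dynamic resource allocation yourself.
What makes dynamic adjustment hard in Spark Streaming is that even if an adjustment has just been made within a batch duration, that batch duration may expire almost immediately.
You could consider changing the execution period (intervalMillis) to adjust dynamically. Within a batch duration the data has to be partitioned, so you can count the idle cores you already hold. If they are not enough, request additional executors so that tasks can be assigned to the new ones.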
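The core-counting idea can be written down as a small calculation. This is a sketch under assumptions, not an existing Spark function: ExecutorDeficit, extraExecutorsNeeded, and all parameter names are hypothetical, and it assumes one core per partition (task) and a fixed number of cores per executor.

```scala
object ExecutorDeficit {
  // How many extra executors would be needed so that every partition of the
  // current batch can be given a core, assuming one core per task.
  def extraExecutorsNeeded(partitions: Int, idleCores: Int, coresPerExecutor: Int): Int = {
    require(coresPerExecutor > 0, "coresPerExecutor must be positive")
    val missingCores = math.max(0, partitions - idleCores)
    (missingCores + coresPerExecutor - 1) / coresPerExecutor // ceiling division
  }

  def main(args: Array[String]): Unit = {
    println(extraExecutorsNeeded(20, 8, 4)) // 3: 12 cores missing, 4 cores per executor
    println(extraExecutorsNeeded(5, 8, 4))  // 0: enough idle cores already
  }
}
```

In a real job the result might be handed to an executor-request API such as SparkContext.requestExecutors. That wiring is not shown here and would need to be checked against the Spark version in use.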
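You can also take into account the resource demand of the previous batch duration, because when a peak occurs it often persists across several consecutive batch durations. Look at what happened in the previous batch duration and use some algorithm to adjust the resources for the following ones. To modify Spark Streaming in this way, you could design a new subclass of StreamingContext.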
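One possible form of "some algorithm" is a simple threshold rule on recent batch processing times. The sketch below is purely illustrative: BatchLoadHeuristic, suggestExecutors, and the 0.9 and 0.5 thresholds are assumptions chosen for the example, not values taken from Spark or from the lesson.

```scala
object BatchLoadHeuristic {
  // Suggests an executor count for upcoming batches from how long recent batches took.
  // load = average processing time / batch duration; above 0.9 add one executor,
  // below 0.5 release one, otherwise keep the current count (thresholds are assumptions).
  def suggestExecutors(
      current: Int,
      recentProcessingMs: Seq[Long],
      batchDurationMs: Long,
      minExecutors: Int,
      maxExecutors: Int): Int = {
    require(batchDurationMs > 0, "batchDurationMs must be positive")
    require(minExecutors >= 1 && minExecutors <= maxExecutors, "invalid executor bounds")
    if (recentProcessingMs.isEmpty) {
      current // no history yet, so leave the allocation unchanged
    } else {
      val avgMs = recentProcessingMs.sum.toDouble / recentProcessingMs.size
      val load = avgMs / batchDurationMs
      val target =
        if (load > 0.9) current + 1
        else if (load < 0.5) current - 1
        else current
      math.max(minExecutors, math.min(maxExecutors, target))
    }
  }

  def main(args: Array[String]): Unit = {
    println(suggestExecutors(4, Seq(1900L, 2100L, 2000L), 2000L, 2, 10)) // 5
    println(suggestExecutors(4, Seq(400L, 600L), 2000L, 2, 10))          // 3
  }
}
```

Averaging over several recent batches rather than reacting to a single one reflects the observation that peaks tend to last for several consecutive batch durations.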
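In fact, the customization approaches above are not easy to implement and may still not be a good fit.
Spark Streaming already has a built-in way to automatically and dynamically control the rate at which data flows in. The configuration property is spark.streaming.backpressure.enabled. For details, see the paper dynamic_batching.pdf.
Setting it to true is strongly recommended.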
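A minimal configuration sketch for turning the setting on from application code is shown below. It assumes Spark is on the classpath. The object name, application name, and the rate value of 10000 are placeholders, and the optional spark.streaming.receiver.maxRate line is an addition not discussed in the lesson: it sets an upper bound on records per second per receiver.

```scala
import org.apache.spark.SparkConf

object BackpressureConf {
  // Builds a SparkConf with backpressure enabled. The app name and the rate cap
  // are placeholder values for illustration only.
  def build(): SparkConf = new SparkConf()
    .setAppName("backpressure-sketch")
    .set("spark.streaming.backpressure.enabled", "true")
    // Optional upper bound (records per second per receiver) on the ingestion rate.
    .set("spark.streaming.receiver.maxRate", "10000")

  def main(args: Array[String]): Unit = {
    println(build().get("spark.streaming.backpressure.enabled"))
  }
}
```

The same properties can also be passed on the command line with --conf or placed in spark-defaults.conf instead of being set in code.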