2.2 JobGenerator 详解
2016-05-20 11:11
375 查看
转自:https://github.com/lw-lin/CoolplaySpark/blob/master/Spark%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97/2.2%20JobGenerator%20%E8%AF%A6%E8%A7%A3.md
[酷玩 Spark] Spark Streaming 源码解析系列 ,返回目录请 猛戳这里
「腾讯·广点通」技术团队荣誉出品
本文内容适用范围:
2016.02.25 update, Spark 2.0 全系列 √ (2.0.0-SNAPSHOT 尚未正式发布)
2016.03.10 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1)
2015.11.09 update, Spark 1.5 全系列 √ (1.5.0, 1.5.1, 1.5.2)
2015.07.15 update, Spark 1.4 全系列 √ (1.4.0, 1.4.1)
阅读本文前,请一定先阅读 Spark
Streaming 实现思路与模块概述 一文,其中概述了 Spark Streaming 的 4 大模块的基本作用,有了全局概念后再看本文对
前面在 Spark
Streaming 实现思路与模块概述 和 DStream
生成 RDD 实例详解 里我们分析了
的能力,下面我们来看 Spark Streaming 是如何将其动态调度的。
在 Spark Streaming 程序的入口,我们都会定义一个
RDD DAG 实例。在 Spark Streaming 里,总体负责动态作业调度的具体类是
batch 的 RDD DAG 具体生成工作委托给
本文我们来详解
在用户 code 最后调用
具体的看,
可以看到,在启动了 RPC 处理线程
后面我们会分析失效后重启的
可以看到,这里首次启动时做的工作,先是通过
1 个 batch 的启动时间,然后就是
当定时器
通过之前几篇文章的分析我们知道,
batch 生成 RDD DAG 的实例。
具体的,这个定时器实例就是:
通过代码也可以看到,整个
—— 该为当前 batch (
接下来,
这段代码异常精悍,包含了
(1) 要求
batch 里
这里
meta 信息的过程,是相互独立的,但通过
即是说,不管
batch,还是由
batch 的块数据 meta,将被划分入最新的 batch
所以,每个块数据的 meta 信息,将被划入一个、且只被划入一个 batch
(2) 要求
RDD 实例,并递归的调用尾
RDD DAG 的实例
这个过程的详解,请参考前面的文章 DStream
生成 RDD 实例详解
精确的说,整个
(3) 获取第 1 步
第 1 步中
batch 的块数据 meta 信息
(4) 将第 2 步生成的本 batch 的 RDD DAG,和第 3 步获取到的 meta 信息,一同提交给
这里我们提交的是将 (a)
这里的向
(5) 只要提交结束(不管是否已开始异步执行),就马上对整个系统的当前运行状态做一个 checkpoint
这里做 checkpoint 也只是异步提交一个
这里也简单描述一下 checkpoint 包含的内容,包括已经提交了的、但尚未运行结束的 JobSet 等实际运行时信息。
JobGenerator 详解
[酷玩 Spark] Spark Streaming 源码解析系列 ,返回目录请 猛戳这里「腾讯·广点通」技术团队荣誉出品
本文内容适用范围:
2016.02.25 update, Spark 2.0 全系列 √ (2.0.0-SNAPSHOT 尚未正式发布)
2016.03.10 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1)
2015.11.09 update, Spark 1.5 全系列 √ (1.5.0, 1.5.1, 1.5.2)
2015.07.15 update, Spark 1.4 全系列 √ (1.4.0, 1.4.1)
阅读本文前,请一定先阅读 Spark
Streaming 实现思路与模块概述 一文,其中概述了 Spark Streaming 的 4 大模块的基本作用,有了全局概念后再看本文对
模块 2:Job 动态生成细节的解释。
引言
前面在 SparkStreaming 实现思路与模块概述 和 DStream
生成 RDD 实例详解 里我们分析了
DStreamGraph和
DStream具有能够实例化
RDD和
RDDDAG
的能力,下面我们来看 Spark Streaming 是如何将其动态调度的。
在 Spark Streaming 程序的入口,我们都会定义一个
batchDuration,就是需要每隔多长时间就比照静态的
DStreamGraph来动态生成一个
RDD DAG 实例。在 Spark Streaming 里,总体负责动态作业调度的具体类是
JobScheduler,
JobScheduler有两个非常重要的成员:
JobGenerator和
ReceiverTracker。
JobScheduler将每个
batch 的 RDD DAG 具体生成工作委托给
JobGenerator,而将源头输入数据的记录工作委托给
ReceiverTracker。
JobScheduler 的全限定名是:org.apache.spark.streaming.scheduler.JobScheduler JobGenerator 的全限定名是:org.apache.spark.streaming.scheduler.JobGenerator ReceiverTracker 的全限定名是:org.apache.spark.streaming.scheduler.ReceiverTracker
本文我们来详解
JobScheduler。
JobGenerator
启动
在用户 code 最后调用 ssc.start()时,将隐含的导致一系列模块的启动,其中对我们
JobGenerator这里的启动调用关系如下:
// 来自 StreamingContext.start(), JobScheduler.start(), JobGenerator.start() ssc.start() // 【用户 code:StreamingContext.start()】 -> scheduler.start() // 【JobScheduler.start()】 -> jobGenerator.start() // 【JobGenerator.start()】
具体的看,
JobGenerator.start()的代码如下:
// 来自 JobGenerator.start() def start(): Unit = synchronized { ... eventLoop.start() // 【启动 RPC 处理线程】 if (ssc.isCheckpointPresent) { restart() // 【如果不是第一次启动,就需要从 checkpoint 恢复】 } else { startFirstTime() // 【第一次启动,就 startFirstTime()】 } }
可以看到,在启动了 RPC 处理线程
eventLoop后,就会根据是否是第一次启动,也就是是否存在 checkpoint,来具体的决定是
restart()还是
startFirstTime()。
后面我们会分析失效后重启的
restart()流程,这里我们来关注
startFirstTime():
// 来自 JobGenerator.startFirstTime() private def startFirstTime() { val startTime = new Time(timer.getStartTime()) graph.start(startTime - graph.batchDuration) timer.start(startTime.milliseconds) logInfo("Started JobGenerator at " + startTime) }
可以看到,这里首次启动时做的工作,先是通过
graph.start()来告知了
DStreamGraph第
1 个 batch 的启动时间,然后就是
timer.start()启动了关键的定时器。
当定时器
timer启动以后,
JobGenerator的
startFirstTime()就完成了。
RecurringTimer
通过之前几篇文章的分析我们知道,JobGenerator维护了一个定时器,周期就是用户设置的
batchDuration,定时为每个
batch 生成 RDD DAG 的实例。
具体的,这个定时器实例就是:
// 来自 JobGenerator private[streaming] class JobGenerator(jobScheduler: JobScheduler) extends Logging { ... private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator") ... }
通过代码也可以看到,整个
timer的调度周期就是
batchDuration,每次调度起来就是做一个非常简单的工作:往
eventLoop里发送一个消息
—— 该为当前 batch (
new Time(longTime)) GenerateJobs 了!
GenerateJobs
接下来,eventLoop收到消息时,会在一个消息处理的线程池里,执行对应的操作。在这里,处理
GenerateJobs(time)消息的对应操作是
generateJobs(time):
private def generateJobs(time: Time) { SparkEnv.set(ssc.env) Try { jobScheduler.receiverTracker.allocateBlocksToBatch(time) // 【步骤 (1)】 graph.generateJobs(time) // 【步骤 (2)】 } match { case Success(jobs) => val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) // 【步骤 (3)】 jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) // 【步骤 (4)】 case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) // 【步骤 (5)】 }
这段代码异常精悍,包含了
JobGenerator主要工作 —— 如下图所示 —— 的 5 个步骤!
(1) 要求
ReceiverTracker将目前已收到的数据进行一次 allocate,即将上次 batch 切分后的数据切分到到本次新的
batch 里
这里
ReceiverTracker对已收到数据的 meta 信息进行
allocateBlocksToBatch(time),与
ReceiverTracker自己接收
ReceiverSupervisorImpl上报块数据
meta 信息的过程,是相互独立的,但通过
synchronized关键字来互斥同步
即是说,不管
ReceiverSupervisorImpl形成块数据的时间戳
t1、
ReceiverSupervisorImpl发送块数据的时间戳
t2、
ReceiverTracker收到块数据的时间戳
t3分别是啥,最终块数据划入哪个
batch,还是由
ReceiverTracker.allocateBlocksToBatch(time)方法获得
synchronized锁的那一刻,还有未划入之前任何一个
batch 的块数据 meta,将被划分入最新的 batch
所以,每个块数据的 meta 信息,将被划入一个、且只被划入一个 batch
(2) 要求
DStreamGraph复制出一套新的 RDD DAG 的实例,具体过程是:
DStreamGraph将要求图里的尾
DStream节点生成具体的
RDD 实例,并递归的调用尾
DStream的上游
DStream节点……以此遍历整个
DStreamGraph,遍历结束也就正好生成了
RDD DAG 的实例
这个过程的详解,请参考前面的文章 DStream
生成 RDD 实例详解
精确的说,整个
DStreamGraph.generateJobs(time)遍历结束的返回值是
Seq[Job]
(3) 获取第 1 步
ReceiverTracker分配到本 batch 的源头数据的 meta 信息
第 1 步中
ReceiverTracker只是对 batch 的源头数据 meta 信息进行了 batch 的分配,本步骤是按照 batch 时间来向
ReceiverTracker查询得到划分到本
batch 的块数据 meta 信息
(4) 将第 2 步生成的本 batch 的 RDD DAG,和第 3 步获取到的 meta 信息,一同提交给
JobScheduler异步执行
这里我们提交的是将 (a)
time(b)
Seq[job](c)
块数据的 meta 信息这三者包装为一个
JobSet,然后调用
JobScheduler.submitJobSet(JobSet)提交给
JobScheduler
这里的向
JobScheduler提交过程与
JobScheduler接下来在
jobExecutor里执行过程是异步分离的,因此本步将非常快即可返回
(5) 只要提交结束(不管是否已开始异步执行),就马上对整个系统的当前运行状态做一个 checkpoint
这里做 checkpoint 也只是异步提交一个
DoCheckpoint消息请求,不用等 checkpoint 真正写完成即可返回
这里也简单描述一下 checkpoint 包含的内容,包括已经提交了的、但尚未运行结束的 JobSet 等实际运行时信息。
相关文章推荐
- 网站出现菜刀连接漏洞
- 批量修改数据库表前缀
- bash:fdisk:command not found
- [J2SE]s03e07.JDBC增删改查操作(练习)
- Linux下Rsync+sersync实现数据实时同步
- 与域内时间服务器同步时间
- springmvc web.xml 配置默认的controller访问问题
- VS中使用CXTPDockingPane出现的编译错误 error C2059: syntax error : 'constant' 解决
- 滴滴出行(滴滴打车)太耗电太占内存cpu问题之终极解决
- 架构师之路(2)---详解面向过程
- 安全攻城狮研发技能栈V1.0,附详细点评~
- 【WPF】WPF 布局
- magento 的一些关于addFieldToFilter的查询
- 实现liunx之间无密码访问——ssh密匙
- Unknown Treasure---hdu5446(卢卡斯+中国剩余定理)
- [C#] .NET4.0中使用4.5中的 async/await 功能实现异
- Huffman编码文件压缩 - Huffman树的建立与编码
- CodeForces 509B Painting Pebbles
- C++默认参数注意事项
- C++第六次实验