SparkStreaming是如何完成不停的循环处理的代码浅析
2016-07-02 12:26
423 查看
一直很好奇Sparkstreaming的ssc.start是怎么做到不停的一直定时循环处理数据的, 看了一下源码, 大致明白了整个过程, 记录分享一下。
入口为StreamingContext的start方法:
在构造StreamingContext的时候 state就初始化为INITIALIZED , 并且定义了一个JobScheduler scheduler
代码里面很明白, 在初始化的时候, 执行了JobScheduler的start方法。
那么我们看一下JobScheduler的start方法里面都做了什么:
1.启动定义了一个eventLoop
2.启动定义了一个ReceiverTracker
3.启动定义了一个jobGenerator
eventloop主要来处理Job相关的event:
JobStarted(job, startTime) => handleJobStart(job, startTime)
JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
ErrorReported(m, e) => handleError(m, e)
然后ReceiverTracker 主要用来启动receiver和接收数据的。 回头有空了再详细了解一下receiver是怎么工作的, 到时候再写一篇文章
JobScheduler里面最重要的就是JobGenerator的start方法,
这里面也做了两个主要的事:
1.定义了一个新的eventLoop 用来处理以下情况:
GenerateJobs(time) => generateJobs(time)
ClearMetadata(time) => clearMetadata(time)
DoCheckpoint(time, clearCheckpointDataLater) => doCheckpoint(time, clearCheckpointDataLater)
ClearCheckpointData(time) => clearCheckpointData(time)
2.startFirstTime 方法就是不停的做loop, 每次到我们设定的Duration的时候 再submitJob
先看一下#1:
其实在构造JobGenerator的时候, 我们已经构造了一个timer, 这个timer就是用来call back GenerateJobs(time)的: eventLoop.post
在#1定义完eventloop后, 正常情况下 第一次就会执行startFirstTime ,
在这么方法里面就会去执行 timer.start动作, start的动作里面主要就是去启动一个thread:
这个thread定义就是:
他的run里面就跑了loop方法, 一看方法名字, 我们就知道StreamingContext能循环submitJob 的功能估计就在这个方法里面了, 我们看一下代码, 他干了什么:
代码很简单, 主要功能就是clock.waitTillTime(nextTime) 确认一个Duration到了, 然后执行callback(nextTime) 方法, 再设定nexttime到下一个Duration。
关键就是这个callback做的是什么。 从上面的代码我们可以看出来 具体执行的是在timer里面做的, 在我们构造timer的时候我们已经构造了callback这个方法, 具体就是在JobGenerator里面的:
这个callback其实就是eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
那么GenerateJobs()做的又是什么呢? 网上翻, 可以看到我们有一个eventloop里面定义了GenerateJobs(time)去执行generateJobs()方法, 这个方法里面又做了什么呢? 还是看代码:
可以看到他主要就是做submitJobSet这个动作, 那么这个方法里面又干了些什么呢?
好啦, 越来越明白了, 这里面就是去执行Job, 在JobHandler 里面还有job.run
可能有点绕, 但是也基本明白了StreamingContext是怎么工作的, 从初始化, 循环, 再到submitJob, 整个过程大概就是这个样子。
接下来马上要花一些时间玩一下SparkSQL, 到时候看看能不能写几篇他的文章出来, 最后再啃ML这个硬骨头 打算再花一个多月时间弄懂SparkSQL和ML后, 好好玩一下hadoop的生态系统, 目前只对MR大致会用, 不知道内部是怎么运行的, 有机会好好看一下
入口为StreamingContext的start方法:
在构造StreamingContext的时候 state就初始化为INITIALIZED , 并且定义了一个JobScheduler scheduler
代码里面很明白, 在初始化的时候, 执行了JobScheduler的start方法。
def start(): Unit = synchronized {
state match {
case INITIALIZED =>
startSite.set(DStream.getCreationSite())
StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive()
try {
validate()
// Start the streaming scheduler in a new thread, so that thread local properties
// like call sites and job groups can be reset without affecting those of the
// current thread.
ThreadUtils.runInNewThread("streaming-start") {
sparkContext.setCallSite(startSite.get)
sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
scheduler.start()
}
state = StreamingContextState.ACTIVE
} catch {
case NonFatal(e) =>
logError("Error starting the context, marking it as stopped", e)
scheduler.stop(false)
state = StreamingContextState.STOPPED
throw e
}
StreamingContext.setActiveContext(this)
}
shutdownHookRef = ShutdownHookManager.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
// Registering Streaming Metrics at the start of the StreamingContext
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)
uiTab.foreach(_.attach())
logInfo("StreamingContext started")
case ACTIVE =>
logWarning("StreamingContext has already been started")
case STOPPED =>
throw new IllegalStateException("StreamingContext has already been stopped")
}
}
那么我们看一下JobScheduler的start方法里面都做了什么:
1.启动定义了一个eventLoop
2.启动定义了一个ReceiverTracker
3.启动定义了一个jobGenerator
def start(): Unit = synchronized {
if (eventLoop != null) return // scheduler has already been started
logDebug("Starting JobScheduler")
eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
}
eventLoop.start()
// attach rate controllers of input streams to receive batch completion updates
for {
inputDStream <- ssc.graph.getInputStreams
rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)
listenerBus.start(ssc.sparkContext)
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
receiverTracker.start()
jobGenerator.start()
logInfo("Started JobScheduler")
}
eventloop主要来处理Job相关的event:
JobStarted(job, startTime) => handleJobStart(job, startTime)
JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
ErrorReported(m, e) => handleError(m, e)
然后ReceiverTracker 主要用来启动receiver和接收数据的。 回头有空了再详细了解一下receiver是怎么工作的, 到时候再写一篇文章
JobScheduler里面最重要的就是JobGenerator的start方法,
def start(): Unit = synchronized {
if (eventLoop != null) return // generator has already been started
// Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
// See SPARK-10125
checkpointWriter
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = {
jobScheduler.reportError("Error in job generator", e)
}
}
eventLoop.start()
if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
}
这里面也做了两个主要的事:
1.定义了一个新的eventLoop 用来处理以下情况:
GenerateJobs(time) => generateJobs(time)
ClearMetadata(time) => clearMetadata(time)
DoCheckpoint(time, clearCheckpointDataLater) => doCheckpoint(time, clearCheckpointDataLater)
ClearCheckpointData(time) => clearCheckpointData(time)
2.startFirstTime 方法就是不停的做loop, 每次到我们设定的Duration的时候 再submitJob
先看一下#1:
其实在构造JobGenerator的时候, 我们已经构造了一个timer, 这个timer就是用来call back GenerateJobs(time)的: eventLoop.post
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
在#1定义完eventloop后, 正常情况下 第一次就会执行startFirstTime ,
private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
logInfo("Started JobGenerator at " + startTime)
}
在这么方法里面就会去执行 timer.start动作, start的动作里面主要就是去启动一个thread:
def start(startTime: Long): Long = synchronized {
nextTime = startTime
thread.start()
logInfo("Started timer for " + name + " at time " + nextTime)
nextTime
}
这个thread定义就是:
private val thread = new Thread("RecurringTimer - " + name) {
setDaemon(true)
override def run() { loop }
}
他的run里面就跑了loop方法, 一看方法名字, 我们就知道StreamingContext能循环submitJob 的功能估计就在这个方法里面了, 我们看一下代码, 他干了什么:
private def loop() {
try {
while (!stopped) {
triggerActionForNextInterval()
}
triggerActionForNextInterval()
} catch {
case e: InterruptedException =>
4000 }
}
}
private def triggerActionForNextInterval(): Unit = {
clock.waitTillTime(nextTime)
callback(nextTime)
prevTime = nextTime
nextTime += period
logDebug("Callback for " + name + " called at time " + prevTime)
}
代码很简单, 主要功能就是clock.waitTillTime(nextTime) 确认一个Duration到了, 然后执行callback(nextTime) 方法, 再设定nexttime到下一个Duration。
关键就是这个callback做的是什么。 从上面的代码我们可以看出来 具体执行的是在timer里面做的, 在我们构造timer的时候我们已经构造了callback这个方法, 具体就是在JobGenerator里面的:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
这个callback其实就是eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
那么GenerateJobs()做的又是什么呢? 网上翻, 可以看到我们有一个eventloop里面定义了GenerateJobs(time)去执行generateJobs()方法, 这个方法里面又做了什么呢? 还是看代码:
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
// Set the SparkEnv in this thread, so that job generation code can access the environment
// Example: BlockRDDs are created in this thread, and it needs to access BlockManager
// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
SparkEnv.set(ssc.env)
Try {
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
可以看到他主要就是做submitJobSet这个动作, 那么这个方法里面又干了些什么呢?
def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
logInfo("No jobs added for time " + jobSet.time)
} else {
listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + jobSet.time)
}
}
好啦, 越来越明白了, 这里面就是去执行Job, 在JobHandler 里面还有job.run
可能有点绕, 但是也基本明白了StreamingContext是怎么工作的, 从初始化, 循环, 再到submitJob, 整个过程大概就是这个样子。
接下来马上要花一些时间玩一下SparkSQL, 到时候看看能不能写几篇他的文章出来, 最后再啃ML这个硬骨头 打算再花一个多月时间弄懂SparkSQL和ML后, 好好玩一下hadoop的生态系统, 目前只对MR大致会用, 不知道内部是怎么运行的, 有机会好好看一下
相关文章推荐
- 如何将spark streaming处理结果保存到关系型数据库中
- Sparkstreaming是如何获取数据组成Dstream的源码浅析
- SparkStreaming从启动Receiver到收取数据生成RDD的代码浅析
- 第116课: Spark Streaming性能优化:如何在毫秒内处理处理大吞吐量的和数据波动比较大 的程序
- 如何将spark streaming处理结果保存到关系型数据库中
- Spark视频第6期:无需等待的实时计算时代如何在90分钟内通过Spark Streaming掌握大数据实时计算和流处理?
- 大数据IMF传奇行动绝密课程第116课:Spark Streaming性能优化:如何在毫秒内处理大吞吐量和数据波动比较大的流计算
- Cocos2d-x3.2游戏的核心循环在Application,如何处理FPS不稳------沈大海
- 大数据分析处理框架——离线分析(hive,pig,spark)、近似实时分析(Impala)和实时分析(storm、spark streaming)
- 如何处理用代码创建SD Sales order时遇到的错误消息KI 180
- 第108课: Spark Streaming电商广告点击综合案例动态黑名单过滤真正的实现代码
- Java多线程调试如何完成信息输出处理
- 哈,我自己翻译的小书,马上就完成了,是讲用python处理大数据框架hadoop,spark的
- 如何管理Spark Streaming消费Kafka的偏移量(二)
- 电商系统如何调用支付宝接口代码,以及支付宝如何返回处理结果。
- 以WebBrowser.DocumentCompleted 事件为例,说明用C#如何完成事件的订阅处理。
- Spark学习笔记 --- SparkStreaming 实现对 TCP 数据源处理
- Spark Streaming中空batches处理的两种方法(转)
- 自己标注(不注意坑不少)-Spark+Kafka构建实时分析Dashboard案例——步骤三:Spark Streaming实时处理数据
- 大数据IMF传奇行动绝密课程第117课:Spark Streaming性能优化:如何最大程度的确保Spark Cluster和Kafka连接的稳定性