Spark Streaming Source Code Walkthrough: Dynamic Job Generation, with Deeper Reflections
2016-05-23 23:57
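JobGenerator and ReceiverTracker are members of JobScheduler. The flow begins at the application's entry point, val ssc = new StreamingContext(conf, batchDuration). Calling ssc.start() launches the Spark Streaming framework, which eventually calls JobScheduler.start(). JobScheduler.start() in turn starts the ReceiverTracker and JobGenerator instances: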
```scala
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
```
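JobScheduler has two key members:
· JobGenerator
· ReceiverTracker
JobScheduler delegates the actual generation of each batch's RDD DAG to JobGenerator. It delegates the tracking of incoming source data to ReceiverTracker.
JobGenerator has two crucial members, RecurringTimer and EventLoop. RecurringTimer controls when jobs are triggered: every batchInterval it puts a message into EventLoop's queue. EventLoop keeps watching that queue and processes each message as soon as it arrives. As time advances, JobGenerator therefore keeps producing jobs once per BatchDuration. It also drives checkpointing and the cleanup of earlier DStream data.
First, look at JobGenerator's start method. It initializes the checkpoint writer, instantiates and starts the EventLoop message loop, and then (through startFirstTime or restart) starts the timer that periodically generates jobs: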
```scala
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
```
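EventLoop holds a LinkedBlockingDeque that stores the messages, plus a background thread. The thread takes messages off the queue and calls onReceive for each one. Here, onReceive is overridden in the anonymous subclass so that it calls processEvent.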
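The queue-plus-thread design described above can be sketched in plain Scala. MiniEventLoop and EventLoopDemo are hypothetical names. This is a simplified model written in the spirit of Spark's EventLoop, not its actual source. post only enqueues a message, while a single daemon thread takes messages in FIFO order and hands each one to onReceive.

```scala
import java.util.concurrent.{BlockingQueue, CountDownLatch, LinkedBlockingDeque, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Simplified model in the spirit of Spark's EventLoop (not its source code).
abstract class MiniEventLoop[E](name: String) {
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped.get) {
          val event = eventQueue.take() // blocks until a message is available
          try { onReceive(event) } catch { case e: Exception => onError(e) }
        }
      } catch {
        case _: InterruptedException => () // stop() interrupts a blocked take()
      }
  }

  def start(): Unit = eventThread.start()

  def stop(): Unit =
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      eventThread.join()
    }

  // Only enqueues; the caller never waits for the message to be processed.
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

object EventLoopDemo {
  // Posts three messages, waits until all are handled, and returns them in handling order.
  def run(): List[String] = {
    val handled = new LinkedBlockingQueue[String]()
    val done = new CountDownLatch(3)
    val loop = new MiniEventLoop[String]("demo-loop") {
      override protected def onReceive(event: String): Unit = { handled.put(event); done.countDown() }
      override protected def onError(e: Throwable): Unit = e.printStackTrace()
    }
    loop.start()
    Seq("GenerateJobs(1000)", "GenerateJobs(2000)", "ClearMetadata(1000)").foreach(loop.post)
    done.await(5, TimeUnit.SECONDS)
    loop.stop()
    Iterator.continually(handled.poll()).takeWhile(_ != null).toList
  }
}
```

In this model, post returns as soon as the message is queued, so a timer thread that posts messages is never blocked by their processing. The single consumer thread handles events one at a time, in order.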
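processEvent pattern-matches on the message type and routes each message to the method that handles it. Message handling is generally handed off to other threads; the message loop itself does not run time-consuming business logic: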
```scala
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
```
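The routing in processEvent can be reproduced with a sealed trait and case classes. The event classes below are hypothetical stand-ins for the JobGeneratorEvent cases, with times as plain Long milliseconds instead of Time. Each branch returns the name of the handler it would call rather than doing any work. Because the trait is sealed, the compiler can warn when a case is missing from the match.

```scala
object EventRouting {
  // Hypothetical stand-ins for the JobGeneratorEvent cases.
  sealed trait GeneratorEvent
  final case class GenerateJobs(time: Long) extends GeneratorEvent
  final case class ClearMetadata(time: Long) extends GeneratorEvent
  final case class DoCheckpoint(time: Long, clearCheckpointDataLater: Boolean) extends GeneratorEvent
  final case class ClearCheckpointData(time: Long) extends GeneratorEvent

  // Pattern match on the message type and report which handler it would be routed to.
  def route(event: GeneratorEvent): String = event match {
    case GenerateJobs(t)        => s"generateJobs($t)"
    case ClearMetadata(t)       => s"clearMetadata($t)"
    case DoCheckpoint(t, later) => s"doCheckpoint($t, $later)"
    case ClearCheckpointData(t) => s"clearCheckpointData($t)"
  }
}
```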
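When a GenerateJobs message is handled, JobGenerator.generateJobs first obtains the data for that batch. It then calls DStreamGraph.generateJobs to generate the jobs: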
```scala
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
```
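The core of DStreamGraph.generateJobs is a flatMap over the output streams, where each stream yields an Option[Job]. Here is a stripped-down model. GraphModel, its Job and its OutputStream are hypothetical simplifications without Spark types or call sites. Output streams that have nothing for a batch return None and simply drop out of the result.

```scala
object GraphModel {
  // Hypothetical stand-in: a Job is a batch time plus a deferred action.
  final case class Job(time: Long, func: () => Unit)

  // An output stream yields at most one Job per batch time.
  trait OutputStream {
    def generateJob(time: Long): Option[Job]
  }

  // Same shape as DStreamGraph.generateJobs: flatMap keeps the Somes and drops the Nones.
  def generateJobs(outputStreams: Seq[OutputStream], time: Long): Seq[Job] =
    outputStreams.flatMap(_.generateJob(time))
}
```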
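Back in JobGenerator.generateJobs, once the jobs have been generated, it fetches streamIdToInputInfos, the input metadata of each input stream for that batch time. jobScheduler.submitJobSet then receives a JobSet built from the batch time, the jobs and this input info, and JobScheduler schedules and runs the jobs.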
In DStreamGraph.generateJobs (the code above), outputStreams holds the last DStream of each chain in the DStream graph, i.e., the output DStreams. Calling outputStream.generateJob(time) on them works backwards through the graph, much like an RDD's lineage is traced from the last RDD back to its parents.
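In DStream.generateJob, jobFunc wraps the call context.sparkContext.runJob(rdd, emptyFunc). This is the default implementation; output DStreams such as ForEachDStream override generateJob to run their own function on the RDD: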
```scala
/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}
```
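The important point of generateJob is that it only builds a closure. Creating the Job runs nothing; runJob is executed later, when the job's run method is called. Here is a minimal sketch of that deferral. A lookup function stands in for getOrCompute, a Seq[Int] stands in for the RDD, and an AtomicInteger counts simulated runJob calls (all hypothetical).

```scala
import java.util.concurrent.atomic.AtomicInteger

object GenerateJobModel {
  // Hypothetical stand-in for streaming's Job: a batch time plus a deferred action.
  final class Job(val time: Long, func: () => Unit) {
    def run(): Unit = func()
  }

  def generateJob(time: Long, lookup: Long => Option[Seq[Int]], actions: AtomicInteger): Option[Job] =
    lookup(time) match {
      case Some(data) =>
        // Like jobFunc in DStream.generateJob: the action is captured here, not executed.
        val jobFunc = () => {
          data.iterator.foreach(_ => ()) // like emptyFunc: consume the data, keep nothing
          actions.incrementAndGet()      // count the simulated runJob call
          ()
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
}
```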
The Job class: calling its run method invokes the func passed to the constructor:
```scala
private[streaming] class Job(val time: Time, func: () => _) {
  private var _id: String = _
  private var _outputOpId: Int = _
  private var isSet = false
  private var _result: Try[_] = null
  private var _callSite: CallSite = null
  private var _startTime: Option[Long] = None
  private var _endTime: Option[Long] = None

  def run() {
    _result = Try(func())
  }

  // ... (remaining members omitted)
}
```
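Because run wraps the call in Try, an exception thrown by the output operation is stored as a Failure instead of escaping into the executor thread. Here is a minimal model of just that part. MiniJob is a hypothetical name, and the id, call-site and timing fields are left out.

```scala
import scala.util.Try

// Simplified sketch of the run/result part of streaming's Job.
class MiniJob(val time: Long, func: () => Any) {
  private var _result: Try[Any] = null

  // Exceptions thrown by func are captured as a Failure, not rethrown.
  def run(): Unit = { _result = Try(func()) }

  def result: Try[Any] = {
    require(_result != null, "Job has not run yet")
    _result
  }
}
```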
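getOrCompute first looks up the RDD for the given time in a HashMap (generatedRDDs). If the RDD is not there and the time is valid for this DStream, getOrCompute takes four steps. It calls compute to produce the RDD, persists the RDD if its storageLevel is not NONE, and marks it for checkpointing if the time falls on a checkpoint interval. Finally, it puts the RDD into the HashMap: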
```scala
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
```
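The memoization logic of getOrCompute can be modelled with a mutable HashMap keyed by batch time. In this sketch MiniDStream is hypothetical and a Seq[Int] stands in for the RDD. isTimeValid keeps only the "after zeroTime and aligned to the slide duration" checks, and persistence is left out. The sketch shows that a second call for the same time reuses the stored result, and which batch times get marked for checkpointing.

```scala
import scala.collection.mutable

// Simplified model of DStream.getOrCompute (times as Long milliseconds).
class MiniDStream(zeroTime: Long, slideMs: Long, checkpointMs: Option[Long], computeFn: Long => Seq[Int]) {
  private val generated = mutable.HashMap.empty[Long, Seq[Int]]
  val checkpointed = mutable.ListBuffer.empty[Long]
  var computeCalls = 0

  // Valid times are strictly after zeroTime and a multiple of the slide duration away from it.
  private def isTimeValid(time: Long): Boolean =
    time > zeroTime && (time - zeroTime) % slideMs == 0

  def getOrCompute(time: Long): Option[Seq[Int]] =
    generated.get(time).orElse {
      if (isTimeValid(time)) {
        computeCalls += 1
        val newRdd = computeFn(time)
        // Mark for checkpointing when (time - zeroTime) is a multiple of the checkpoint interval.
        checkpointMs.foreach { cp => if ((time - zeroTime) % cp == 0) checkpointed += time }
        generated.put(time, newRdd)
        Some(newRdd)
      } else None
    }
}
```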
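Back in JobGenerator: after starting the message loop, start() checks whether a checkpoint is present. If one is, JobGenerator recovers from the checkpoint directory and calls restart(); on a first start it calls startFirstTime() instead.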
JobGenerator.startFirstTime starts the timer that periodically generates jobs.
The timer is a RecurringTimer. Its start method starts a thread that keeps calling triggerActionForNextInterval in a loop:
```scala
private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {

  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }

  @volatile private var prevTime = -1L
  @volatile private var nextTime = -1L
  @volatile private var stopped = false

  /**
   * Get the time when this timer will fire if it is started right now.
   * The time will be a multiple of this timer's period and more than
   * current system time.
   */
  def getStartTime(): Long = {
    (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
  }

  /**
   * Get the time when the timer will fire if it is restarted right now.
   * This time depends on when the timer was started the first time, and was stopped
   * for whatever reason. The time must be a multiple of this timer's period and
   * more than current time.
   */
  def getRestartTime(originalStartTime: Long): Long = {
    val gap = clock.getTimeMillis() - originalStartTime
    (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
  }

  /**
   * Start at the given start time.
   */
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }

  /**
   * Start at the earliest time it can start based on the period.
   */
  def start(): Long = {
    start(getStartTime())
  }

  /**
   * Stop the timer, and return the last time the callback was made.
   *
   * @param interruptTimer True will interrupt the callback if it is in progress (not guaranteed to
   *                       give correct time in this case). False guarantees that there will be at
   *                       least one callback after `stop` has been called.
   */
  def stop(interruptTimer: Boolean): Long = synchronized {
    if (!stopped) {
      stopped = true
      if (interruptTimer) {
        thread.interrupt()
      }
      thread.join()
      logInfo("Stopped timer for " + name + " after time " + prevTime)
    }
    prevTime
  }

  private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }

  /**
   * Repeatedly call the callback every interval.
   */
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }
}
```
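Here is a worked example of the two start-time formulas in RecurringTimer. TimerMath is a hypothetical helper. It copies the arithmetic of getStartTime and getRestartTime but takes the current time as a parameter instead of reading a Clock.

```scala
object TimerMath {
  // Same formula as RecurringTimer.getStartTime: the next multiple of period strictly after now.
  def startTime(now: Long, period: Long): Long =
    (math.floor(now.toDouble / period) + 1).toLong * period

  // Same formula as RecurringTimer.getRestartTime: stay aligned to the original start time.
  def restartTime(now: Long, originalStartTime: Long, period: Long): Long = {
    val gap = now - originalStartTime
    (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
  }
}
```

With a 1000 ms period, a timer started at 12345 ms first fires at 13000 ms. A timer started at exactly 12000 ms also waits until 13000 ms, because the start time must be strictly later than the current time. On a restart at 25500 ms with an original start of 13000 ms, the gap is 12500 ms, so the next firing is 13000 + 13 * 1000 = 26000 ms. This keeps batch boundaries on the original schedule.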
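triggerActionForNextInterval waits until the next firing time (nextTime, i.e., the next batch boundary). It then invokes callback, the function passed in when the RecurringTimer was constructed: longTime => eventLoop.post(GenerateJobs(new Time(longTime))). Each tick therefore posts one GenerateJobs message to the message loop, once per BatchDuration.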
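The loop around triggerActionForNextInterval can be simulated deterministically. In this sketch, ManualClock and TimerLoopSketch are hypothetical names; Spark has its own Clock implementations. waitTillTime jumps forward instead of sleeping, and the loop runs a fixed number of ticks rather than running until stop is called. The callback plays the role of longTime => eventLoop.post(GenerateJobs(new Time(longTime))).

```scala
object TimerLoopSketch {
  // Hypothetical manual clock: waitTillTime jumps forward instead of sleeping.
  final class ManualClock(private var now: Long) {
    def getTimeMillis(): Long = now
    def waitTillTime(target: Long): Long = { if (target > now) now = target; now }
  }

  // Calls the same steps as triggerActionForNextInterval `ticks` times.
  // Returns the last time the callback fired (prevTime).
  def fire(clock: ManualClock, startTime: Long, period: Long, ticks: Int)(callback: Long => Unit): Long = {
    var nextTime = startTime
    var prevTime = -1L
    for (_ <- 1 to ticks) {
      clock.waitTillTime(nextTime) // wait (here: jump) until the batch boundary
      callback(nextTime)           // in JobGenerator this posts a GenerateJobs message
      prevTime = nextTime
      nextTime += period
    }
    prevTime
  }
}
```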
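Let's focus once more on the steps generateJobs follows to produce jobs:
Step 1: Get the data for the current batch interval.
Step 2: Generate the jobs, i.e., the dependencies between RDDs.
Step 3: Get the input information, per stream id, for the generated jobs.
Step 4: Wrap everything into a JobSet and hand it to JobScheduler.
Step 5: Trigger checkpointing.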
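The five steps can be sketched as one method. Every name here is hypothetical, and each step is injected as a function so the control flow stands alone. Allocation and graph job generation run inside Try. A success is wrapped into a JobSet and submitted, and a failure is reported. In this sketch, the checkpoint request is posted in both cases.

```scala
import scala.util.{Failure, Success, Try}

object GenerateJobsPipeline {
  // Hypothetical stand-in for JobSet: jobs are labels, input info maps streamId -> record count.
  final case class JobSet(time: Long, jobs: Seq[String], streamIdToInputInfos: Map[Int, Long])

  final class Generator(
      allocateBlocksToBatch: Long => Unit,    // step 1: bind the received data to this batch
      graphGenerateJobs: Long => Seq[String], // step 2: one job per output stream
      inputInfo: Long => Map[Int, Long],      // step 3: per-stream input info for the batch
      submitJobSet: JobSet => Unit,           // step 4: hand the JobSet to the scheduler
      postDoCheckpoint: Long => Unit,         // step 5: request a checkpoint for the batch
      reportError: Throwable => Unit) {

    def generateJobs(time: Long): Unit = {
      Try {
        allocateBlocksToBatch(time)
        graphGenerateJobs(time)
      } match {
        case Success(jobs) => submitJobSet(JobSet(time, jobs, inputInfo(time)))
        case Failure(e)    => reportError(e)
      }
      postDoCheckpoint(time) // in this sketch, posted whether or not generation succeeded
    }
  }
}
```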
submitJobSet itself only puts the JobSet into a ConcurrentHashMap (jobSets) and wraps each Job in a JobHandler, which is submitted to the jobExecutor thread pool.
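Here is a minimal model of submitJobSet, assuming simplified Job and JobSet classes; SubmitModel and MiniJobScheduler are hypothetical names. The JobSet is recorded in a ConcurrentHashMap keyed by batch time. Each job is wrapped in a Runnable handler and handed to a fixed-size thread pool, so submitJobSet returns without waiting for the jobs. In Spark, the pool size comes from spark.streaming.concurrentJobs, which defaults to 1.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object SubmitModel {
  // Hypothetical simplified Job and JobSet.
  final case class Job(time: Long, func: () => Unit)
  final case class JobSet(time: Long, jobs: Seq[Job])

  final class MiniJobScheduler(numConcurrentJobs: Int) {
    val jobSets = new ConcurrentHashMap[Long, JobSet]()
    private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)

    // Plays the role of JobHandler: a Runnable that runs one job.
    private final class JobHandler(job: Job) extends Runnable {
      override def run(): Unit = job.func()
    }

    def submitJobSet(jobSet: JobSet): Unit =
      if (jobSet.jobs.nonEmpty) {
        jobSets.put(jobSet.time, jobSet) // record the batch (this sketch never removes it)
        jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) // returns immediately
      }

    def shutdown(): Unit = {
      jobExecutor.shutdown()
      jobExecutor.awaitTermination(5, TimeUnit.SECONDS)
    }
  }
}
```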
JobHandler implements the Runnable interface. Its run method calls job.run(), which invokes func, i.e., the business logic defined on the DStreams:
```scala
private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    try {
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

      ssc.sc.setJobDescription(
        s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

      // We need to assign `eventLoop` to a temp variable. Otherwise, because
      // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
      // it's possible that when `post` is called, `eventLoop` happens to null.
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped.
      }
    } finally {
      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
    }
  }
}
```
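The key detail in JobHandler.run is that it reads the volatile eventLoop field into a local variable before each use, because JobScheduler.stop may set it to null concurrently. Here is a simplified model of that pattern. JobHandlerModel, Scheduler and the event classes are hypothetical. A LinkedBlockingQueue stands in for the event loop, and events carry the batch time instead of a clock reading.

```scala
import java.util.concurrent.LinkedBlockingQueue

object JobHandlerModel {
  sealed trait SchedulerEvent
  final case class JobStarted(time: Long) extends SchedulerEvent
  final case class JobCompleted(time: Long) extends SchedulerEvent

  final class Scheduler {
    // Set to null when the scheduler stops, like JobScheduler.eventLoop.
    @volatile var eventQueue: LinkedBlockingQueue[SchedulerEvent] =
      new LinkedBlockingQueue[SchedulerEvent]()

    final class JobHandler(time: Long, body: () => Unit) extends Runnable {
      override def run(): Unit = {
        var queue = eventQueue        // read the volatile field once into a local
        if (queue != null) {
          queue.put(JobStarted(time))
          body()                      // stands in for job.run()
          queue = eventQueue          // re-read: the scheduler may have stopped meanwhile
          if (queue != null) queue.put(JobCompleted(time))
        }                             // else: scheduler already stopped, the job is skipped
      }
    }
  }
}
```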
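Note:
DT大数据梦工厂 (DT Big Data Dream Factory) WeChat official account: DT_Spark
Sina Weibo: http://www.weibo.com/ilovepains
Wang Jialin (王家林) hosts a free hands-on big data live stream on YY every night at 20:00, channel 68917580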