Spark Streaming Source Code Walkthrough: Dynamic Job Generation and Deeper Reflections

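2016-05-23 23:57

The JobGenerator and ReceiverTracker objects are members of JobScheduler. A Spark Streaming application starts from the entry point val ssc = new StreamingContext(conf, batchDuration). Calling ssc.start() launches the Spark Streaming framework, which in turn calls JobScheduler's start(). That method starts the ReceiverTracker and JobGenerator objects: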

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}


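JobScheduler has two very important members:

· JobGenerator

· ReceiverTracker

JobScheduler delegates the actual generation of each batch's RDD DAG to JobGenerator, and delegates the bookkeeping of the incoming source data to ReceiverTracker.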

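JobGenerator has two crucial members: RecurringTimer and EventLoop. RecurringTimer controls when jobs are triggered: at every batchInterval it puts a message on the EventLoop's queue. The EventLoop keeps watching that queue and handles each message as soon as it arrives. Driven by the BatchDuration interval, JobGenerator therefore keeps producing jobs as time passes, and it also drives checkpointing and the cleanup of old DStream data.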
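
The division of labour between the timer and the event loop can be shown with a small single-threaded simulation. This is only a sketch, not Spark code: TimerAndLoopSketch, postTicks and drain are hypothetical names, and the real RecurringTimer and EventLoop each run on their own thread.

```scala
import java.util.concurrent.LinkedBlockingDeque

object TimerAndLoopSketch {
  // Hypothetical event type standing in for JobGenerator's GenerateJobs message.
  final case class GenerateJobs(timeMs: Long)

  // Plays the role of the timer: posts one event per batch boundary
  // from startMs up to and including endMs.
  def postTicks(queue: LinkedBlockingDeque[GenerateJobs],
                startMs: Long, endMs: Long, batchMs: Long): Unit = {
    var t = startMs
    while (t <= endMs) {
      queue.put(GenerateJobs(t))
      t += batchMs
    }
  }

  // Plays the role of the event loop: takes events off the queue in
  // arrival order and "handles" each one by recording its batch time.
  def drain(queue: LinkedBlockingDeque[GenerateJobs]): List[Long] = {
    val handled = List.newBuilder[Long]
    var event = queue.poll() // returns null when the queue is empty
    while (event != null) {
      handled += event.timeMs
      event = queue.poll()
    }
    handled.result()
  }
}
```

The producer only knows about batch boundaries and the consumer only knows about events, which is why the two can live on separate threads in the real implementation.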

First, look at JobGenerator's start method. It initializes the checkpoint writer, instantiates and starts the EventLoop, and starts the timer that periodically generates jobs:

/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}


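The EventLoop class holds a LinkedBlockingDeque that stores the messages, plus a background thread. The background thread takes messages off the queue and calls onReceive on each one. Here onReceive is the override in the anonymous subclass above, which simply delegates to processEvent.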
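
To make the queue-plus-thread structure concrete, here is a stripped-down event loop. It is a simplified sketch in the spirit of EventLoop, not a copy of Spark's class: MiniEventLoop and runOnce are hypothetical names, and details such as how the real class guards against double starts are left out.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import scala.util.control.NonFatal

object EventLoopSketch {
  // A blocking deque plus one daemon thread that takes events off the
  // deque and hands each one to onReceive.
  abstract class MiniEventLoop[E](name: String) {
    private val queue = new LinkedBlockingDeque[E]()
    @volatile private var stopped = false

    private val thread = new Thread(name) {
      setDaemon(true)
      override def run(): Unit = {
        try {
          while (!stopped) {
            val event = queue.take() // blocks until an event is available
            try onReceive(event)
            catch { case NonFatal(e) => onError(e) }
          }
        } catch {
          case _: InterruptedException => // stop() interrupted a blocked take()
        }
      }
    }

    def start(): Unit = thread.start()
    def stop(): Unit = { stopped = true; thread.interrupt(); thread.join() }
    def post(event: E): Unit = queue.put(event)

    protected def onReceive(event: E): Unit
    protected def onError(e: Throwable): Unit
  }

  // Posts the given events and returns them in the order the loop handled them.
  def runOnce(events: List[String]): List[String] = {
    val seen = new ConcurrentLinkedQueue[String]()
    val done = new CountDownLatch(events.size)
    val loop = new MiniEventLoop[String]("sketch-loop") {
      override protected def onReceive(event: String): Unit = {
        seen.add(event)
        done.countDown()
      }
      override protected def onError(e: Throwable): Unit = ()
    }
    loop.start()
    events.foreach(e => loop.post(e))
    done.await(5, TimeUnit.SECONDS)
    loop.stop()

    val out = List.newBuilder[String]
    val it = seen.iterator()
    while (it.hasNext) out += it.next()
    out.result()
  }
}
```

Because a single thread consumes the queue, events are handled strictly in the order they were posted, which is what lets the handlers in JobGenerator assume they are never run concurrently with each other.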

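processEvent pattern-matches on the message type and routes the message to the method that handles it. The actual work is generally handed off to another thread; the event loop itself should not run time-consuming business logic: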

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
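
The routing style used by processEvent relies on the events being case classes under one sealed parent, so the compiler can warn when a case is missing. The following is a minimal sketch with hypothetical stand-in types; the event names mirror the ones matched above, but the definitions and the String results are assumptions made for illustration.

```scala
object DispatchSketch {
  // Hypothetical stand-ins for the JobGenerator events matched above.
  sealed trait GeneratorEvent
  final case class GenerateJobs(timeMs: Long) extends GeneratorEvent
  final case class ClearMetadata(timeMs: Long) extends GeneratorEvent
  final case class DoCheckpoint(timeMs: Long, clearLater: Boolean) extends GeneratorEvent
  final case class ClearCheckpointData(timeMs: Long) extends GeneratorEvent

  // Routes each event to its handler. Here a handler is represented by the
  // name of the method that would be called, to keep the example observable.
  def route(event: GeneratorEvent): String = event match {
    case GenerateJobs(t)          => s"generateJobs($t)"
    case ClearMetadata(t)         => s"clearMetadata($t)"
    case DoCheckpoint(t, later)   => s"doCheckpoint($t, $later)"
    case ClearCheckpointData(t)   => s"clearCheckpointData($t)"
  }
}
```

Removing one of the case lines makes the match non-exhaustive, and because the trait is sealed the compiler reports it instead of leaving a MatchError to surface at runtime.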


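When a GenerateJobs message is handled, and once the data for the batch has been obtained, DStreamGraph's generateJobs method is called to generate the jobs: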

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}


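streamIdToInputInfos is data keyed by batch time. Once it has been obtained, the jobs and this input information are packed into a JobSet, and jobScheduler.submitJobSet hands that JobSet to JobScheduler, which schedules and runs the jobs.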

In the generateJobs method shown above, outputStreams holds the last DStreams of the DStream graph, that is, the ones on which an output operation was registered. outputStream.generateJob(time) then works backwards from the last DStream towards its parents, much as an RDD lineage is resolved from the final RDD back to its sources.


Inside generateJob, jobFunc wraps the call context.sparkContext.runJob(rdd, emptyFunc):

/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}


Next, the Job object. Calling its run method invokes the func that was passed in:

private[streaming]
class Job(val time: Time, func: () => _) {
  private var _id: String = _
  private var _outputOpId: Int = _
  private var isSet = false
  private var _result: Try[_] = null
  private var _callSite: CallSite = null
  private var _startTime: Option[Long] = None
  private var _endTime: Option[Long] = None

  def run() {
    _result = Try(func())
  }

  // ... (remaining members omitted in this excerpt)
}
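
The key point is that constructing a Job runs nothing: the work is deferred until run() is called, and Try captures any exception instead of letting it escape. Below is a minimal sketch of that idea, assuming a hypothetical MiniJob class and a result accessor that are not part of Spark's Job.

```scala
import scala.util.Try

object JobSketch {
  // Hypothetical, simplified counterpart of Job: it stores a function and
  // only evaluates it when run() is called.
  class MiniJob(val timeMs: Long, func: () => Any) {
    private var _result: Option[Try[Any]] = None

    // Evaluates func; a thrown exception ends up as a Failure in _result.
    def run(): Unit = {
      _result = Some(Try(func()))
    }

    // Outcome of the last run, or an error if run() was never called.
    def result: Try[Any] =
      _result.getOrElse(throw new IllegalStateException("job has not run yet"))
  }
}
```

A caller can therefore create jobs on one thread, run them on another, and inspect success or failure afterwards without wrapping every call in its own try/catch.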


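getOrCompute first looks up the requested time in a HashMap to see whether the RDD already exists. If it does not, it calls compute to produce the RDD, persists it when a storageLevel is set, marks it for checkpointing when the time falls on a checkpoint interval, and finally puts the RDD into the HashMap: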

private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
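
Stripped of persistence and checkpointing, getOrCompute is a per-time memo table combined with a pull from the parent stream. The sketch below illustrates both the caching and the back-to-front evaluation. MiniStream, Source and Mapped are hypothetical classes, batches are plain Seq values rather than RDDs, and computeCalls exists only so the caching can be observed.

```scala
import scala.collection.mutable

object GetOrComputeSketch {
  // Hypothetical stand-in for a DStream: produces at most one batch per time.
  abstract class MiniStream[T] {
    private val generated = mutable.HashMap.empty[Long, Seq[T]]
    var computeCalls = 0 // instrumentation for the example only

    protected def compute(timeMs: Long): Option[Seq[T]]

    // Returns the cached batch for timeMs, or computes and caches it.
    final def getOrCompute(timeMs: Long): Option[Seq[T]] =
      generated.get(timeMs).orElse {
        computeCalls += 1
        val batch = compute(timeMs)
        batch.foreach(b => generated.put(timeMs, b))
        batch
      }
  }

  // A source stream backed by a fixed table of batches.
  class Source(data: Map[Long, Seq[Int]]) extends MiniStream[Int] {
    protected def compute(timeMs: Long): Option[Seq[Int]] = data.get(timeMs)
  }

  // A derived stream: computing a batch pulls the parent's batch first.
  class Mapped[A, B](parent: MiniStream[A], f: A => B) extends MiniStream[B] {
    protected def compute(timeMs: Long): Option[Seq[B]] =
      parent.getOrCompute(timeMs).map(_.map(f))
  }
}
```

Asking the last stream for a batch triggers the computation in its parents, and asking a second time for the same batch time is answered from the map without recomputing anything.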

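Back in the JobGenerator class: after start has launched the event loop, it checks whether a checkpoint is present. If so, it calls restart, which recovers the JobGenerator from the data in the checkpoint directory; if this is the first run, it calls startFirstTime.

startFirstTime starts the timer that periodically triggers job generation.

The timer object is a RecurringTimer. Its start method starts a thread, and that thread repeatedly calls triggerActionForNextInterval: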

private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {

  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }

  @volatile private var prevTime = -1L
  @volatile private var nextTime = -1L
  @volatile private var stopped = false

  /**
   * Get the time when this timer will fire if it is started right now.
   * The time will be a multiple of this timer's period and more than
   * current system time.
   */
  def getStartTime(): Long = {
    (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
  }

  /**
   * Get the time when the timer will fire if it is restarted right now.
   * This time depends on when the timer was started the first time, and was stopped
   * for whatever reason. The time must be a multiple of this timer's period and
   * more than current time.
   */
  def getRestartTime(originalStartTime: Long): Long = {
    val gap = clock.getTimeMillis() - originalStartTime
    (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
  }

  /**
   * Start at the given start time.
   */
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }

  /**
   * Start at the earliest time it can start based on the period.
   */
  def start(): Long = {
    start(getStartTime())
  }

  /**
   * Stop the timer, and return the last time the callback was made.
   *
   * @param interruptTimer True will interrupt the callback if it is in progress (not guaranteed to
   *                       give correct time in this case). False guarantees that there will be at
   *                       least one callback after `stop` has been called.
   */
  def stop(interruptTimer: Boolean): Long = synchronized {
    if (!stopped) {
      stopped = true
      if (interruptTimer) {
        thread.interrupt()
      }
      thread.join()
      logInfo("Stopped timer for " + name + " after time " + prevTime)
    }
    prevTime
  }

  private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }

  /**
   * Repeatedly call the callback every interval.
   */
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }
}

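triggerActionForNextInterval waits until nextTime, the next batch boundary, and then invokes callback. The callback is the function passed in when the RecurringTimer was constructed, namely longTime => eventLoop.post(GenerateJobs(new Time(longTime))), so the timer keeps posting GenerateJobs messages to the event loop, one per BatchDuration.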
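
The alignment arithmetic in getStartTime and getRestartTime is easier to follow with concrete numbers. The sketch below copies the two formulas into pure functions that take the current time as a parameter instead of reading a clock; TimerMathSketch and the parameter names are hypothetical.

```scala
object TimerMathSketch {
  // Same formula as getStartTime: the first multiple of periodMs that is
  // strictly greater than nowMs.
  def startTime(nowMs: Long, periodMs: Long): Long =
    (math.floor(nowMs.toDouble / periodMs) + 1).toLong * periodMs

  // Same formula as getRestartTime: the first time after nowMs that lies on
  // the grid originalStartMs + k * periodMs.
  def restartTime(nowMs: Long, originalStartMs: Long, periodMs: Long): Long = {
    val gap = nowMs - originalStartMs
    (math.floor(gap.toDouble / periodMs).toLong + 1) * periodMs + originalStartMs
  }

  def main(args: Array[String]): Unit = {
    println(startTime(12345L, 1000L))            // 13000
    println(startTime(13000L, 1000L))            // 14000: exactly on a boundary moves to the next one
    println(restartTime(17500L, 13000L, 1000L))  // 18000
    println(restartTime(17500L, 13250L, 1000L))  // 18250: stays on the original grid
  }
}
```

The last line shows why a restart needs the original start time: the batch boundaries after recovery must line up with the boundaries used before the stop, not with multiples of the period counted from zero.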

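Now focus once more on JobGenerator's generateJobs method and the steps it takes to generate jobs:

Step 1: obtain the data for the current batch interval.

Step 2: generate the jobs, which fixes the dependencies between the RDDs.

Step 3: obtain the input information, keyed by stream ID, for the generated jobs.

Step 4: wrap everything in a JobSet and hand it to JobScheduler.

Step 5: perform the checkpoint operation.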
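
The five steps can be sketched as one function whose collaborators are passed in as plain functions. This is an illustration of the control flow only: every name below is hypothetical, jobs are represented as strings, and treating a failure in steps 1 and 2 as "report the error, then still checkpoint" is an assumption of this sketch.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

object GenerateJobsSketch {
  // Hypothetical, simplified JobSet: batch time, jobs, and per-stream record counts.
  final case class MiniJobSet(timeMs: Long, jobs: Seq[String], inputInfo: Map[Int, Long])

  def generateJobs(
      timeMs: Long,
      allocateBlocks: Long => Unit,
      buildJobs: Long => Seq[String],
      inputInfoFor: Long => Map[Int, Long],
      submit: MiniJobSet => Unit,
      checkpoint: Long => Unit,
      reportError: Throwable => Unit): Unit = {
    Try {
      allocateBlocks(timeMs) // step 1: fix which received data belongs to this batch
      buildJobs(timeMs)      // step 2: generate the jobs for the batch
    } match {
      case Success(jobs) =>
        val info = inputInfoFor(timeMs)        // step 3: input info per stream ID
        submit(MiniJobSet(timeMs, jobs, info)) // step 4: hand the JobSet to the scheduler
      case Failure(e) =>
        reportError(e)
    }
    checkpoint(timeMs) // step 5: request a checkpoint for this batch time
  }

  // Runs the sketch with stub collaborators and returns the order of the calls.
  def trace(timeMs: Long, failOnBuild: Boolean = false): List[String] = {
    val calls = ArrayBuffer.empty[String]
    generateJobs(
      timeMs,
      allocateBlocks = _ => { calls += "allocate"; () },
      buildJobs = _ => {
        calls += "build"
        if (failOnBuild) throw new IllegalStateException("build failed")
        Seq("job-0")
      },
      inputInfoFor = _ => { calls += "inputInfo"; Map(0 -> 10L) },
      submit = _ => { calls += "submit"; () },
      checkpoint = _ => { calls += "checkpoint"; () },
      reportError = _ => { calls += "error"; () }
    )
    calls.toList
  }
}
```

Calling GenerateJobsSketch.trace(1000L) lists the steps in the order given above, while trace(1000L, failOnBuild = true) shows that steps 3 and 4 are skipped when job generation fails.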

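The submitJobSet method itself only puts the JobSet into a ConcurrentHashMap and wraps each Job in a JobHandler, which it submits to the jobExecutor thread pool.

JobHandler implements the Runnable interface. Its run method calls job.run(), which in turn invokes func, that is, the business logic written against the DStream: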

private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    try {
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

      ssc.sc.setJobDescription(
        s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

      // We need to assign `eventLoop` to a temp variable. Otherwise, because
      // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
      // it's possible that when `post` is called, `eventLoop` happens to null.
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped.
      }
    } finally {
      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
    }
  }
}
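
The submit path described above, where the job set is registered in a concurrent map and each job is wrapped in a Runnable and handed to a thread pool, can be sketched as follows. MiniScheduler, MiniJob and MiniHandler are hypothetical names, and the pool here uses ordinary non-daemon threads with an explicit shutdown so that the example terminates cleanly.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object SubmitSketch {
  // Hypothetical job: an id plus the work to perform.
  final case class MiniJob(id: Int, body: () => Unit)

  // Counterpart of JobHandler: a Runnable whose run() simply runs the job.
  class MiniHandler(job: MiniJob) extends Runnable {
    override def run(): Unit = job.body()
  }

  class MiniScheduler(numThreads: Int) {
    private val jobSets = new ConcurrentHashMap[Long, Seq[MiniJob]]()
    private val pool = Executors.newFixedThreadPool(numThreads)

    // Registers the job set under its batch time, then queues one handler per job.
    def submitJobSet(timeMs: Long, jobs: Seq[MiniJob]): Unit = {
      jobSets.put(timeMs, jobs)
      jobs.foreach(job => pool.execute(new MiniHandler(job)))
    }

    def isRegistered(timeMs: Long): Boolean = jobSets.containsKey(timeMs)

    // Stops accepting work and waits for queued jobs; true if all finished in time.
    def shutdownAndWait(): Boolean = {
      pool.shutdown()
      pool.awaitTermination(5, TimeUnit.SECONDS)
    }
  }
}
```

submitJobSet returns as soon as the handlers are queued, so the caller is never blocked by the jobs themselves. In Spark the size of this pool is taken from the spark.streaming.concurrentJobs setting, which defaults to 1, so by default the jobs of successive batches run one after another.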


Notes:

DT Big Data Dream Factory WeChat public account: DT_Spark

Sina Weibo: http://www.weibo.com/ilovepains

Wang Jialin's free live big-data session on YY, every evening at 20:00: 68917580