
JobScheduler Internals and In-Depth Analysis (Part 7)

2016-05-22 15:44
Continuing from the previous part, we look at the JobGenerator.generateJobs() method:

private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
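As an aside, the Try { ... } match { case Success(...) / case Failure(...) } pattern used above is standard Scala error handling: the block's result (or any exception it throws) is captured and then pattern-matched. A minimal standalone sketch of the same idiom, independent of Spark:

import scala.util.{Failure, Success, Try}

object TryMatchDemo extends App {
  // Try captures an exception instead of letting it propagate
  def parse(s: String): Try[Int] = Try(s.toInt)

  Seq("42", "not-a-number").foreach { s =>
    parse(s) match {
      case Success(v) => println(s"parsed: $v")               // hit for "42"
      case Failure(e) => println(s"failed: ${e.getMessage}")  // hit for "not-a-number"
    }
  }
}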


The jobScheduler.submitJobSet method:

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}


From the above we can see that submitJobSet wraps each Job in a new JobHandler and hands it to jobExecutor, which executes it on a thread from its pool.

And jobExecutor is initialized like this:

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")


From this we can see that the spark.streaming.concurrentJobs parameter (default 1) controls the size of jobExecutor's thread pool, i.e. how many jobs of a batch can run in parallel. Raising it is particularly useful when a batch has multiple output operations, and it is one of the knobs available for tuning.
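For example, an application with several independent output operations can be allowed to run them in parallel within a batch by setting this property on the SparkConf. A minimal sketch (the app name, batch interval and the value 2 are just illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ConcurrentJobsDemo")            // illustrative name
  .set("spark.streaming.concurrentJobs", "2")  // thread-pool size of streaming-job-executor

val ssc = new StreamingContext(conf, Seconds(5))
// ... define input DStreams and multiple output operations here ...
ssc.start()
ssc.awaitTermination()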

Next, let's look at the main Job-execution logic inside JobHandler:

var _eventLoop = eventLoop
if (_eventLoop != null) {
  // Record the Job's start time and push a JobStarted event onto the eventLoop queue
  _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
  // Disable checks for existing output directories in jobs launched by the streaming
  // scheduler, since we may need to write output to an existing directory during checkpoint
  // recovery; see SPARK-4835 for more details.
  PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
    job.run() // the actual Job execution
  }
  _eventLoop = eventLoop
  if (_eventLoop != null) {
    // Record the Job's completion time and push a JobCompleted event onto the eventLoop queue
    _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
  }
}


From the above we can see that, apart from recording job state, JobHandler's main work is to call job.run().

Looking at job.run(), we can see that the run method executes func(), and func() is the function passed in when the Job was constructed:

def run() {
  _result = Try(func())
}
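To make the relationship between a Job and its func concrete, here is a highly simplified stand-in (not the real Spark class, just the pattern: the closure is captured at construction time and only executed when run() is called):

import scala.util.Try

// Simplified stand-in for org.apache.spark.streaming.scheduler.Job
class SimpleJob(val time: Long, func: () => Unit) {
  private var _result: Try[Unit] = _

  // The business logic runs here; any failure is captured in the Try
  def run(): Unit = {
    _result = Try(func())
  }

  def result: Try[Unit] = _result
}

// The closure does not run at construction time, only when run() is invoked
val job = new SimpleJob(System.currentTimeMillis(), () => println("output logic runs now"))
job.run()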


The Job itself is created in JobGenerator's generateJobs method, and JobGenerator.generateJobs in turn calls DStreamGraph.generateJobs:

DStreamGraph.generateJobs:

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}


Here we find that what is actually invoked is outputStream.generateJob(time), and outputStream is itself a DStream. So where does this DStream come from?

Let's trace the code further, starting from DStream's print method:

def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}


We see that print wraps the output logic in a foreachFunc function; next, let's look at the foreachRDD method:

private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}


Here we find that foreachRDD constructs a ForEachDStream. Looking at its key method, it should feel familiar by now:

override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}


Here we see another generateJob method, and the foreachFunc defined in DStream's print method ultimately ends up inside ForEachDStream's generateJob. Recalling the DStreamGraph.generateJobs method above, what actually gets executed there is ForEachDStream.generateJob (among all DStream subclasses, only ForEachDStream overrides generateJob). And the func() invoked in job.run() earlier is exactly the jobFunc built here, which calls foreachFunc, i.e. the business logic.
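Putting it all together: every output operation in a user program, such as print(), foreachRDD or saveAsTextFiles, registers a ForEachDStream with the DStreamGraph, and each batch interval the graph generates one Job per registered output stream. A minimal sketch of such a program (the host, port and batch interval are just illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PrintDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PrintDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // socketTextStream creates the input DStream; print() ultimately calls foreachRDD,
    // which registers a ForEachDStream as an output stream on the DStreamGraph
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).print()

    ssc.start()            // JobGenerator now produces one Job per batch for this output
    ssc.awaitTermination()
  }
}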

To illustrate the whole process, we borrow a diagram from a fellow engineer (source: http://lqding.blog.51cto.com/9123978/1773391):
