JobScheduler内幕实现和深度思考(第七篇)
2016-05-22 15:44
393 查看
接着上一节我们看到了JobGenerator.generateJobs()方法:
jobScheduler.submitJobSet方法:
由上我们可以看到:里会为每个job生成一个新的JobHandler,交给jobExecutor运行。jobScheduler会为每个Job生成一个JobHandler并交由jobExecutor以线程的方式去执行
而jobExecutor 是这样初始化的:
由上我们可以看到通过设置参数:spark.streaming.concurrentJobs,可以动态的增加jobExecutor的并行度,尤其是当我们有多个输出的时候,这也是一个可以调优的地方
我们接着看JobHandler中提交Job的主要逻辑:
由上可以看到JobHandler除了做一些状态记录外,最主要的就是调用job.run()
我们在job.run()可以看到run方法中执行func()函数,而func()函数是Job初始化时转入进来的
而Job的产生是在JobGenerator的generateJobs方法中产生,而JobGenerator.generateJobs方法中又调用了DStreamGraph.generateJobs方法:
DStreamGraph.generateJobs:
这时候我们可以发现这块实际调用的是:outputStream.generateJob(time),而outputStream本身就是一个DStream,而这个DStream从何而来呢?
下面我们有DStream的print方法进一步跟踪代码:
这时候我们发现print方法中封装了一个foreachFunc函数,我们接着看foreachRDD方法:
这时候我
4000
们发现有初始化了ForEachDStream,我们来看其主要方法,这这时候你可以会有似曾相识的感觉了
这时候我们看到这儿也有一个generateJob方法,而且DStream中的print方法的函数foreachFunc最终传入到了ForEachDStream的generateJob中,这时候我们再联想DStreamGraph中执行的generateJob方法,这时候我们发现DStreamGraph方法实际上是执行了ForEachDStream.generateJob(DStream所有子类当中只有ForEachDStream实现了generateJob方法),而上文看到的job.run()中的func()函数其本身中调用的是此处的foreachFunc方法(业务逻辑方法)
我们这块借用一位兄弟的一张图来阐明整个过程(该图出自:http://lqding.blog.51cto.com/9123978/1773391):
private def generateJobs(time: Time) { // Set the SparkEnv in this thread, so that job generation code can access the environment // Example: BlockRDDs are created in this thread, and it needs to access BlockManager // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed. SparkEnv.set(ssc.env) Try { jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch graph.generateJobs(time) // generate jobs using allocated block } match { case Success(jobs) => val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) }
jobScheduler.submitJobSet方法:
def submitJobSet(jobSet: JobSet) { if (jobSet.jobs.isEmpty) { logInfo("No jobs added for time " + jobSet.time) } else { listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo)) jobSets.put(jobSet.time, jobSet) jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) logInfo("Added jobs for time " + jobSet.time) } }
由上我们可以看到:里会为每个job生成一个新的JobHandler,交给jobExecutor运行。jobScheduler会为每个Job生成一个JobHandler并交由jobExecutor以线程的方式去执行
而jobExecutor 是这样初始化的:
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) private val jobExecutor = ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
由上我们可以看到通过设置参数:spark.streaming.concurrentJobs,可以动态的增加jobExecutor的并行度,尤其是当我们有多个输出的时候,这也是一个可以调优的地方
我们接着看JobHandler中提交Job的主要逻辑:
var _eventLoop = eventLoop if (_eventLoop != null) { _eventLoop.post(JobStarted(job, clock.getTimeMillis())) //上面这句主要用于记录Job的启动时间,并以事件方式推送eventLoop的队列中 // Disable checks for existing output directories in jobs launched by the streaming // scheduler, since we may need to write output to an existing directory during checkpoint // recovery; see SPARK-4835 for more details. PairRDDFunctions.disableOutputSpecValidation.withValue(true) { job.run() //真正的Job执行 } _eventLoop = eventLoop if (_eventLoop != null) { //上面这句主要用于记录Job的完成时间,并以事件方式推送eventLoop的队列中 _eventLoop.post(JobCompleted(job, clock.getTimeMillis())) }
由上可以看到JobHandler除了做一些状态记录外,最主要的就是调用job.run()
我们在job.run()可以看到run方法中执行func()函数,而func()函数是Job初始化时转入进来的
def run() { _result = Try(func()) }
而Job的产生是在JobGenerator的generateJobs方法中产生,而JobGenerator.generateJobs方法中又调用了DStreamGraph.generateJobs方法:
DStreamGraph.generateJobs:
def generateJobs(time: Time): Seq[Job] = { logDebug("Generating jobs for time " + time) val jobs = this.synchronized { outputStreams.flatMap { outputStream => val jobOption = outputStream.generateJob(time) jobOption.foreach(_.setCallSite(outputStream.creationSite)) jobOption } } logDebug("Generated " + jobs.length + " jobs for time " + time) jobs }
这时候我们可以发现这块实际调用的是:outputStream.generateJob(time),而outputStream本身就是一个DStream,而这个DStream从何而来呢?
下面我们有DStream的print方法进一步跟踪代码:
def print(num: Int): Unit = ssc.withScope { def foreachFunc: (RDD[T], Time) => Unit = { (rdd: RDD[T], time: Time) => { val firstNum = rdd.take(num + 1) // scalastyle:off println println("-------------------------------------------") println("Time: " + time) println("-------------------------------------------") firstNum.take(num).foreach(println) if (firstNum.length > num) println("...") println() // scalastyle:on println } } foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false) }
这时候我们发现print方法中封装了一个foreachFunc函数,我们接着看foreachRDD方法:
private def foreachRDD( foreachFunc: (RDD[T], Time) => Unit, displayInnerRDDOps: Boolean): Unit = { new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register() }
这时候我
4000
们发现有初始化了ForEachDStream,我们来看其主要方法,这这时候你可以会有似曾相识的感觉了
override def generateJob(time: Time): Option[Job] = { parent.getOrCompute(time) match { case Some(rdd) => val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) { foreachFunc(rdd, time) } Some(new Job(time, jobFunc)) case None => None } }
这时候我们看到这儿也有一个generateJob方法,而且DStream中的print方法的函数foreachFunc最终传入到了ForEachDStream的generateJob中,这时候我们再联想DStreamGraph中执行的generateJob方法,这时候我们发现DStreamGraph方法实际上是执行了ForEachDStream.generateJob(DStream所有子类当中只有ForEachDStream实现了generateJob方法),而上文看到的job.run()中的func()函数其本身中调用的是此处的foreachFunc方法(业务逻辑方法)
我们这块借用一位兄弟的一张图来阐明整个过程(该图出自:http://lqding.blog.51cto.com/9123978/1773391):
相关文章推荐
- 第13课:Spark Streaming 源码解读之Driver 容错安全性
- 一个帮助编辑,测试Cron表达式的网站
- 样本归一化
- shell常用命令
- 图片验证码技术
- jQuery load()方法的封装
- Java HashMap的工作原理
- C语言实现直接法解线性方程组
- 大数的四则运算与比较
- docker进入容器的方式
- 努力安放这颗好强而骄傲的心
- 一条SQL的改写
- R语言 关联规则(二)
- 百度地图的室内定位的原理是什么?
- sql server 得到数据库字典
- mysql批量导入,导出
- jQuery的load()方法
- Eclipse上GitHub的安装和上传过程详细示例
- 软件发布中关于开源License的选择问题
- insert();的三种使用