Spark Streaming Direct Kafka Source Code Analysis (Based on Spark 1.6)
2017-06-24 10:12
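The following call creates a DirectKafkaInputDStream:

```scala
val data = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, TopicsSet)
```

Like every InputDStream, it registers itself in the DStreamGraph that ssc maintains (the graph of all streams):

```scala
ssc.graph.addInputStream(this)
```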
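Calling data.foreachRDD then creates an output stream:

```scala
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
```

Here the stream itself (data) is passed in as the parent of the ForEachDStream. register() then adds the ForEachDStream to the graph's output streams. This is how the lineage of every stream is recorded.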
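The registration and lineage bookkeeping described above can be modeled in a few lines. This is a hypothetical, heavily simplified sketch. MiniGraph, MiniDStream, MiniInputDStream and MiniForEachDStream are illustrative names, not Spark classes, and a "batch" is modeled as a Seq[String] instead of an RDD. It only shows the shape of the mechanism: input streams register themselves on construction, and foreachRDD wraps `this` as the parent of a registered output stream.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, heavily simplified model of DStreamGraph. Method names mirror Spark's
// (addInputStream, addOutputStream, register, foreachRDD), but none of this is Spark's API.
class MiniGraph {
  val inputStreams = ArrayBuffer.empty[MiniDStream]
  val outputStreams = ArrayBuffer.empty[MiniDStream]
  def addInputStream(s: MiniDStream): Unit = inputStreams += s
  def addOutputStream(s: MiniDStream): Unit = outputStreams += s
}

abstract class MiniDStream(val graph: MiniGraph) {
  // The parents of this stream; following them recovers the lineage.
  def dependencies: List[MiniDStream]

  // Like DStream.foreachRDD: `this` becomes the parent of a new, registered output stream.
  // A "batch" is modeled as Seq[String] instead of an RDD.
  def foreachRDD(f: (Seq[String], Long) => Unit): MiniForEachDStream =
    new MiniForEachDStream(this, f).register()
}

// Like InputDStream: registers itself with the graph while being constructed.
class MiniInputDStream(g: MiniGraph) extends MiniDStream(g) {
  g.addInputStream(this)
  def dependencies: List[MiniDStream] = Nil
}

// Like ForEachDStream: remembers its parent and is registered as an output stream.
class MiniForEachDStream(val parent: MiniDStream, val func: (Seq[String], Long) => Unit)
    extends MiniDStream(parent.graph) {
  def dependencies: List[MiniDStream] = List(parent)
  def register(): MiniForEachDStream = { graph.addOutputStream(this); this }
}
```

After `new MiniInputDStream(g).foreachRDD(...)`, the graph holds one input stream and one output stream, and the output stream's dependencies point back to the input stream.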
ssc.start() calls JobScheduler.start():
```scala
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
```
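Both JobScheduler and JobGenerator delegate their work to Spark's EventLoop. An EventLoop is a daemon thread that takes events from a blocking queue and passes each one to onReceive. The sketch below is a hypothetical, simplified version of that pattern. SimpleEventLoop, SchedulerEvent, JobStartedEvent and JobCompletedEvent are illustrative names, not Spark's classes.

```scala
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// Hypothetical, simplified version of the EventLoop pattern: a daemon thread takes
// events from a blocking queue and dispatches each one to onReceive.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take() // blocks until an event is posted
          try onReceive(event)
          catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case _: InterruptedException => // stop() interrupts take(); exit even if events remain
      }
    }
  }

  def start(): Unit = eventThread.start()
  def post(event: E): Unit = eventQueue.put(event)
  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      eventThread.join()
    }
  }

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

// Hypothetical events, loosely modeled on JobStarted / JobCompleted.
sealed trait SchedulerEvent
final case class JobStartedEvent(jobId: Int) extends SchedulerEvent
final case class JobCompletedEvent(jobId: Int) extends SchedulerEvent
```

A caller subclasses SimpleEventLoop, implements onReceive (the role processEvent plays in Spark), calls start(), and then post()s events from any thread. Each event is handled on the single event thread, in the order it was posted.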
The important parts are the following.

First, it starts an event-loop thread that consumes job events:

```scala
private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}
```

Second, it starts the receiver tracker and the job generator:
```scala
receiverTracker.start()
jobGenerator.start()
```
receiverTracker.start() exists for input streams that use receivers. Its code is:

```scala
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
```

The DirectKafkaInputDStream created by the direct approach extends InputDStream, not ReceiverInputDStream. As a result, receiverInputStreams (the graph's receiver-based input streams) is empty, and with the direct approach receiverTracker.start() effectively does nothing.
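The reason the direct stream skips the receiver path is a type filter. This is a hypothetical sketch of that idea. InputStreamLike, ReceiverInputStreamLike, DirectKafkaInputStreamLike and ReceiverTrackerSketch are illustrative stand-ins for InputDStream, ReceiverInputDStream, DirectKafkaInputDStream and ReceiverTracker, not Spark code. The sketch keeps only receiver-based streams and does work only when that list is non-empty.

```scala
// Hypothetical stand-ins for Spark's class hierarchy: InputStreamLike plays InputDStream,
// ReceiverInputStreamLike plays ReceiverInputDStream.
abstract class InputStreamLike(val name: String)
class ReceiverInputStreamLike(name: String) extends InputStreamLike(name)
class DirectKafkaInputStreamLike(name: String) extends InputStreamLike(name)

object ReceiverTrackerSketch {
  // Keep only receiver-based streams (the idea behind the graph's getReceiverInputStreams).
  def receiverInputStreams(inputs: Seq[InputStreamLike]): Seq[ReceiverInputStreamLike] =
    inputs.collect { case r: ReceiverInputStreamLike => r }

  // Mirrors the guard in ReceiverTracker.start(): returns true only if it would start anything.
  def start(inputs: Seq[InputStreamLike]): Boolean = {
    val receivers = receiverInputStreams(inputs)
    if (receivers.nonEmpty) {
      // Real Spark would set up the RPC endpoint and launch receivers on executors here.
      true
    } else false
  }
}
```

With only a DirectKafkaInputStreamLike in the list, start returns false. This mirrors why receiverTracker.start() is a no-op for a pure direct-Kafka application.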
So the key call is jobGenerator.start(). Its source:
```scala
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
```

It starts an event-consuming thread, then checks whether the context was restored from a checkpoint (ssc.isCheckpointPresent). If it was not, this is the first start, and it calls:
```scala
/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}
```
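The time arithmetic in startFirstTime can be made concrete. As I read the 1.6 source, RecurringTimer.getStartTime rounds the current clock up to the next multiple of the period, and the graph's zero time is set one batch earlier. The hypothetical TimerSketch below only reproduces that arithmetic. It starts no thread, and its names are illustrative.

```scala
// Hypothetical helper mirroring the arithmetic of RecurringTimer.getStartTime and
// JobGenerator.startFirstTime; it does not start any thread.
object TimerSketch {
  // First tick: the next multiple of periodMs strictly after nowMs.
  def startTime(nowMs: Long, periodMs: Long): Long =
    (math.floor(nowMs.toDouble / periodMs) + 1).toLong * periodMs

  // startFirstTime passes startTime - batchDuration to graph.start as the zero time.
  def zeroTime(startMs: Long, batchMs: Long): Long = startMs - batchMs

  // The batch times for which the timer callback would post GenerateJobs.
  def tickTimes(startMs: Long, periodMs: Long, n: Int): Seq[Long] =
    (0 until n).map(i => startMs + i * periodMs)
}
```

For example, with a 1000 ms batch duration and a clock at 12345 ms, the first batch time is 13000 ms, the zero time is 12000 ms, and later batches fall at 14000, 15000, and so on. This alignment is why batch times are always whole multiples of the batch duration.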
This starts the timer, which runs its callback once per batch interval. How is that callback defined?
```scala
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
```
So on every tick the timer posts GenerateJobs(new Time(longTime)) to eventLoop. Next, look at the run method of eventLoop's event thread:
```scala
override def run(): Unit = {
  try {
    while (!stopped.get) {
      val event = eventQueue.take()
      try {
        onReceive(event)
      } catch {
        case NonFatal(e) => {
          try {
            onError(e)
          } catch {
            case NonFatal(e) => logError("Unexpected error in " + name, e)
          }
        }
      }
    }
  } catch {
    case ie: InterruptedException => // exit even if eventQueue is not empty
    case NonFatal(e) => logError("Unexpected error in " + name, e)
  }
}
```

onReceive hands each event to JobGenerator.processEvent, which handles GenerateJobs(time) by calling generateJobs(time):
```scala
/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  // A DoCheckpoint event is posted after every batch's jobs are generated; a checkpoint is
  // actually written only if checkpointing is enabled and this batch falls on the interval.
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
```

The key line is:
graph.generateJobs(time). Stepping into it:
```scala
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
```

Next, step into outputStream.generateJob(time):
```scala
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}
```
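At this point the flow is clear. Every output operation (action) creates its own output stream, and all of them are kept in ssc's graph. generateJob calls getOrCompute to produce the RDD for this batch. The code above is the base DStream version; ForEachDStream overrides generateJob so that it calls parent.getOrCompute(time) and wraps the user's foreachFunc instead. Each stream's compute in turn calls getOrCompute on its parent, so the recursion eventually reaches the compute method of DirectKafkaInputDStream.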
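The recursion through getOrCompute, and the fact that each batch is computed once and then reused by every output stream, can be modeled without Spark. This is a hypothetical sketch. SketchStream, SourceSketch, MappedSketch, ForEachSketch, SketchJob and GraphSketch are illustrative names, a "batch" is a Seq[Int] rather than an RDD, and Spark's time-validity checks are omitted.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of DStream.getOrCompute and ForEachDStream.generateJob.
// A "batch" (the RDD in Spark) is modeled as Seq[Int]; times are plain Longs.
abstract class SketchStream {
  // Like DStream.generatedRDDs: the batch computed for each batch time.
  private val generated = mutable.HashMap.empty[Long, Seq[Int]]

  def compute(time: Long): Option[Seq[Int]]

  // Like getOrCompute: reuse a cached batch, otherwise compute it and cache it.
  def getOrCompute(time: Long): Option[Seq[Int]] =
    generated.get(time).orElse {
      val result = compute(time)
      result.foreach(batch => generated.put(time, batch))
      result
    }
}

// Stands in for DirectKafkaInputDStream: the root of the lineage. Counts compute calls.
class SourceSketch extends SketchStream {
  var computeCalls = 0
  def compute(time: Long): Option[Seq[Int]] = {
    computeCalls += 1
    Some(Seq(time.toInt, time.toInt + 1))
  }
}

// A transformation such as map: its compute asks the parent via getOrCompute.
class MappedSketch(parent: SketchStream, f: Int => Int) extends SketchStream {
  def compute(time: Long): Option[Seq[Int]] = parent.getOrCompute(time).map(_.map(f))
}

final case class SketchJob(time: Long, func: () => Unit) {
  def run(): Unit = func()
}

// Like ForEachDStream.generateJob: pull the parent's batch and wrap the user function in a job.
class ForEachSketch(parent: SketchStream, foreachFunc: (Seq[Int], Long) => Unit) {
  def generateJob(time: Long): Option[SketchJob] =
    parent.getOrCompute(time).map(batch => SketchJob(time, () => foreachFunc(batch, time)))
}

object GraphSketch {
  // Like DStreamGraph.generateJobs: one job per output stream whose parent produced a batch.
  def generateJobs(outputs: Seq[ForEachSketch], time: Long): Seq[SketchJob] =
    outputs.flatMap(_.generateJob(time))
}
```

With two output streams hanging off the same source, generating jobs for one batch time computes the source only once. Generating a job also does not run the user function; that happens only when job.run() is called, which matches how Spark separates job generation from job execution.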
So how does that method produce the RDD?
```scala
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")
  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}
```

The first line, val untilOffsets = clamp(latestLeaderOffsets(maxRetries)), computes the end offset of each partition for this batch's RDD. It fetches the latest leader offsets, then combines them with the configured rate limit (spark.streaming.kafka.maxRatePerPartition, plus the rate controller's estimate when backpressure is enabled) and the current offsets to derive untilOffsets.

The last lines set currentOffsets to untilOffsets. currentOffsets is a field of DirectKafkaInputDStream, which lives on the driver, so the offsets are maintained in the driver's memory.
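The offset bookkeeping can be sketched as plain arithmetic. This is a hypothetical simplification, assuming only maxRatePerPartition is in play (backpressure ignored). TP, OffsetRangeSketch and ClampSketch are illustrative names, not Kafka or Spark classes. Each partition's end offset is the smaller of the leader's latest offset and the current offset plus (batch seconds × maxRatePerPartition). After the RDD is built, the until offsets become the next batch's starting offsets.

```scala
// Hypothetical stand-ins for kafka.common.TopicAndPartition and OffsetRange.
final case class TP(topic: String, partition: Int)
final case class OffsetRangeSketch(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object ClampSketch {
  // Per-partition cap for one batch: batch seconds * maxRatePerPartition.
  // None means "no limit" (rate <= 0). Backpressure is ignored in this sketch.
  def maxMessagesPerPartition(maxRatePerPartition: Long, batchMs: Long): Option[Long] =
    if (maxRatePerPartition > 0) Some((batchMs.toDouble / 1000 * maxRatePerPartition).toLong)
    else None

  // Like clamp(): read at most `cap` new messages per partition, never past the latest offset.
  def clamp(current: Map[TP, Long], latest: Map[TP, Long], cap: Option[Long]): Map[TP, Long] =
    cap match {
      case Some(mmp) => latest.map { case (tp, lo) => tp -> math.min(current(tp) + mmp, lo) }
      case None      => latest
    }

  // The bookkeeping part of compute(): this batch's ranges, plus the next currentOffsets.
  def nextBatch(current: Map[TP, Long], until: Map[TP, Long]): (Seq[OffsetRangeSketch], Map[TP, Long]) = {
    val ranges = current.toSeq.map { case (tp, from) =>
      OffsetRangeSketch(tp.topic, tp.partition, from, until(tp))
    }
    (ranges, until) // currentOffsets = untilOffsets
  }
}
```

For example, take a 2-second batch with maxRatePerPartition = 100, a current offset of 100, and a leader at offset 10000. The batch reads offsets 100 until 300, and the next batch starts at 300. If the leader were only at 250, the batch would stop at 250.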
Once the RDD is built and wrapped into jobs, control returns to generateJobs, whose next call is jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)). Stepping into it:
```scala
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
```
Each job is wrapped in a JobHandler and handed to the jobExecutor thread pool. Here is JobHandler's run method:
```scala
def run() {
  try {
    val formattedTime = UIUtils.formatBatchTime(
      job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
    val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
    val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

    ssc.sc.setJobDescription(
      s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
    ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
    ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

    // We need to assign `eventLoop` to a temp variable. Otherwise, because
    // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
    // it's possible that when `post` is called, `eventLoop` happens to null.
    var _eventLoop = eventLoop
    if (_eventLoop != null) {
      _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
      // Disable checks for existing output directories in jobs launched by the streaming
      // scheduler, since we may need to write output to an existing directory during checkpoint
      // recovery; see SPARK-4835 for more details.
      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
        job.run()
      }
      _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
      }
    } else {
      // JobScheduler has been stopped.
    }
  } finally {
    ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
    ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
  }
}
```
It posts a JobStarted event to the JobScheduler's event loop, runs job.run(), and finally posts a JobCompleted event.
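The submit-and-handle flow can be sketched with a plain Java thread pool. This is a hypothetical model. HandledJob and SchedulerSketch are illustrative names, and events are recorded as strings instead of being posted to a real event loop. It assumes, as in Spark 1.6, that the pool size comes from spark.streaming.concurrentJobs (default 1) and that Job.run() stores its outcome in a Try instead of throwing.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import scala.util.Try

// Hypothetical job: like Spark's Job, run() records the outcome in a Try instead of throwing.
final class HandledJob(val time: Long, val outputOpId: Int, func: () => Unit) {
  @volatile var result: Option[Try[Unit]] = None
  def run(): Unit = { result = Some(Try(func())) }
}

// Hypothetical scheduler: jobs run on a fixed-size pool, the way JobScheduler's jobExecutor does.
class SchedulerSketch(numConcurrentJobs: Int) {
  // Records, in order, the events a JobHandler would post to JobScheduler's event loop.
  val events = new ConcurrentLinkedQueue[String]()
  private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)

  private class JobHandler(job: HandledJob) extends Runnable {
    override def run(): Unit = {
      events.add(s"JobStarted(${job.time},${job.outputOpId})")
      job.run() // does not throw: a failure is stored in job.result
      events.add(s"JobCompleted(${job.time},${job.outputOpId})")
    }
  }

  // Like submitJobSet: an empty set is only logged; otherwise each job goes to the pool.
  def submitJobSet(time: Long, jobs: Seq[HandledJob]): Unit =
    if (jobs.isEmpty) events.add(s"NoJobs($time)")
    else jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))

  def stop(): Unit = {
    jobExecutor.shutdown()
    jobExecutor.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

With a pool of size 1 (the default), the output operations of a batch run strictly one after another, and each shows a started/completed pair. A failing job still produces a JobCompleted event, and its error is kept in the job's result.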
job.run() executes the function the Job wraps. That function is defined in the generateJob method shown earlier. For the base DStream, it calls runJob on the batch's RDD with an empty function. For ForEachDStream, it calls the user's foreachFunc on that RDD. In other words, a job is simply this batch's RDD plus the function to run on it.