Spark Streaming 4: DStreamGraph and JobScheduler
2015-02-05 14:46
DStreamGraph is something like a stripped-down DAG scheduler: it is responsible for generating a JobSet for each batch interval, and for serializing itself along its dependency structure (for checkpointing). Its two most important fields are inputStreams and outputStreams. Spark Streaming connects the live input streams to the processing of those streams in a way that resembles a shuffle. The "map side" consists of the input streams, which are subclasses of DStream responsible for storing the data they collect into Spark memory as blocks; the "reduce side" consists of the output streams, another family of DStreams that read those blocks back out of Spark memory, materialize them as Spark-core RDDs, and then run the actual processing.
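As a concrete illustration of how the two sides get populated (a minimal sketch against the Spark 1.x API; the names here are my own), socketTextStream creates a ReceiverInputDStream that registers itself on the graph's input side, while the output operation print() wraps the transformation chain in a ForEachDStream registered on the output side:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object GraphSidesExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("GraphSidesExample")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Input side: a ReceiverInputDStream that stores received data
    // into Spark memory as blocks (registered via addInputStream).
    val lines = ssc.socketTextStream("localhost", 9999)

    // Output side: print() registers a ForEachDStream on the graph
    // (via addOutputStream); it pulls the blocks back out as RDDs.
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```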
(Reference: Tathagata Das, "Deep Dive with Spark Streaming", Spark Meetup 2013-06-17: http://s3.amazonaws.com/ppt-download/deep-dive-with-spark-streamingtathagata-dasspark-meetup2013-06-17-130623151510-phpapp02.pptx?response-content-disposition=attachment&Signature=jcVEZSJefLa7I5%2FytDPScwVJAzE%3D&Expires=1423116551&AWSAccessKeyId=AKIAIA7QTBOH2LDUZRTQ )
The core state of DStreamGraph is just the two stream registries plus some timing bookkeeping:

```scala
final private[streaming] class DStreamGraph extends Serializable with Logging {

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()

  var rememberDuration: Duration = null
  var checkpointInProgress = false

  var zeroTime: Time = null
  var startTime: Time = null
  var batchDuration: Duration = null
```
```scala
  def addInputStream(inputStream: InputDStream[_]) {
    this.synchronized {
      inputStream.setGraph(this)
      inputStreams += inputStream
    }
  }

  def addOutputStream(outputStream: DStream[_]) {
    this.synchronized {
      outputStream.setGraph(this)
      outputStreams += outputStream
    }
  }

  def getInputStreams() = this.synchronized { inputStreams.toArray }

  def getOutputStreams() = this.synchronized { outputStreams.toArray }

  def getReceiverInputStreams() = this.synchronized {
    inputStreams.filter(_.isInstanceOf[ReceiverInputDStream[_]])
      .map(_.asInstanceOf[ReceiverInputDStream[_]])
      .toArray
  }
```
Job generation simply asks every output stream for a job at the given batch time:

```scala
  def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      outputStreams.flatMap(outputStream => outputStream.generateJob(time))
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }
```
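On ordinary DStreams, generateJob returns None; only output DStreams override it. For reference, ForEachDStream (the class behind foreachRDD and print) implements it roughly like this in the 1.x source; treat this as a simplified excerpt:

```scala
  // Simplified from ForEachDStream in Spark 1.x: materialize the parent's
  // RDD for this batch time and wrap the user's function in a Job.
  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => foreachFunc(rdd, time)
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
```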
The custom writeObject/readObject hooks exist because the whole graph is serialized into the checkpoint file; the checkpointInProgress flag tells each DStream being (de)serialized that a checkpoint is in progress:

```scala
  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
    logDebug("DStreamGraph.writeObject used")
    this.synchronized {
      checkpointInProgress = true
      logDebug("Enabled checkpoint mode")
      oos.defaultWriteObject()
      checkpointInProgress = false
      logDebug("Disabled checkpoint mode")
    }
  }

  @throws(classOf[IOException])
  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
    logDebug("DStreamGraph.readObject used")
    this.synchronized {
      checkpointInProgress = true
      ois.defaultReadObject()
      checkpointInProgress = false
    }
  }
```
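A minimal sketch of driver code that exercises both paths (the checkpoint directory is a placeholder of my choosing): on a fresh start the graph is built and periodically serialized (writeObject above); on restart, getOrCreate deserializes the checkpointed graph instead (readObject above).

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointExample {
  def createContext(): StreamingContext = {
    val ssc = new StreamingContext("local[2]", "CheckpointExample", Seconds(1))
    ssc.checkpoint("/tmp/streaming-checkpoint") // placeholder directory
    ssc.socketTextStream("localhost", 9999).count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Rebuilds from the checkpoint if one exists, else calls createContext.
    val ssc = StreamingContext.getOrCreate("/tmp/streaming-checkpoint", createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```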
The JobScheduler is responsible for producing and running jobs: it drives a JobGenerator that creates the jobs for each batch, and executes them on a thread pool.
```scala
/**
 * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
 * the jobs and runs them using a thread pool.
 */
private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging {
```
```scala
  private val jobSets = new ConcurrentHashMap[Time, JobSet]
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)
  private val jobGenerator = new JobGenerator(this)
  val clock = jobGenerator.clock
  val listenerBus = new StreamingListenerBus()

  // These two are created only when scheduler starts.
  // eventActor not being null means the scheduler has been started and not stopped
  var receiverTracker: ReceiverTracker = null
  private var eventActor: ActorRef = null
```
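Note that numConcurrentJobs defaults to 1, so jobs run strictly one at a time and batches cannot overlap; raising spark.streaming.concurrentJobs widens the thread pool so jobs from different output operations or batches may run in parallel. A minimal configuration sketch (the app name is arbitrary):

```scala
import org.apache.spark.SparkConf

// Widen JobScheduler's pool to 4 threads; this trades the usual
// one-batch-at-a-time ordering guarantee for throughput.
val conf = new SparkConf()
  .setAppName("ConcurrentJobsExample")
  .set("spark.streaming.concurrentJobs", "4")
```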
```scala
  def start(): Unit = synchronized {
    if (eventActor != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
      def receive = {
        case event: JobSchedulerEvent => processEvent(event)
      }
    }), "JobScheduler")

    listenerBus.start()
    receiverTracker = new ReceiverTracker(ssc)
    receiverTracker.start()
    jobGenerator.start()
    logInfo("Started JobScheduler")
  }
```
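What start() kicks off: the JobGenerator runs a recurring timer at the batch interval, and on each tick it asks the graph for jobs and hands them back to the scheduler. Roughly, as a simplified sketch of the 1.x JobGenerator (block-info bookkeeping and error handling omitted, so details are approximate):

```scala
  // Sketch: called by the JobGenerator's timer once per batch interval.
  private def generateJobs(time: Time) {
    Try(graph.generateJobs(time)) match {    // ask every output stream for a job
      case Success(jobs) =>
        jobScheduler.submitJobSet(new JobSet(time, jobs))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventActor ! DoCheckpoint(time)          // then checkpoint the graph
  }
```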
When the JobGenerator hands over a JobSet, each of its jobs is queued on the thread pool:

```scala
  def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }
```
```scala
  private class JobHandler(job: Job) extends Runnable {
    def run() {
      eventActor ! JobStarted(job)
      job.run()
      eventActor ! JobCompleted(job)
    }
  }
```

After a job finishes, the completion event is handled as follows:
```scala
  private def handleJobCompletion(job: Job) {
    job.result match {
      case Success(_) =>
        val jobSet = jobSets.get(job.time)
        jobSet.handleJobCompletion(job)
        logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
        if (jobSet.hasCompleted) {
          jobSets.remove(jobSet.time)
          jobGenerator.onBatchCompletion(jobSet.time)
          logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
            jobSet.totalDelay / 1000.0, jobSet.time.toString,
            jobSet.processingDelay / 1000.0
          ))
          listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
        }
      case Failure(e) =>
        reportError("Error running job " + job, e)
    }
  }
```
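The StreamingListenerBatchCompleted event posted here is the same one delivered to user-registered listeners, so the "total delay" figures in that log line can also be watched from application code. A minimal sketch (register it with ssc.addStreamingListener):

```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Mirrors the "Total delay" log line in handleJobCompletion
// using the public listener API.
class DelayListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println("Batch %s: total delay %s ms (processing %s ms)".format(
      info.batchTime, info.totalDelay.getOrElse(-1L), info.processingDelay.getOrElse(-1L)))
  }
}
```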