您的位置:首页 > 其它

spark-streaming系列------- 1. spark-streaming的Job调度 上

2015-09-10 16:12 621 查看
这段时间分析了下spark-streming任务调度的源码,成果分享一下。

类似于spark-core,spark-streaming有自己的一套任务调度,具体代码在spark-streaming的scheduler包里面。

我以Kafka DirectDStream数据接收为例,画了一张整体的流程图:



在将这张图之前,需要先明确spark-streaming中3个重要的类:

JobScheduler类,这个类负责spark-streaming的Job调度

JobGenerator类, 这个类负责spark-streaming的Job生成

DStreamGraph类, 这个类负责spark-streaming的Job的通用描述,JobGenerator类根据DStreamGraph类的描述产生Job,比如说设置Job的SparkContext、设置每个outputStream的初始化

Driver程序在创建StreamContext对象的时候会创建JobScheduler对象和DStreamGraph对象,执行StreamContext.start之后,spark-streaming进入了它的任务调度,

def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        sparkContext.setCallSite(startSite.get)
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            validate()
            <span style="color:#ff0000;"><strong>scheduler.start()//进入spark-streaming的Job调度
            state = StreamingContextState.ACTIVE
          } catch {
            case NonFatal(e) =>
              logError("Error starting the context, marking it as stopped", e)
              scheduler.stop(false)
              state = StreamingContextState.STOPPED
              throw e
          }
          StreamingContext.setActiveContext(this)
        }


JobScheduler的start方法里面,会启动任务任务调度的事件侦听循环,侦听Job开始事件、Job完成事件、 Job出错事件3类事件。在spark-streaming DirectDStream数据接收方式下,ReceiverTracker几乎没有什么作用。这个方法里面最终要的是启动JobGenerator.start()方法的调用。这个方法的调用进入了Spark-streaming的Job的产生

def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    listenerBus.start(ssc.sparkContext)
    receiverTracker = new ReceiverTracker(ssc)//在DirectDStream接收方式下,几乎没有什么作用,因为DirectStream已经不使用receiver了
    inputInfoTracker = new InputInfoTracker(ssc)
    receiverTracker.start()
    jobGenerator.start()//进入spark-streaming Job的产生
    logInfo("Started JobScheduler")
  }


JobGererator.start()方法是Job产生的入口。在这个方法做了两件事情。

1. 启动侦听Job产生事件死循环

2. 第一次产生Job

def start(): Unit = synchronized {
    if (eventLoop != null) return // generator has already been started

    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
   eventLoop.start()//开始侦听spark-streaming产生Job事件
    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      startFirstTime()//第一次启动Job
    }
  }


在创建JobGenerator对象的时候,会创建定时产生Streaming Job的定时器,代码如下:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,//定时产生Job的时间周期是设置的streaming batch时间
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")


JobGenerator侦听的事件为:产生Job的事件 清除Streaming元数据事件 checkpoint事件 清除checkpoint事件

对比JobScheduler的事件,可以得出如下结论:

[b]JobScheduler的事件,处理Job的调度; JobGenerator的事件,更倾向于处理Job内部的一些事情[/b]

JobGenerator.startFirstTime方法启动第一次Job的生成和启动定时产生Job的定时器

/** Starts the generator for the first time */
  private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)//启动DStreamGraph的初始化
    timer.start(startTime.milliseconds)  //启动定时产生Streaming Job的定时器
    logInfo("Started JobGenerator at " + startTime)
  }


DStreamGraph.start()方法为Streaming开始接收数据做准备,并且在非DirectDStream数据接收情况下,开始接收数据。DirectDStream接收数据的开始位置会在接下来的系列中写到。代码如下:

def start(time: Time) {
    this.synchronized {
      if (zeroTime != null) {
        throw new Exception("DStream graph computation already started")
      }
      zeroTime = time
      startTime = time
      outputStreams.foreach(_.initialize(zeroTime))
      outputStreams.foreach(_.remember(rememberDuration))
      outputStreams.foreach(_.validateAtStart)
      inputStreams.par.foreach(_.start())//非DirectDStream数据接收</strong></span>情况下,开始接收数据
    }
  }


DStreamGraph.inputStreams的作用:数据流的输入数据结构,Spark-streaming通过inputStream接收数据,比如说DirectKafkaDStream,inputStreams是在创建input stream的时候创建。

DStreamGraph.outputStreams的作用:当数据需要输出的时候,会将inputStream添加到outputStreams,比如说调用DStream.foreachRDD输出数据的时候

spark-streaming Job的具体生成在JobGenerator.generateJobs方法。

关于spark-streaming Job如何具体生成,参见接下来的 spark-streaming的Job调度 下
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: