
Lesson 16: Spark Streaming Source Code Analysis: Inside Data Cleanup

2016-05-31 16:52
This lesson covers two topics:

Why data cleanup is needed and what it looks like

A walkthrough of the data cleanup code

 

From a technical-research standpoint, once you have studied Spark Streaming this thoroughly on top of Spark Core, there is no Spark application you cannot handle.

Spark Streaming runs continuously and computes non-stop; every second of operation creates large numbers of objects such as accumulators and broadcast variables, so objects and metadata must be cleaned up periodically. Jobs are triggered every batch duration, and after each one the generated RDDs and their metadata need to be cleaned.

In client mode you can see the cleanup messages printed to the console, and the same cleanup entries appear in the log files.
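For example, a minimal word-count application like the sketch below (the socket host, port, and batch interval are arbitrary choices) is enough to observe these messages in the driver log, such as "Clearing references to old RDDs" and "Removing blocks of RDD ... of time ...". Note that most clearMetadata messages are logged at DEBUG level, so the log level for org.apache.spark.streaming may need to be lowered to see them.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CleanupLogDemo {
  def main(args: Array[String]): Unit = {
    // local[2] so the receiver and the batch processing can both make progress
    val conf = new SparkConf().setMaster("local[2]").setAppName("CleanupLogDemo")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Any output operation will do: each batch triggers a job, and once the batch
    // completes, the RDD/metadata cleanup discussed in this lesson kicks in.
    val wordCounts = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}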

Now let's look at what happens behind the scenes:

Spark runs on the JVM. The JVM creates objects and has to reclaim them; if object creation and garbage collection were never managed, the JVM would quickly run out of memory. What we are studying here is, in effect, Spark Streaming's own "GC": the way Spark Streaming manages RDD data and metadata is analogous to the way the JVM manages its objects through garbage collection.

Data and metadata are produced by operating on DStreams, so to understand how they are reclaimed we have to study how DStreams produce and release them.

 

Start with DStream's class declaration:

abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {

Data is received through InputDStream; data input, data transformation, and data output, the entire lifecycle, are built on DStreams. The conclusion: the DStream is responsible for the lifecycle of its RDDs. RDDs are produced by the DStream, and operations on RDDs are really operations on the DStream, repeated every batchDuration, so studying how RDDs are handled comes down to studying how DStreams are handled.
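As a concrete example of an output operation driving this lifecycle, user code might call foreachRDD roughly like this (a sketch reusing the hypothetical wordCounts DStream from the earlier example; the println sink is only illustrative):

// foreachRDD hands us the RDD generated for each batch time. Registering this
// output operation is what causes the DStream chain to produce, and later clean
// up, one RDD per batchDuration.
wordCounts.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partition =>
    // stand-in for a real sink (database, file system, message queue, ...)
    partition.foreach(record => println(s"[$time] $record"))
  }
}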

 

Source code analysis:

foreachRDD creates a ForEachDStream. It is an output operator, so the results are materialized to external storage:

/**
 * Apply a function to each RDD in this DStream. This is an output operator, so
 * 'this' DStream will be registered as an output stream and therefore materialized.
 *
 * @deprecated As of 0.9.0, replaced by `foreachRDD`.
 */
@deprecated("use foreachRDD", "0.9.0")
def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
  this.foreachRDD(foreachFunc)
}

private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}

Now look at the ForEachDStream that foreachRDD registers:

private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
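For context, ForEachDStream.generateJob is not called by user code; on every batch interval the DStreamGraph asks each registered output stream for a job. The following is paraphrased from the Spark 1.6-era DStreamGraph.generateJobs, so treat it as a sketch rather than the exact source:

// One Job per output DStream per batch time; each Job wraps the jobFunc built above
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}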
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
// The RDD that ends up in put() is the last RDD of the chain for this batch;
// the job is triggered by time, but it is still driven by a DStream action.
private[streaming]
final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  // The RDD is generated based on time
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      // Then persist/checkpoint the generated RDD
      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        // Keyed by time, valued by the RDD; this RDD is the last one in the chain
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
 

generatedRDDs is the structure that maintains each time window and the RDD generated for that window:

// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
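Conceptually, generatedRDDs is a retention window keyed by batch time: getOrCompute adds one entry per batch, and clearMetadata (shown later) prunes entries older than rememberDuration. A toy illustration of that accumulate-then-prune pattern (not Spark code):

import scala.collection.mutable

// Toy model: batch times in milliseconds mapped to a fake "RDD id"
val generated = mutable.HashMap[Long, Int]()
val batchMs = 5000L
val rememberMs = 20000L // plays the role of rememberDuration

def onBatch(time: Long, rddId: Int): Unit = {
  generated(time) = rddId                                  // like generatedRDDs.put(time, newRDD)
  val old = generated.keys.filter(_ <= time - rememberMs).toList
  old.foreach(generated.remove)                            // like generatedRDDs --= oldRDDs.keys
}

(0L to 40000L by batchMs).zipWithIndex.foreach { case (t, id) => onBatch(t, id) }
println(generated.keys.toSeq.sorted)                       // only the most recent window remains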
 

DirectKafkaInputDStream's compute produces a KafkaRDD:

override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")

  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}
// KafkaRDD.compute: build an iterator over the offset range assigned to this partition
override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  if (part.fromOffset == part.untilOffset) {
    log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
      s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}
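For reference, the direct Kafka stream whose compute methods are shown above is created in user code with the Spark 1.x spark-streaming-kafka API roughly as follows (the broker list and topic name are placeholders, and ssc is assumed to be an existing StreamingContext):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// One KafkaRDD is produced per batch; its partitions map 1:1 to Kafka partitions
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val topics = Set("mytopic")

val directStream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)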
As time goes on, a DStream keeps accumulating entries in its in-memory structure generatedRDDs: each time window and the RDD instance produced for that window. RDDs are stored, and later removed, per batchDuration. We sometimes call cache on a DStream; cache is just persist, and ultimately it is a cache operation on the RDDs the DStream generates.
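In the Spark 1.x DStream source, cache simply delegates to persist, which records a storage level that getOrCompute later applies to every generated RDD via newRDD.persist(storageLevel). Paraphrased as a sketch:

// DStream.persist / cache (simplified): remember the storage level; getOrCompute
// then calls newRDD.persist(storageLevel) on each batch's RDD
def persist(level: StorageLevel): DStream[T] = {
  if (this.isInitialized) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of a DStream after streaming context has started")
  }
  this.storageLevel = level
  this
}

def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)

def cache(): DStream[T] = persist()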
When an RDD is released from generatedRDDs, three things have to be considered: the RDD itself, the data source it was built from, and the metadata describing it. Since data is produced and released periodically, there must be a clock driving this cycle; conceptually it is just a recurring timer (see the sketch below), and in Spark it lives in JobGenerator.
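A toy sketch of the recurring-timer pattern (this is not Spark's RecurringTimer, just an illustration of the idea: a dedicated thread that wakes up once per period and invokes a callback):

// Calls `callback(time)` every `periodMs`, aligned to period boundaries
class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit, name: String) {
  @volatile private var stopped = false
  private val thread = new Thread(name) {
    override def run(): Unit = {
      var nextTime = (System.currentTimeMillis() / periodMs + 1) * periodMs
      while (!stopped) {
        val sleepMs = nextTime - System.currentTimeMillis()
        if (sleepMs > 0) Thread.sleep(sleepMs)
        callback(nextTime)   // e.g. post GenerateJobs(new Time(nextTime)) to an event loop
        nextTime += periodMs
      }
    }
  }
  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true }
}

Spark's actual timer in JobGenerator: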
// In JobGenerator, this anonymous function is invoked over and over as time advances.
// timer is a RecurringTimer; its start method launches an internal thread.
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  // The anonymous function is assigned to the callback; at every batch interval it
  // posts GenerateJobs to the event loop, so the receiver side keeps getting
  // GenerateJobs events.
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
// Anonymous inner class
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
  // ...
}

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
/** Clear DStream metadata for the given `time`. */
private def clearMetadata(time: Time) {
  ssc.graph.clearMetadata(time)

  // If checkpointing is enabled, then checkpoint,
  // else mark batch to be fully processed
  if (shouldCheckpoint) {
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
  } else {
    // If checkpointing is not enabled, then delete metadata information about
    // received blocks (block data not saved in any case). Otherwise, wait for
    // checkpointing of this batch to complete.
    val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
    jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
    jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
    markBatchFullyProcessed(time)
  }
}
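When checkpointing is enabled, the received-block metadata is cleaned later, after the checkpoint for this batch has been written: DoCheckpoint with clearCheckpointDataLater = true eventually leads to a ClearCheckpointData event, whose handler in the Spark 1.6-era JobGenerator looks roughly like this (paraphrased sketch):

/** Clear DStream checkpoint data for the given `time`. */
private def clearCheckpointData(time: Time) {
  ssc.graph.clearCheckpointData(time)

  // The information about which batches have been processed has been saved to the
  // checkpoint, so it is safe to delete block metadata and WAL data files now
  val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
  jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
  jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
  markBatchFullyProcessed(time)
}

On both paths, ssc.graph.clearMetadata has already fanned the cleanup out to every output DStream; that is DStreamGraph.clearMetadata: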
def clearMetadata(time: Time) {
  logDebug("Clearing metadata for time " + time)
  this.synchronized {
    outputStreams.foreach(_.clearMetadata(time))
  }
  logDebug("Cleared old metadata for time " + time)
}
/**
 * Clear metadata that are older than `rememberDuration` of this DStream.
 * This is an internal method that should not be called directly. This default
 * implementation clears the old generated RDDs. Subclasses of DStream may override
 * this to clear their own metadata along with the generated RDDs.
 */
private[streaming]
def clearMetadata(time: Time) {
  val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
  val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
  logDebug("Clearing references to old RDDs: [" +
    oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
  generatedRDDs --= oldRDDs.keys
  if (unpersistData) {
    logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", "))
    oldRDDs.values.foreach { rdd =>
      rdd.unpersist(false)
      // Explicitly remove blocks of BlockRDD
      rdd match {
        case b: BlockRDD[_] =>
          logInfo("Removing blocks of RDD " + b + " of time " + time)
          b.removeBlocks()
        case _ =>
      }
    }
  }
  logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
    (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
  dependencies.foreach(_.clearMetadata(time))
}
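Two application-level knobs control how aggressive this pruning is: spark.streaming.unpersist (checked above, default true) and the remember duration, which can be extended when downstream code needs older RDDs to stay around. For example (the durations here are arbitrary):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("RetentionDemo")
  // default is "true"; with "false", clearMetadata still drops the references but
  // does not explicitly unpersist, leaving cleanup to GC-driven mechanisms
  .set("spark.streaming.unpersist", "true")

val ssc = new StreamingContext(conf, Seconds(5))

// Keep generated RDDs (and their metadata) for at least 2 minutes, i.e. enlarge
// rememberDuration so that clearMetadata prunes later
ssc.remember(Minutes(2))

Finally, what posts ClearMetadata in the first place? It is triggered when all jobs of a batch have finished, in JobScheduler.handleJobCompletion: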
 

private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    jobSets.remove(jobSet.time)
    jobGenerator.onBatchCompletion(jobSet.time)
    logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
      jobSet.totalDelay / 1000.0, jobSet.time.toString,
      jobSet.processingDelay / 1000.0
    ))
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) =>
      reportError("Error running job " + job, e)
    case _ =>
  }
}
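This closes the loop: handleJobCompletion calls jobGenerator.onBatchCompletion, and in the Spark 1.6-era JobGenerator that callback simply posts the ClearMetadata event handled by processEvent above (paraphrased):

/** Callback called when a batch has been completely processed. */
def onBatchCompletion(time: Time) {
  eventLoop.post(ClearMetadata(time))
}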
 

Summary:

After each batchDuration's processing completes, Spark Streaming cleans up what that batch produced: the output DStreams and their dependencies are cleaned, and by default the generated RDD data and the associated metadata are cleaned as well.