Lesson 16: Spark Streaming Source Code Walkthrough: Demystifying the Data Cleanup Internals
2016-05-31 16:52
This lesson covers two topics:
Why data cleanup is needed and what it looks like
A walkthrough of the data cleanup code
From the standpoint of source-level study, once you have understood Spark Streaming thoroughly on top of Spark Core, there is no Spark application you cannot handle.
Spark Streaming runs continuously, computing batch after batch. Every second of execution produces large numbers of objects such as accumulators and broadcast variables, so these objects and their metadata must be cleaned up periodically; after the jobs triggered within each batch duration finish, the RDDs and metadata they produced need to be cleared. In client mode you can watch the cleanup messages in the console output, and the same entries appear in the log files.
Now let us look at what happens behind the scenes.
Spark runs on the JVM. The JVM creates objects and has to reclaim them; if object creation and collection (GC) were left unmanaged, the JVM would quickly exhaust its memory. What we are studying here is, in effect, Spark Streaming's own "GC": Spark Streaming's management of RDD data and metadata is analogous to the JVM's management of garbage collection.
Data and metadata are produced by operating on DStreams, so understanding how they are reclaimed means studying how DStreams, and the RDDs they generate, are created and released.
Here is the DStream class declaration:
abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {
Data is received through an InputDStream, and data input, transformation, and output across the whole lifecycle are built on DStreams. The conclusion is that the DStream governs the lifecycle of the RDDs it produces: RDDs are generated by DStreams, and the operations declared on a DStream are applied to a fresh RDD on every batchDuration, so studying how RDDs are handled amounts to studying how DStreams are handled (see the sketch below).
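To make this DStream-to-RDD relationship concrete before diving into the source, here is a minimal self-contained sketch in the spirit of the lecture (not taken from it); the local master, socket host/port, and one-second batch interval are arbitrary assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamLifecycleSketch {
  def main(args: Array[String]): Unit = {
    // One-second batches: every second the DStream graph materializes a new RDD.
    val conf = new SparkConf().setAppName("DStreamLifecycleSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // InputDStream: receives lines from a socket (host/port are placeholders).
    val counts = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    // foreachRDD is an output operator: it registers a ForEachDStream, and for
    // every batch time it is handed the RDD generated for that batch.
    counts.foreachRDD { (rdd, time) =>
      println(s"batch $time produced RDD ${rdd.id} with ${rdd.count()} records")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Once a batch like this completes, the RDD it generated, together with its metadata, is exactly the kind of object that the cleanup path examined below has to release.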
Source code analysis:
foreachRDD registers a ForEachDStream, which materializes its results to external storage:
/**
 * Apply a function to each RDD in this DStream. This is an output operator, so
 * 'this' DStream will be registered as an output stream and therefore materialized.
 *
 * @deprecated As of 0.9.0, replaced by `foreachRDD`.
 */
@deprecated("use foreachRDD", "0.9.0")
def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
  this.foreachRDD(foreachFunc)
}

private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
Next, the ForEachDStream that foreachRDD registers:
private[streaming] class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
// The RDD stored via put() here is the last RDD in the chain for this batch; jobs are
// triggered by time, but they are still driven by the action declared on the DStream.
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If the RDD was already generated, then retrieve it from the HashMap,
  // or else compute the RDD.
  // The RDD is generated based on the batch time.
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }
      // Then persist / checkpoint the generated RDD as configured.
      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        // Keyed by time, with the RDD as value; this RDD is the last one in the chain.
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
generatedRDDs is the structure that maintains each batch time and the RDD generated for it:
// RDDs generated, marked as private[streaming] so that test suites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
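The look-up-or-compute-and-remember pattern that getOrCompute implements over this map, and the time-based eviction that clearMetadata performs later, can be illustrated with a small standalone sketch (my own illustration, not Spark code; the names TimeKeyedCache and clearOlderThan are invented for this example):

import scala.collection.mutable

// A tiny stand-in for the Time -> RDD map above: look up by key, otherwise compute
// the value, remember it, and return it, mirroring generatedRDDs.get(time).orElse { ... },
// plus a clearMetadata-style removal of entries older than a remember window.
class TimeKeyedCache[V](compute: Long => Option[V]) {
  private val generated = mutable.HashMap.empty[Long, V]

  def getOrCompute(time: Long): Option[V] =
    generated.get(time).orElse {
      val value = compute(time)
      value.foreach(v => generated.put(time, v)) // remember it for later cleanup
      value
    }

  def clearOlderThan(time: Long, remember: Long): Unit = {
    val old = generated.keys.filter(_ <= time - remember).toList
    generated --= old
  }
}

object TimeKeyedCacheDemo extends App {
  val cache = new TimeKeyedCache[String](t => Some(s"rdd-for-$t"))
  println(cache.getOrCompute(1000L))            // computed and stored
  println(cache.getOrCompute(1000L))            // served from the map
  cache.clearOlderThan(3000L, remember = 1500L) // drops the 1000L entry
}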
On the input side, DirectKafkaInputDStream produces a KafkaRDD for each batch:
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")

  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}
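For context, this compute is invoked once per batch after an application creates the direct stream. A minimal usage sketch follows, assuming the Spark 1.x Kafka 0.8 direct API (spark-streaming-kafka) is on the classpath; the broker address and topic name are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectKafkaSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectKafkaSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))

    // Each batch, DirectKafkaInputDStream.compute builds one KafkaRDD that covers
    // the offsets newly available since the previous batch.
    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}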
Each partition of that KafkaRDD is then computed as follows (KafkaRDD.compute):
override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  if (part.fromOffset == part.untilOffset) {
    log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
      s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}
As time goes on, a DStream keeps filling the in-memory structure generatedRDDs with one entry per batch time, each holding the RDD instance generated for that window; entries are stored, and later deleted, on a per-batchDuration basis. When cache is called on a DStream it is simply persist, and in the end it caches the underlying RDDs.
When an RDD held in generatedRDDs is released, three things have to be considered: the RDD itself, the data source it was built from, and its metadata; all three must be handled at release time. Because data is produced periodically, it also has to be released periodically, so we need to find the clock that drives this, namely the timer inside JobGenerator (a standalone sketch of this pattern follows the event-loop code below):
// Inside JobGenerator this anonymous function is invoked over and over as time advances.
// The timer here is a RecurringTimer; RecurringTimer.start launches its internal thread.
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  // The anonymous function is assigned to the callback: each tick posts a GenerateJobs
  // event for that time to the eventLoop, so GenerateJobs events keep arriving on receive.
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))),
  "JobGenerator")
// Anonymous inner class: events posted by the timer are delivered to onReceive.
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
}

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
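The pattern above, a recurring timer that posts events plus a single event-loop thread that dispatches them (including the cleanup events), can be sketched in isolation. This is an illustration of the pattern only, not Spark code; all names in it are invented:

import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object JobGeneratorPatternSketch {
  sealed trait Event
  final case class GenerateJobs(time: Long) extends Event
  final case class ClearMetadata(time: Long) extends Event

  def main(args: Array[String]): Unit = {
    val batchMillis = 1000L
    val queue = new LinkedBlockingQueue[Event]()

    // "RecurringTimer": fires once per batch duration and posts GenerateJobs.
    val timer = Executors.newSingleThreadScheduledExecutor()
    timer.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = queue.put(GenerateJobs(System.currentTimeMillis()))
    }, 0L, batchMillis, TimeUnit.MILLISECONDS)

    // "EventLoop": a dedicated thread takes events off the queue and dispatches them.
    val loop = new Thread(new Runnable {
      override def run(): Unit = while (!Thread.currentThread().isInterrupted) {
        queue.take() match {
          case GenerateJobs(t) =>
            println(s"generate jobs for batch $t")
            // Pretend the batch finished and ask for cleanup of older state.
            queue.put(ClearMetadata(t - 2 * batchMillis))
          case ClearMetadata(t) =>
            println(s"clear metadata older than $t")
        }
      }
    })
    loop.setDaemon(true)
    loop.start()

    Thread.sleep(5 * batchMillis) // let a few batches run, then stop the timer
    timer.shutdownNow()
  }
}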
/** Clear DStream metadata for the given `time`. */
private def clearMetadata(time: Time) {
  ssc.graph.clearMetadata(time)

  // If checkpointing is enabled, then checkpoint,
  // else mark batch to be fully processed
  if (shouldCheckpoint) {
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
  } else {
    // If checkpointing is not enabled, then delete metadata information about
    // received blocks (block data not saved in any case). Otherwise, wait for
    // checkpointing of this batch to complete.
    val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
    jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
    jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
    markBatchFullyProcessed(time)
  }
}
// DStreamGraph.clearMetadata: the graph forwards the cleanup request to every output DStream.
def clearMetadata(time: Time) {
  logDebug("Clearing metadata for time " + time)
  this.synchronized {
    outputStreams.foreach(_.clearMetadata(time))
  }
  logDebug("Cleared old metadata for time " + time)
}
/**
 * Clear metadata that are older than `rememberDuration` of this DStream.
 * This is an internal method that should not be called directly. This default
 * implementation clears the old generated RDDs. Subclasses of DStream may override
 * this to clear their own metadata along with the generated RDDs.
 */
private[streaming] def clearMetadata(time: Time) {
  val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
  val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
  logDebug("Clearing references to old RDDs: [" +
    oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
  generatedRDDs --= oldRDDs.keys
  if (unpersistData) {
    logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", "))
    oldRDDs.values.foreach { rdd =>
      rdd.unpersist(false)
      // Explicitly remove blocks of BlockRDD
      rdd match {
        case b: BlockRDD[_] =>
          logInfo("Removing blocks of RDD " + b + " of time " + time)
          b.removeBlocks()
        case _ =>
      }
    }
  }
  logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
    (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
  dependencies.foreach(_.clearMetadata(time))
}
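Two user-facing knobs feed directly into this code path: spark.streaming.unpersist, read at the top of clearMetadata, and the remember duration, which an application can enlarge through StreamingContext.remember. A short configuration sketch, as one might write in a spark-shell session, with placeholder values chosen only for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

// spark.streaming.unpersist controls whether old generated RDDs are aggressively
// unpersisted by clearMetadata; it defaults to true.
val conf = new SparkConf()
  .setAppName("CleanupKnobsSketch")
  .setMaster("local[2]")
  .set("spark.streaming.unpersist", "true")

val ssc = new StreamingContext(conf, Seconds(1))

// remember(...) enlarges rememberDuration on every DStream in the graph, so generated
// RDDs survive clearMetadata for at least this long (useful when recent batches must
// stay queryable); the two-minute value here is only an example.
ssc.remember(Minutes(2))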
// JobScheduler.handleJobCompletion: once every job of a batch has finished,
// jobGenerator.onBatchCompletion is called, which in turn posts the ClearMetadata event.
private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    jobSets.remove(jobSet.time)
    jobGenerator.onBatchCompletion(jobSet.time)
    logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
      jobSet.totalDelay / 1000.0, jobSet.time.toString,
      jobSet.processingDelay / 1000.0
    ))
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) =>
      reportError("Error running job " + job, e)
    case _ =>
  }
}
Summary:
After each batchDuration is fully processed, Spark Streaming cleans up the information that batch produced: the output DStreams are cleared, the cleanup is propagated through their dependencies, and by default the generated RDDs' cached data (unpersist, plus block removal for BlockRDDs) and their metadata are cleaned as well.