Spark Core 2.0: CheckpointState, RDDCheckpointData, checkpoint(), LocalRDDCheckpointData
2017-01-23 10:32
/**
 * Enumeration to manage state transitions of an RDD through checkpointing
 * [ Initialized --> checkpointing in progress --> checkpointed ].
 */
private[spark] object CheckpointState extends Enumeration {
  type CheckpointState = Value
  val Initialized, CheckpointingInProgress, Checkpointed = Value
}
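This enumeration is a one-way state machine: an RDD moves from Initialized to CheckpointingInProgress exactly once, then to Checkpointed. As a minimal standalone sketch (the CheckpointStateGuard class is hypothetical, not Spark source), the guard that RDDCheckpointData.checkpoint() below implements reduces to a compare-and-flip under a lock:

import CheckpointState._

// Hypothetical sketch: only the first caller wins the
// Initialized -> CheckpointingInProgress flip; later callers back off.
class CheckpointStateGuard {
  private var cpState: CheckpointState = Initialized

  def tryStart(): Boolean = this.synchronized {
    if (cpState == Initialized) {
      cpState = CheckpointingInProgress
      true
    } else {
      false // another thread is checkpointing, or checkpointing already finished
    }
  }

  def finish(): Unit = this.synchronized { cpState = Checkpointed }
}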
/**
 * This class contains all the information related to RDD checkpointing. Each instance of this
 * class is associated with an RDD. It manages the process of checkpointing of the associated
 * RDD, as well as manages the post-checkpoint state by providing the updated partitions,
 * iterator and preferred locations of the checkpointed RDD.
 */
private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends Serializable {

  import CheckpointState._

  // The checkpoint state of the associated RDD.
  protected var cpState = Initialized

  // The RDD that contains our checkpointed data
  private var cpRDD: Option[CheckpointRDD[T]] = None

  // TODO: are we sure we need to use a global lock in the following methods?

  /**
   * Return whether the checkpoint data for this RDD is already persisted.
   */
  def isCheckpointed: Boolean = RDDCheckpointData.synchronized {
    cpState == Checkpointed
  }

  /**
   * Materialize this RDD and persist its content.
   * This is called immediately after the first action invoked on this RDD has completed.
   */
  final def checkpoint(): Unit = {
    // Guard against multiple threads checkpointing the same RDD by
    // atomically flipping the state of this RDDCheckpointData
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) {
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }

    val newRDD = doCheckpoint()

    // Update our state and truncate the RDD lineage
    RDDCheckpointData.synchronized {
      cpRDD = Some(newRDD)
      cpState = Checkpointed
      rdd.markCheckpointed()
    }
  }

  /**
   * Materialize this RDD and persist its content.
   *
   * Subclasses should override this method to define custom checkpointing behavior.
   * @return the checkpoint RDD created in the process.
   */
  protected def doCheckpoint(): CheckpointRDD[T]

  /**
   * Return the RDD that contains our checkpointed data.
   * This is only defined if the checkpoint state is `Checkpointed`.
   */
  def checkpointRDD: Option[CheckpointRDD[T]] = RDDCheckpointData.synchronized { cpRDD }

  /**
   * Return the partitions of the resulting checkpoint RDD.
   * For tests only.
   */
  def getPartitions: Array[Partition] = RDDCheckpointData.synchronized {
    cpRDD.map(_.partitions).getOrElse { Array.empty }
  }
}
/**
 * Global lock for synchronizing checkpoint operations.
 */
private[spark] object RDDCheckpointData
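Note that checkpoint() here is not called by the user directly: RDD.checkpoint() only marks the RDD, and doCheckpoint() runs after the first action on it completes. A minimal driver-side sketch (app name, master, and checkpoint path are illustrative, not from the post):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("checkpoint-demo").setMaster("local[2]"))
sc.setCheckpointDir("/tmp/spark-checkpoints") // required for reliable (non-local) checkpointing

val rdd = sc.parallelize(1 to 100, 4).map(_ * 2)
rdd.checkpoint()           // state: Initialized; nothing is written yet
rdd.count()                // first action: RDDCheckpointData.checkpoint() runs, lineage is truncated
assert(rdd.isCheckpointed) // state: Checkpointed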
/**
 * An implementation of checkpointing implemented on top of Spark's caching layer.
 *
 * Local checkpointing trades off fault tolerance for performance by skipping the expensive
 * step of saving the RDD data to a reliable and fault-tolerant storage. Instead, the data
 * is written to the local, ephemeral block storage that lives in each executor. This is useful
 * for use cases where RDDs build up long lineages that need to be truncated often (e.g. GraphX).
 */
private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends RDDCheckpointData[T](rdd) with Logging {

  /**
   * Ensure the RDD is fully cached so the partitions can be recovered later.
   */
  protected override def doCheckpoint(): CheckpointRDD[T] = {
    val level = rdd.getStorageLevel

    // Assume storage level uses disk; otherwise memory eviction may cause data loss
    assume(level.useDisk, s"Storage level $level is not appropriate for local checkpointing")

    // Not all actions compute all partitions of the RDD (e.g. take). For correctness, we
    // must cache any missing partitions. TODO: avoid running another job here (SPARK-8582).
    val action = (tc: TaskContext, iterator: Iterator[T]) => Utils.getIteratorSize(iterator)
    val missingPartitionIndices = rdd.partitions.map(_.index).filter { i =>
      !SparkEnv.get.blockManager.master.contains(RDDBlockId(rdd.id, i))
    }
    if (missingPartitionIndices.nonEmpty) {
      rdd.sparkContext.runJob(rdd, action, missingPartitionIndices)
    }

    new LocalCheckpointRDD[T](rdd)
  }
}
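For the local variant described above, the public entry point is RDD.localCheckpoint(), which persists the RDD with a disk-backed storage level and installs a LocalRDDCheckpointData. A short sketch, continuing with the sc from the earlier example (values are illustrative):

val data = sc.parallelize(1 to 1000, 8).map(_ + 1)
data.localCheckpoint() // marks for local checkpointing; storage level is forced to use disk
data.count()           // materializes blocks in executor storage and truncates the lineage
assert(data.isCheckpointed)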
private[spark] object LocalRDDCheckpointData {

  val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK

  /**
   * Transform the specified storage level to one that uses disk.
   *
   * This guarantees that the RDD can be recomputed multiple times correctly as long as
   * executors do not fail. Otherwise, if the RDD is cached in memory only, for instance,
   * the checkpoint data will be lost if the relevant block is evicted from memory.
   *
   * This method is idempotent.
   */
  def transformStorageLevel(level: StorageLevel): StorageLevel = {
    StorageLevel(useDisk = true, level.useMemory, level.deserialized, level.replication)
  }
}
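Since transformStorageLevel only forces the useDisk flag and keeps the rest of the level, it maps MEMORY_ONLY to MEMORY_AND_DISK and leaves MEMORY_AND_DISK unchanged, which is why it is idempotent. A small illustration (the object is private[spark], so this only compiles inside the org.apache.spark package):

import org.apache.spark.storage.StorageLevel

// Follows from the definition above: useDisk is forced to true, other flags are kept.
assert(LocalRDDCheckpointData.transformStorageLevel(StorageLevel.MEMORY_ONLY) ==
  StorageLevel.MEMORY_AND_DISK)
// Idempotent: a level that already uses disk is returned unchanged.
assert(LocalRDDCheckpointData.transformStorageLevel(StorageLevel.MEMORY_AND_DISK) ==
  StorageLevel.MEMORY_AND_DISK)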