
Spark Core 2.0: CheckpointState, RDDCheckpointData, checkpoint(), and LocalRDDCheckpointData

2017-01-23 10:32
/**
 * Enumeration to manage state transitions of an RDD through checkpointing
 * [ Initialized --> checkpointing in progress --> checkpointed ].
 */
private[spark] object CheckpointState extends Enumeration {
  type CheckpointState = Value
  val Initialized, CheckpointingInProgress, Checkpointed = Value
}
/**
 * This class contains all the information related to RDD checkpointing. Each instance of this
 * class is associated with an RDD. It manages the checkpointing process of the associated RDD,
 * as well as the post-checkpoint state, by providing the updated partitions, iterator and
 * preferred locations of the checkpointed RDD.
 */
private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends Serializable {

  import CheckpointState._

  // The checkpoint state of the associated RDD.
  protected var cpState = Initialized

  // The RDD that contains our checkpointed data
  private var cpRDD: Option[CheckpointRDD[T]] = None

  // TODO: are we sure we need to use a global lock in the following methods?

  /**
   * Return whether the checkpoint data for this RDD is already persisted.
   */
  def isCheckpointed: Boolean = RDDCheckpointData.synchronized {
    cpState == Checkpointed
  }

  /**
   * Materialize this RDD and persist its content.
   * This is called immediately after the first action invoked on this RDD has completed.
   */
  final def checkpoint(): Unit = {
    // Guard against multiple threads checkpointing the same RDD by
    // atomically flipping the state of this RDDCheckpointData
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) {
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }

    val newRDD = doCheckpoint()

    // Update our state and truncate the RDD lineage
    RDDCheckpointData.synchronized {
      cpRDD = Some(newRDD)
      cpState = Checkpointed
      rdd.markCheckpointed()
    }
  }

  /**
   * Materialize this RDD and persist its content.
   *
   * Subclasses should override this method to define custom checkpointing behavior.
   * @return the checkpoint RDD created in the process.
   */
  protected def doCheckpoint(): CheckpointRDD[T]

  /**
   * Return the RDD that contains our checkpointed data.
   * This is only defined if the checkpoint state is `Checkpointed`.
   */
  def checkpointRDD: Option[CheckpointRDD[T]] = RDDCheckpointData.synchronized { cpRDD }

  /**
   * Return the partitions of the resulting checkpoint RDD.
   * For tests only.
   */
  def getPartitions: Array[Partition] = RDDCheckpointData.synchronized {
    cpRDD.map(_.partitions).getOrElse { Array.empty }
  }

}

/**
 * Global lock for synchronizing checkpoint operations.
 */
private[spark] object RDDCheckpointData
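
Seen from user code, the state machine above is driven by RDD.checkpoint() together with the first action that runs on the RDD. Below is a minimal sketch of that flow, assuming a local SparkContext and a hypothetical checkpoint directory (neither is part of the source above):

import org.apache.spark.{SparkConf, SparkContext}

object ReliableCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("checkpoint-demo"))
    sc.setCheckpointDir("/tmp/spark-checkpoints")  // hypothetical directory

    val rdd = sc.parallelize(1 to 100).map(_ * 2)
    rdd.checkpoint()            // attaches checkpoint data to the RDD; state = Initialized
    rdd.count()                 // first action completes, doCheckpoint() runs; state -> Checkpointed
    assert(rdd.isCheckpointed)  // the lineage has been truncated to the checkpoint RDD

    sc.stop()
  }
}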


/**
 * An implementation of checkpointing built on top of Spark's caching layer.
 *
 * Local checkpointing trades off fault tolerance for performance by skipping the expensive
 * step of saving the RDD data to reliable, fault-tolerant storage. Instead, the data is
 * written to the local, ephemeral block storage that lives in each executor. This is useful
 * for use cases where RDDs build up long lineages that need to be truncated often (e.g. GraphX).
 */
private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends RDDCheckpointData[T](rdd) with Logging {

  /**
   * Ensure the RDD is fully cached so the partitions can be recovered later.
   */
  protected override def doCheckpoint(): CheckpointRDD[T] = {
    val level = rdd.getStorageLevel

    // Assume storage level uses disk; otherwise memory eviction may cause data loss
    assume(level.useDisk, s"Storage level $level is not appropriate for local checkpointing")

    // Not all actions compute all partitions of the RDD (e.g. take). For correctness, we
    // must cache any missing partitions. TODO: avoid running another job here (SPARK-8582).
    val action = (tc: TaskContext, iterator: Iterator[T]) => Utils.getIteratorSize(iterator)
    val missingPartitionIndices = rdd.partitions.map(_.index).filter { i =>
      !SparkEnv.get.blockManager.master.contains(RDDBlockId(rdd.id, i))
    }
    if (missingPartitionIndices.nonEmpty) {
      rdd.sparkContext.runJob(rdd, action, missingPartitionIndices)
    }

    new LocalCheckpointRDD[T](rdd)
  }

}
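
Local checkpointing is requested with RDD.localCheckpoint() rather than RDD.checkpoint(). As the comment in doCheckpoint() notes, an action such as take() may leave some partitions uncomputed, and the extra job above caches the missing ones. A minimal sketch, assuming the same local SparkContext (sc) as in the earlier example:

val nums = sc.parallelize(1 to 1000, numSlices = 4).map(_ + 1)
nums.localCheckpoint()       // the storage level is adjusted so it includes disk
nums.take(1)                 // the action computes only a prefix of the partitions,
                             // so doCheckpoint() runs one extra job to cache the rest
assert(nums.isCheckpointed)  // all partitions are now held in the executors' block storage
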
private[spark] object LocalRDDCheckpointData {

  val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK

  /**
   * Transform the specified storage level to one that uses disk.
   *
   * This guarantees that the RDD can be recomputed multiple times correctly as long as
   * executors do not fail. Otherwise, if the RDD is cached in memory only, for instance,
   * the checkpoint data will be lost if the relevant block is evicted from memory.
   *
   * This method is idempotent.
   */
  def transformStorageLevel(level: StorageLevel): StorageLevel = {
    StorageLevel(useDisk = true, level.useMemory, level.deserialized, level.replication)
  }
}
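
For example, transformStorageLevel maps MEMORY_ONLY to MEMORY_AND_DISK while leaving disk-backed levels unchanged. A minimal sketch of that mapping (only callable from Spark-internal code, since the object is private[spark]):

import org.apache.spark.storage.StorageLevel

val upgraded = LocalRDDCheckpointData.transformStorageLevel(StorageLevel.MEMORY_ONLY)
// upgraded == StorageLevel.MEMORY_AND_DISK: useDisk = true, useMemory = true,
// deserialized = true, replication = 1. Applying the method again returns the same level.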