您的位置:首页 > 其它

RDD.scala源码

2015-09-01 12:05 211 查看
定义:

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents animmutable,

partitioned
collection of elements that can be operated on inparallel.

包括RDD的基本操作,比如map/filter/persist等

含有RDD到各种其他类型RDD的隐式转换:

PairRDDFunctions、DoubleRDDFunctions、SequenceFileRDDFunctions、AsyncRDDActions、OrderedRDDFunctions和numericRDDToDoubleRDDFunctions

特点:

- A list of partitions

- A function for computing each split

- A list of dependencies on other RDDs

- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an
HDFS file

关于Spark RDD的一篇论文:

http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf

// 注解可以在程序中的各项条目添加信息,这些信息可以被编译器或外部工具处理。注解是插入到代码中以便有工具可以对它们进行处理的标签。

// Scala中可以为类、方法、字段、局部变量和参数添加注解,与Java一样。可以同时添加多个注解,先后顺序没有影响。

abstract class
RDD[T: ClassTag](

@transient private var _sc: SparkContext, //@transient注解将字段标记为瞬态的,不可序列化;

@transient private var deps: Seq[Dependency[_]]

) extends Serializable with Logging

// =======================================================================

// 下面是能反映RDD本身特性的方法,由其子类实现

// 1、有一个分片列表。就是能被切分,和hadoop一样的,能够切分的数据才能并行计算。

// 2、有一个函数计算每一个分片,这里指的是下面会提到的compute函数。

// 3、对其他的RDD的依赖列表,依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD都有依赖。

// 4、可选:key-value型的RDD是根据哈希来分区的,类似于mapreduce当中的Paritioner接口,控制key分到哪个reduce。

// 5、可选:每一个分片的优先计算位置(preferred locations),比如HDFS的block的所在位置应该是优先计算的位置。

// =======================================================================

@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
protected def getPartitions: Array[Partition]
protected def getDependencies: Seq[Dependency[_]] = deps
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
@transient val partitioner: Option[Partitioner] = None


/**

* RDD存储级别,默认MEMORY_ONLY,可以通过persist指定新的存储级别

*/

<pre name="code" class="plain">  def persist(newLevel: StorageLevel): this.type = {
// TODO: Handle changes of StorageLevel
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
throw new UnsupportedOperationException(
"Cannot change storage level of an RDD after it was already assigned a level")
}
sc.persistRDD(this)
// Register the RDD with the ContextCleaner for automatic GC-based cleanup
sc.cleaner.foreach(_.registerRDDForCleanup(this))
storageLevel = newLevel
this
}

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()




具体包括的RDD操作就不列举了。。。。

真的太多了,我觉得凡是你能想到对RDD这种集合特性的所有操作,源码里面都有实现。

RDD本身是为了做分布式的,适合大量数据的进行高效的并行分布式的操作的批处理。

RDD是粗粒度的实现,若是没有没有内存的时候,在内存和磁盘数据交换方面还是不错的。

分布式系统实现的时候需要考虑一致性!函数编程的核心法则是不变性。

RDD容错是细粒度的,按照血统关系单任务去恢复,容错的代价的非常小,从而效率很高。

RDD的并行任务从单个Task考虑,不会发生网络拷贝,不同与Hadoop的MP;

集群使用RDD进行批量操作时,运行本身是遵循任务的本地性。Worker从分布式系统中读取数据进行计算。

RDD比较不适合异步细粒度的数据更新,比如网络爬虫、数据更新等;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: