RDD.scala源码
2015-09-01 12:05
211 查看
定义:
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents animmutable,
partitioned
collection of elements that can be operated on inparallel.
包括RDD的基本操作,比如map/filter/persist等
含有RDD到各种其他类型RDD的隐式转换:
PairRDDFunctions、DoubleRDDFunctions、SequenceFileRDDFunctions、AsyncRDDActions、OrderedRDDFunctions和numericRDDToDoubleRDDFunctions
特点:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an
HDFS file
关于Spark RDD的一篇论文:
http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf
// 注解可以在程序中的各项条目添加信息,这些信息可以被编译器或外部工具处理。注解是插入到代码中以便有工具可以对它们进行处理的标签。
// Scala中可以为类、方法、字段、局部变量和参数添加注解,与Java一样。可以同时添加多个注解,先后顺序没有影响。
abstract class
RDD[T: ClassTag](
@transient private var _sc: SparkContext, //@transient注解将字段标记为瞬态的,不可序列化;
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging
// =======================================================================
// 下面是能反映RDD本身特性的方法,由其子类实现
// 1、有一个分片列表。就是能被切分,和hadoop一样的,能够切分的数据才能并行计算。
// 2、有一个函数计算每一个分片,这里指的是下面会提到的compute函数。
// 3、对其他的RDD的依赖列表,依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD都有依赖。
// 4、可选:key-value型的RDD是根据哈希来分区的,类似于mapreduce当中的Paritioner接口,控制key分到哪个reduce。
// 5、可选:每一个分片的优先计算位置(preferred locations),比如HDFS的block的所在位置应该是优先计算的位置。
// =======================================================================
/**
* RDD存储级别,默认MEMORY_ONLY,可以通过persist指定新的存储级别
*/
具体包括的RDD操作就不列举了。。。。
真的太多了,我觉得凡是你能想到对RDD这种集合特性的所有操作,源码里面都有实现。
RDD本身是为了做分布式的,适合大量数据的进行高效的并行分布式的操作的批处理。
RDD是粗粒度的实现,若是没有没有内存的时候,在内存和磁盘数据交换方面还是不错的。
分布式系统实现的时候需要考虑一致性!函数编程的核心法则是不变性。
RDD容错是细粒度的,按照血统关系单任务去恢复,容错的代价的非常小,从而效率很高。
RDD的并行任务从单个Task考虑,不会发生网络拷贝,不同与Hadoop的MP;
集群使用RDD进行批量操作时,运行本身是遵循任务的本地性。Worker从分布式系统中读取数据进行计算。
RDD比较不适合异步细粒度的数据更新,比如网络爬虫、数据更新等;
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents animmutable,
partitioned
collection of elements that can be operated on inparallel.
包括RDD的基本操作,比如map/filter/persist等
含有RDD到各种其他类型RDD的隐式转换:
PairRDDFunctions、DoubleRDDFunctions、SequenceFileRDDFunctions、AsyncRDDActions、OrderedRDDFunctions和numericRDDToDoubleRDDFunctions
特点:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an
HDFS file
关于Spark RDD的一篇论文:
http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf
// 注解可以在程序中的各项条目添加信息,这些信息可以被编译器或外部工具处理。注解是插入到代码中以便有工具可以对它们进行处理的标签。
// Scala中可以为类、方法、字段、局部变量和参数添加注解,与Java一样。可以同时添加多个注解,先后顺序没有影响。
abstract class
RDD[T: ClassTag](
@transient private var _sc: SparkContext, //@transient注解将字段标记为瞬态的,不可序列化;
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging
// =======================================================================
// 下面是能反映RDD本身特性的方法,由其子类实现
// 1、有一个分片列表。就是能被切分,和hadoop一样的,能够切分的数据才能并行计算。
// 2、有一个函数计算每一个分片,这里指的是下面会提到的compute函数。
// 3、对其他的RDD的依赖列表,依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD都有依赖。
// 4、可选:key-value型的RDD是根据哈希来分区的,类似于mapreduce当中的Paritioner接口,控制key分到哪个reduce。
// 5、可选:每一个分片的优先计算位置(preferred locations),比如HDFS的block的所在位置应该是优先计算的位置。
// =======================================================================
@DeveloperApi def compute(split: Partition, context: TaskContext): Iterator[T] protected def getPartitions: Array[Partition] protected def getDependencies: Seq[Dependency[_]] = deps protected def getPreferredLocations(split: Partition): Seq[String] = Nil @transient val partitioner: Option[Partitioner] = None
/**
* RDD存储级别,默认MEMORY_ONLY,可以通过persist指定新的存储级别
*/
<pre name="code" class="plain"> def persist(newLevel: StorageLevel): this.type = { // TODO: Handle changes of StorageLevel if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) { throw new UnsupportedOperationException( "Cannot change storage level of an RDD after it was already assigned a level") } sc.persistRDD(this) // Register the RDD with the ContextCleaner for automatic GC-based cleanup sc.cleaner.foreach(_.registerRDDForCleanup(this)) storageLevel = newLevel this } /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def persist(): this.type = persist(StorageLevel.MEMORY_ONLY) /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): this.type = persist()
具体包括的RDD操作就不列举了。。。。
真的太多了,我觉得凡是你能想到对RDD这种集合特性的所有操作,源码里面都有实现。
RDD本身是为了做分布式的,适合大量数据的进行高效的并行分布式的操作的批处理。
RDD是粗粒度的实现,若是没有没有内存的时候,在内存和磁盘数据交换方面还是不错的。
分布式系统实现的时候需要考虑一致性!函数编程的核心法则是不变性。
RDD容错是细粒度的,按照血统关系单任务去恢复,容错的代价的非常小,从而效率很高。
RDD的并行任务从单个Task考虑,不会发生网络拷贝,不同与Hadoop的MP;
集群使用RDD进行批量操作时,运行本身是遵循任务的本地性。Worker从分布式系统中读取数据进行计算。
RDD比较不适合异步细粒度的数据更新,比如网络爬虫、数据更新等;
相关文章推荐
- Android framework系统默认设置修改
- 二分图带权最大独立集
- python2.7学习笔记(7) ——函数式编程
- git的使用介绍
- Just a Numble HDU杭电2117
- CentOS安装配置VSFTP服务器
- SSH框架Action向页面传值差异
- Mysql主从复制与读写分离闲谈
- 利用C语言进行urldecode,解决浏览器中的urlencode
- A1 = ? HDU 杭电2086 【数学】
- Oracle GoldenGate 三、加密
- TortoiseSVN常用操作指南
- Cocos2d-x中Vector容器以及实例介绍
- 软件测试学习随记
- Mac下git的安装与配置
- 24个设计模式学习
- rotate.js实现图片旋转 (chrome,IE,firefox都可以实现)
- 商城后台编辑栏目
- 多线程(1)
- 清除li间隔