Spark入门2-了解RDD
2017-12-11 23:48
344 查看
朱金华 njzhujinhua 2017/12/05
简介
1 特性
RDD编程模型
1 编程模型
2 RDD的操作
源码实现
1 RDD
朱金华 njzhujinhua 2017/12/05
* 迭代式算法:迭代式机器学习,图算法,KMeans聚类,回归等
* 交互式探索:用户在同一数据集上执行多个查询
Spark编程模型正是此RDD-弹性分布式数据集(Resilient Distributed Dataset,RDD),它是MapReduce模型的扩展和延伸,可以在并行计算阶段进行高效的数据共享。
RDD提供一种基于粗粒度的转换操作,如map,filter,join等,该接口会将其指定函数操作应用到多个数据集上,可以据此创建数据集的血统Lineage,而不需要存储真正的数据,以便节省空间,且达到在出错时根据血统关系重新生成数据。
一个RDD内部有其以来的分区Partitions信息,当某个RDD分区数据丢失的时候,RDD内部存储的信息可支撑其进行重新计算,且只需重新计算丢失的分区,这样可快速恢复丢失的数据。
1. RDD的创建只有两种方法:一种是来自于内存集合和外部存储系统,一种是通过转换(transformation,如map/filter/groupBy/join等)操作生成。
2. 其状态不可变,不能修改
2. RDD是一块分布式内存数据集, 会分成多个partitions, 以便分布在集群中各机器的内存中。 分区根据数据的存储位置确定,也可指定其分区数。还原时只会重新计算丢失分区的数据,而不会影响整个系统。
2. RDD之间是有关联的, 一个RDD可以通过transform操作把父RDD的数据转化成当前RDD的数据, 并且通过getDependencies, 可以取到所有的dependencies。 这种一个RDD经何种transformation而来的关系在RDD中叫血统(lineage),据此RDD有充足的信息关于它是如何从其他RDD产生而来的。
RDD是可以被persisit的, 常用的是cache, 即StorageLevel.MEMORY_ONLY
RDD是可以被checkpoint的, 以提高failover的效率, 当有很长的RDD链时, 单纯的依赖replay会比较低效,需要理由checkpoint机制打破链。
RDD.iterator可以产生用于迭代真正数据的Iterator[T]
在RDD上可以做各种transforms和actions,前者会生成新的RDD,而后者只是将RDD上某项操作的结果返回给程序,而不会生成新的RDD。
延迟计算,Spark并不是每一步都接着计算而是等到需要向driver提交计算结果时才执行真正的计算
上述操作挑几个最常用的说明下:
每个RDD包含五部分信息,
1. 数据的分区列表
2. 计算方法
3. 依赖关系
4. 是否是哈希/范围分区的元数据
5. 能根据本地性快速访问到数据的偏好位置
一个RDD包含SparkContext信息,一个整形的id,依赖关系,name等
以读文本文件生成的RDD为例,其生成RDD的partitions个数为2
SparkContext.textFile的实现为
其首先检查sc状态正常,未提供分区数时使用defaultMinPartitions作为分区数
hadoopFile读取指定路径的文件,返回一个hadoopRDD。hadoop的RecordReader类对每条记录重用相同的Writableobject,所以直接cache返回的RDD或直接用于aggerate或shuffle将生成很多reference,所以通常都是需要先利用map函数复制一份RDD,就是上面的
flatMap首先应用一个指定的函数于RDD的每个元素,然后将结果打平,就是说每个元素先转为一个数组,然后将,所有元素对应的数组的值合并起来作为新元素集合。
简介
1 特性
RDD编程模型
1 编程模型
2 RDD的操作
源码实现
1 RDD
朱金华 njzhujinhua 2017/12/05
1. 简介
在RDD诞生之前的分布式计算框架缺乏对分布式内存的抽象和支持,需要用到数据交换时都是要写到存储中去,RDD模型的产生动机也主要来源于两种主流的应用场景:* 迭代式算法:迭代式机器学习,图算法,KMeans聚类,回归等
* 交互式探索:用户在同一数据集上执行多个查询
Spark编程模型正是此RDD-弹性分布式数据集(Resilient Distributed Dataset,RDD),它是MapReduce模型的扩展和延伸,可以在并行计算阶段进行高效的数据共享。
RDD提供一种基于粗粒度的转换操作,如map,filter,join等,该接口会将其指定函数操作应用到多个数据集上,可以据此创建数据集的血统Lineage,而不需要存储真正的数据,以便节省空间,且达到在出错时根据血统关系重新生成数据。
一个RDD内部有其以来的分区Partitions信息,当某个RDD分区数据丢失的时候,RDD内部存储的信息可支撑其进行重新计算,且只需重新计算丢失的分区,这样可快速恢复丢失的数据。
1.1 特性
RDD是一种只读的、分区的记录集合。具有以下一些特点:1. RDD的创建只有两种方法:一种是来自于内存集合和外部存储系统,一种是通过转换(transformation,如map/filter/groupBy/join等)操作生成。
2. 其状态不可变,不能修改
2. RDD是一块分布式内存数据集, 会分成多个partitions, 以便分布在集群中各机器的内存中。 分区根据数据的存储位置确定,也可指定其分区数。还原时只会重新计算丢失分区的数据,而不会影响整个系统。
2. RDD之间是有关联的, 一个RDD可以通过transform操作把父RDD的数据转化成当前RDD的数据, 并且通过getDependencies, 可以取到所有的dependencies。 这种一个RDD经何种transformation而来的关系在RDD中叫血统(lineage),据此RDD有充足的信息关于它是如何从其他RDD产生而来的。
RDD是可以被persisit的, 常用的是cache, 即StorageLevel.MEMORY_ONLY
RDD是可以被checkpoint的, 以提高failover的效率, 当有很长的RDD链时, 单纯的依赖replay会比较低效,需要理由checkpoint机制打破链。
RDD.iterator可以产生用于迭代真正数据的Iterator[T]
在RDD上可以做各种transforms和actions,前者会生成新的RDD,而后者只是将RDD上某项操作的结果返回给程序,而不会生成新的RDD。
延迟计算,Spark并不是每一步都接着计算而是等到需要向driver提交计算结果时才执行真正的计算
2. RDD编程模型
2.1 编程模型
Spark的开发者需要写连接集群中的 workers 的 driver 程序来使用 spark, 就比如下图中展示的. Driver 端程序定义了一系列的 RDDs 并且调用了 RDD 的 action 操作. Driver 的程序同时也会跟踪 RDDs 之间的的血缘关系. workers 是可以将 RDD 分区数据存储在内存中的长期存活的进程.2.2 RDD的操作
类型 | 操作 | 解释 |
---|---|---|
转换/transformation | map(f : T=>U) | RDD[T] => RDD[U] |
- | filter(f : T => Bool) | RDD[T] => RDD[T] |
- | flatMap(f : T => Seq[U]) | RDD[T] => RDD[U] |
- | sample(fraction : Float) | RDD[T] => RDD[T] (Deterministic sampling) |
- | groupByKey() | RDD[(K, V)] => RDD[(K, Seq[V])] |
- | reduceByKey(f : (V, V) => V) | RDD[(K, V)] => RDD[(K, V)] |
- | union() | (RDD[T], RDD[T]) => RDD[T] |
- | join() | (RDD[(K, V)], RDD[(K, W)]) => RDD[(K, (V, W))] |
- | cogroup() | (RDD[(K, V)]; RDD[(K, W)]) => RDD[(K, (Seq[V], Seq[W]))] |
- | crossProduct() | (RDD[T]; RDD[U]) => RDD[(T, U)] |
- | mapValues(f : V => W) | RDD[(K, V)] => RDD[(K, W)] (Preserves partitioning) |
- | sort(c : Comparator[K]) | RDD[(K, V)] => RDD[(K, V)] |
- | partitionBy(p : Partitioner[K]) | RDD[(K, V)] => RDD[(K, V)] |
动作/actions | count() | RDD[T] => Long |
- | collect() | RDD[T] => Seq[T] |
- | reduce(f : (T, T) => T) | RDD[T] => T |
- | lookup(k : K) | RDD[(K, V)] => Seq[V] (On hash/range partitioned RDDs) |
- | save(path : String) | Outputs RDD to a storage system, e.g., HDFS |
map:将数据集中的每个元素都执行其提供的函数,并返回新的RDD
filter:则是过滤符合f作用于元素上为真的集合作为新的RDD
flatMap:将数据集中的每个元素都执行其提供的函数,得到一个列表,然后将其扁平化,即再打散。
groupByKey:按(K,V)对组成的RDD按K进行汇总成(K,Seq[V])的RDD
reduceByKey:按key进行汇聚,并对其所有的值执行提供的方法,如reduceByKey(+)或reduceByKey((v1,v2)=>v1+v2)表示对相同key的值求和
count:返回个数
collect:返回内容
3. 源码实现
3.1 RDD
/* Internally, each RDD is characterized by five main properties: * * - A list of partitions * - A function for computing each split * - A list of dependencies on other RDDs * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for * an HDFS file) */
每个RDD包含五部分信息,
1. 数据的分区列表
2. 计算方法
3. 依赖关系
4. 是否是哈希/范围分区的元数据
5. 能根据本地性快速访问到数据的偏好位置
一个RDD包含SparkContext信息,一个整形的id,依赖关系,name等
scala> rdd.partitions res13: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ShuffledRDDPartition@0, org.apache.spark.rdd.ShuffledRDDPartition@1) scala> rdd.name res14: String = null scala> rdd.id res15: Int = 5 scala> rdd.dependencies res17: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@afcfc63)
// Our dependencies and partitions will be gotten by calling subclass's methods below, and will // be overwritten when we're checkpointed private var dependencies_ : Seq[Dependency[_]] = null @transient private var partitions_ : Array[Partition] = null /** An Option holding our checkpoint RDD, if we are checkpointed */ private def checkpointRDD: Option[CheckpointRDD[T]] = checkpointData.flatMap(_.checkpointRDD) /** * Get the list of dependencies of this RDD, taking into account whether the * RDD is checkpointed or not. */ final def dependencies: Seq[Dependency[_]] = { checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse { if (dependencies_ == null) { dependencies_ = getDependencies } dependencies_ } } /** * Get the array of partitions of this RDD, taking into account whether the * RDD is checkpointed or not. */ final def partitions: Array[Partition] = { checkpointRDD.map(_.partitions).getOrElse { if (partitions_ == null) { partitions_ = getPartitions partitions_.zipWithIndex.foreach { case (partition, index) => require(partition.index == index, s"partitions($index).partition == ${partition.index}, but it should equal $index") } } partitions_ } }
以读文本文件生成的RDD为例,其生成RDD的partitions个数为2
scala> val file=sc.textFile("README.md") file: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[8] at textFile at <console>:24 scala> file.partitions res20: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.HadoopPartition@49a, org.apache.spark.rdd.HadoopPartition@49b) scala> file.dependencies res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@15fd41c0) scala> val words=file.flatMap(_.split(" ")) words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at flatMap at <console>:26 scala> words.partitions res22: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.HadoopPartition@49a, org.apache.spark.rdd.HadoopPartition@49b)
SparkContext.textFile的实现为
/** * Read a text file from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI, and return it as an RDD of Strings. * @param path path to the text file on a supported file system * @param minPartitions suggested minimum number of partitions for the resulting RDD * @return RDD of lines of the text file */ def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) }
其首先检查sc状态正常,未提供分区数时使用defaultMinPartitions作为分区数
hadoopFile读取指定路径的文件,返回一个hadoopRDD。hadoop的RecordReader类对每条记录重用相同的Writableobject,所以直接cache返回的RDD或直接用于aggerate或shuffle将生成很多reference,所以通常都是需要先利用map函数复制一份RDD,就是上面的
map(pair => pair._2.toString)
def hadoopFile[K, V]( path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped() // This is a hack to enforce loading hdfs-site.xml. // See SPARK-11227 for details. FileSystem.getLocal(hadoopConfiguration) // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration)) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) new HadoopRDD( this, confBroadcast, Some(setInputPathsFunc), inputFormatClass, keyClass, valueClass, minPartitions).setName(path) }
flatMap首先应用一个指定的函数于RDD的每个元素,然后将结果打平,就是说每个元素先转为一个数组,然后将,所有元素对应的数组的值合并起来作为新元素集合。
/** * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */ def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) }
相关文章推荐
- Spark java程序入门(二)创建RDD与查看RDD内容
- Spark入门3-RDD的实现
- Spark 2.0从入门到精通245讲——操作RDD(transformation案例实战)
- Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)
- Spark入门实战系列--6.SparkSQL(中)--深入了解SparkSQL运行计划及调优
- Spark RDD入门详解
- Spark入门(四):RDD基本操作
- Spark入门实战系列--6.SparkSQL(中)--深入了解运行计划及调优
- 子雨大数据之Spark入门教程---Spark入门:RDD的设计与运行原理1.3
- Spark入门_1_RddTransAction
- Spark基础入门(二)--------DAG与RDD依赖
- Spark入门实战系列--6.SparkSQL(中)--深入了解SparkSQL运行计划及调优
- sparkSQL1.1入门之四:深入了解sparkSQL执行计划
- Spark入门基础(1)——RDD【转】
- Spark 2.0从入门到精通245讲——操作RDD(action案例实战)
- Spark入门实战系列--6.SparkSQL(中)--深入了解SparkSQL运行计划及调优
- Spark入门——1:RDD及编程接口
- Spark入门实战系列--6.SparkSQL(中)--深入了解运行计划及调优
- spark从入门到放弃六: RDD 持久化原理
- Spark2.x 入门:RDD队列流(DStream)