您的位置:首页 > 其它

Spark入门2-了解RDD

2017-12-11 23:48 344 查看
朱金华 njzhujinhua 2017/12/05

简介
1 特性

RDD编程模型
1 编程模型

2 RDD的操作

源码实现
1 RDD

朱金华 njzhujinhua 2017/12/05

1. 简介

在RDD诞生之前的分布式计算框架缺乏对分布式内存的抽象和支持,需要用到数据交换时都是要写到存储中去,RDD模型的产生动机也主要来源于两种主流的应用场景:

* 迭代式算法:迭代式机器学习,图算法,KMeans聚类,回归等

* 交互式探索:用户在同一数据集上执行多个查询

Spark编程模型正是此RDD-弹性分布式数据集(Resilient Distributed Dataset,RDD),它是MapReduce模型的扩展和延伸,可以在并行计算阶段进行高效的数据共享。

RDD提供一种基于粗粒度的转换操作,如map,filter,join等,该接口会将其指定函数操作应用到多个数据集上,可以据此创建数据集的血统Lineage,而不需要存储真正的数据,以便节省空间,且达到在出错时根据血统关系重新生成数据。

一个RDD内部有其以来的分区Partitions信息,当某个RDD分区数据丢失的时候,RDD内部存储的信息可支撑其进行重新计算,且只需重新计算丢失的分区,这样可快速恢复丢失的数据。

1.1 特性

RDD是一种只读的、分区的记录集合。具有以下一些特点:

1. RDD的创建只有两种方法:一种是来自于内存集合和外部存储系统,一种是通过转换(transformation,如map/filter/groupBy/join等)操作生成。

2. 其状态不可变,不能修改

2. RDD是一块分布式内存数据集, 会分成多个partitions, 以便分布在集群中各机器的内存中。 分区根据数据的存储位置确定,也可指定其分区数。还原时只会重新计算丢失分区的数据,而不会影响整个系统。

2. RDD之间是有关联的, 一个RDD可以通过transform操作把父RDD的数据转化成当前RDD的数据, 并且通过getDependencies, 可以取到所有的dependencies。 这种一个RDD经何种transformation而来的关系在RDD中叫血统(lineage),据此RDD有充足的信息关于它是如何从其他RDD产生而来的。

RDD是可以被persisit的, 常用的是cache, 即StorageLevel.MEMORY_ONLY

RDD是可以被checkpoint的, 以提高failover的效率, 当有很长的RDD链时, 单纯的依赖replay会比较低效,需要理由checkpoint机制打破链。

RDD.iterator可以产生用于迭代真正数据的Iterator[T]

在RDD上可以做各种transforms和actions,前者会生成新的RDD,而后者只是将RDD上某项操作的结果返回给程序,而不会生成新的RDD。

延迟计算,Spark并不是每一步都接着计算而是等到需要向driver提交计算结果时才执行真正的计算

2. RDD编程模型

2.1 编程模型

Spark的开发者需要写连接集群中的 workers 的 driver 程序来使用 spark, 就比如下图中展示的. Driver 端程序定义了一系列的 RDDs 并且调用了 RDD 的 action 操作. Driver 的程序同时也会跟踪 RDDs 之间的的血缘关系. workers 是可以将 RDD 分区数据存储在内存中的长期存活的进程.



2.2 RDD的操作

类型操作解释
转换/transformationmap(f : T=>U)RDD[T] => RDD[U]
-filter(f : T => Bool)RDD[T] => RDD[T]
-flatMap(f : T => Seq[U])RDD[T] => RDD[U]
-sample(fraction : Float)RDD[T] => RDD[T] (Deterministic sampling)
-groupByKey()RDD[(K, V)] => RDD[(K, Seq[V])]
-reduceByKey(f : (V, V) => V)RDD[(K, V)] => RDD[(K, V)]
-union()(RDD[T], RDD[T]) => RDD[T]
-join()(RDD[(K, V)], RDD[(K, W)]) => RDD[(K, (V, W))]
-cogroup()(RDD[(K, V)]; RDD[(K, W)]) => RDD[(K, (Seq[V], Seq[W]))]
-crossProduct()(RDD[T]; RDD[U]) => RDD[(T, U)]
-mapValues(f : V => W)RDD[(K, V)] => RDD[(K, W)] (Preserves partitioning)
-sort(c : Comparator[K])RDD[(K, V)] => RDD[(K, V)]
-partitionBy(p : Partitioner[K])RDD[(K, V)] => RDD[(K, V)]
动作/actionscount()RDD[T] => Long
-collect()RDD[T] => Seq[T]
-reduce(f : (T, T) => T)RDD[T] => T
-lookup(k : K)RDD[(K, V)] => Seq[V] (On hash/range partitioned RDDs)
-save(path : String)Outputs RDD to a storage system, e.g., HDFS
上述操作挑几个最常用的说明下:

map
:将数据集中的每个元素都执行其提供的函数,并返回新的RDD

filter
:则是过滤符合f作用于元素上为真的集合作为新的RDD

flatMap
:将数据集中的每个元素都执行其提供的函数,得到一个列表,然后将其扁平化,即再打散。

groupByKey
:按(K,V)对组成的RDD按K进行汇总成(K,Seq[V])的RDD

reduceByKey
:按key进行汇聚,并对其所有的值执行提供的方法,如reduceByKey(+)或reduceByKey((v1,v2)=>v1+v2)表示对相同key的值求和

count
:返回个数

collect
:返回内容

3. 源码实现

3.1 RDD

/* Internally, each RDD is characterized by five main properties:
*
*  - A list of partitions
*  - A function for computing each split
*  - A list of dependencies on other RDDs
*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
*    an HDFS file)
*/


每个RDD包含五部分信息,

1. 数据的分区列表

2. 计算方法

3. 依赖关系

4. 是否是哈希/范围分区的元数据

5. 能根据本地性快速访问到数据的偏好位置

一个RDD包含SparkContext信息,一个整形的id,依赖关系,name等

scala> rdd.partitions
res13: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ShuffledRDDPartition@0, org.apache.spark.rdd.ShuffledRDDPartition@1)

scala> rdd.name
res14: String = null

scala> rdd.id
res15: Int = 5

scala> rdd.dependencies
res17: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@afcfc63)


// Our dependencies and partitions will be gotten by calling subclass's methods below, and will
// be overwritten when we're checkpointed
private var dependencies_ : Seq[Dependency[_]] = null
@transient private var partitions_ : Array[Partition] = null

/** An Option holding our checkpoint RDD, if we are checkpointed */
private def checkpointRDD: Option[CheckpointRDD[T]] = checkpointData.flatMap(_.checkpointRDD)

/**
* Get the list of dependencies of this RDD, taking into account whether the
* RDD is checkpointed or not.
*/
final def dependencies: Seq[Dependency[_]] = {
checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
if (dependencies_ == null) {
dependencies_ = getDependencies
}
dependencies_
}
}

/**
* Get the array of partitions of this RDD, taking into account whether the
* RDD is checkpointed or not.
*/
final def partitions: Array[Partition] = {
checkpointRDD.map(_.partitions).getOrElse {
if (partitions_ == null) {
partitions_ = getPartitions
partitions_.zipWithIndex.foreach { case (partition, index) =>
require(partition.index == index,
s"partitions($index).partition == ${partition.index}, but it should equal $index")
}
}
partitions_
}
}


以读文本文件生成的RDD为例,其生成RDD的partitions个数为2

scala> val file=sc.textFile("README.md")
file: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[8] at textFile at <console>:24

scala> file.partitions
res20: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.HadoopPartition@49a, org.apache.spark.rdd.HadoopPartition@49b)

scala> file.dependencies
res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@15fd41c0)

scala> val words=file.flatMap(_.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at flatMap at <console>:26

scala> words.partitions
res22: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.HadoopPartition@49a, org.apache.spark.rdd.HadoopPartition@49b)


SparkContext.textFile的实现为

/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
* @param path path to the text file on a supported file system
* @param minPartitions suggested minimum number of partitions for the resulting RDD
* @return RDD of lines of the text file
*/
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}


其首先检查sc状态正常,未提供分区数时使用defaultMinPartitions作为分区数

hadoopFile读取指定路径的文件,返回一个hadoopRDD。hadoop的RecordReader类对每条记录重用相同的Writableobject,所以直接cache返回的RDD或直接用于aggerate或shuffle将生成很多reference,所以通常都是需要先利用map函数复制一份RDD,就是上面的
map(pair => pair._2.toString)


def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// This is a hack to enforce loading hdfs-site.xml.
// See SPARK-11227 for details.
FileSystem.getLocal(hadoopConfiguration)
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}


flatMap首先应用一个指定的函数于RDD的每个元素,然后将结果打平,就是说每个元素先转为一个数组,然后将,所有元素对应的数组的值合并起来作为新元素集合。

/**
*  Return a new RDD by first applying a function to all elements of this
*  RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark