您的位置:首页 > 大数据 > Hadoop

Spark入门学习——要点1

2018-09-14 16:45 465 查看

1、collect() 函数

RDD 还有一个collect() 函数,可以用来获取整个RDD 中的数据。如果你的程序把RDD 筛选到一个很小的规模,并且你想在本地处理这些数据时,就可以使用它。记住,只有当你的整个数据集能在单台机器的内存中放得下时,才能使用collect(),因此,collect() 不能用在大规模数据集上。使用collect() 使得RDD 的值与预期结果之间的对比变得很容易。由于需要将数据复制到驱动器进程中,collect() 要求所有数据都必须能一同放入单台机器的内存中。

2、Java标准函数接口

函数名实现的方法用途
Function<T,R>R call(T)接收一个输入值并返回一个输出值,用于类似map() 和filter() 等操作中
Function2<T1, T2, R>R call(T1, T2)接收两个输入值并返回一个输出值,用于类似aggregate()和fold() 等操作中
FlatMapFunction<T, R>Iterable<R> call(T)接收一个输入值并返回任意个输出,用于类似flatMap()这样的操作中

3、常见的转化操作和action操作

对一个数据为{1,2,3,3}的RDD进行基本的RDD转化操作

函数名目的示例结果
map()将函数应用于Rdd中每个元素,将返回值构成新的RDDrdd.map(x->x+1){2, 3, 4, 4}
flatMap()将函数应用于RDD 中的每个元
素,将返回的迭代器的所有内
容构成新的RDD。通常用来切
分单词
rdd.flatMap(x => x.to(3)){1, 2, 3, 2, 3, 3, 3}
filter()返回一个由通过传给filter()
的函数的元素组成的RDD
rdd.filter(x => x != 1){2, 3, 3}
distinct() 去重rdd.distinct(){1, 2, 3}
sample(withReplacement,
fraction, [seed])
对RDD 采样,以及是否替rdd.sample(false, 0.5)非确定的
对数据分别为{1,2,3} 和{3,4,5}的RDD进行针对两个RDD的转化操作

函数名目的示例结果
union()生成一个包含两个RDD 中所有元
素的RDD
rdd.union(other){1, 2, 3, 3, 4, 5}
intersection()求两个RDD 共同的元素的RDDrdd.intersection(other){3}
subtract()移除一个RDD 中的内容(例如移
除训练数据)
rdd.subtract(other){1, 2}
cartesian()与另一个RDD 的笛卡儿积rdd.cartesian(other){(1, 3), (1, 4), ...
(3, 5)}
action操作

行动涵义
reduce(func)使用传入的函数参数 func 对数据集中的元素进行汇聚操作 (两两合并). 该函数应该具有可交换与可结合的性质, 以便于能够正确地进行并行计算.
collect()在 driver program 上将数据集中的元素作为一个数组返回. 这在执行一个 filter 或是其他返回一个足够小的子数据集操作后十分有用.
count()返回数据集中的元素个数
first()返回数据集中的第一个元素 (与 take(1) 类似)
take(n)返回数据集中的前 n 个元素
takeSample(withReplacement, num, [seed])以数组的形式返回数据集中随机采样的 num 个元素.
takeOrdered(n, [ordering])以其自然序或使用自定义的比较器返回 RDD 的前 n 元素
saveAsTextFile(path)将数据集中的元素写入到指定目录下的一个或多个文本文件中, 该目录可以存在于本地文件系统, HDFS 或其他 Hadoop 支持的文件系统. Spark 将会对每个元素调用 toString 将其转换为文件的一行文本.
saveAsSequenceFile(path)(Java and Scala)对于本地文件系统, HDFS 或其他任何 Hadoop 支持的文件系统上的一个指定路径, 将数据集中的元素写为一个 Hadoop SequenceFile. 仅适用于实现了 Hadoop Writable 接口的 kay-value pair 的 RDD. 在 Scala 中, 同样适用于能够被隐式转换成 Writable 的类型上 (Spark 包含了对于 Int, Double, String 等基本类型的转换).
saveAsObjectFile(path)(Java and Scala)使用 Java 序列化将数据集中的元素简单写为格式化数据, 可以通过 SparkContext.objectFile() 进行加载.
countByKey()仅适用于 (K, V) 类型的 RDD. 返回每个 key 的 value 数的一个 hashmap (K, int) pair.
foreach(func对数据集中的每个元素执行函数 func. 这通常用于更新一个 Accumulator 或与外部存储系统交互时的副作用. 注意: 修改 foreach() 外的非 Accumulator 变量可能导致未定义的行为. 更多细节请查看 Understanding closures.

4、distinct()函数

我们的RDD 中最常缺失的集合属性是元素的唯一性,因为常常有重复的元素。如果只要唯一的元素,我们可以使用RDD.distinct() 转化操作来生成一个只包含不同元素的新RDD。不过需要注意,distinct() 操作的开销很大,因为它需要将所有数据通过网络进行混洗(shuffle),以确保每个元素都只有一份。

5、Spark RDD缓存/持久化策略

为了避免多次计算同一个RDD,可以让Spark 对数据进行持久化。

出于不同的目的,我们可以为RDD 选择不同的持久化级别(如表3-6 所示)。在Scala(见 例3-40)和Java 中,默认情况下persist() 会把数据以序列化的形式缓存在JVM 的堆空间中。在Python 中,我们会始终序列化要持久化存储的数据,所以持久化级别默认值就是以序列化后的对象存储在JVM 堆空间中。当我们把数据写到磁盘或者堆外存储上时,也总是使用序列化后的数据。

表3-6:org.apache.spark.storage.StorageLevel和pyspark.StorageLevel中的持久化级别;如有必要,可以通过在存储级别的末尾加上“_2”来把持久化数据存为两份



如果在调用RDD的action的操作之前调用了persist()操作,并不会马上出发强制求值机制,仍然是懒加载的机制。

如果缓存的数据过多,内存中放不下,Spark会自动利用最近最少使用(LRU)缓存策略进行数据分区移除。对于仅把数据存放在内存中的缓存级别,下一次要用到已经被移除的分区,这些分区需要重新计算。但是对于内存+磁盘模式的缓存级别的分区来说,被移除的分区都会被写入磁盘。

RDD还有一个方法叫做UNpersist(),调用该方法可以手动把持久化的RDD从缓存中删除。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Apache Spark Scala Hadoop