Learning Spark - LIGHTNING-FAST DATA ANALYSIS 第三章 - (4)
2015-09-27 20:11
411 查看
更新,第三章完整版PDF可下载:Learning
Spark 第三章 RDD编程 已翻译整理完毕,PDF可下载
续啊续,我还续:上一篇:Learning
Spark - LIGHTNING-FAST DATA ANALYSIS 第三章 - (3)
动作
对于基本RDD,你最常用到的动作是reduce()。它传入一个函数,该函数对RDD中两个元素进行处理,并返回一个同类型的元素。这类函数的一个简单例子是+,用于计算RDD中元素的和。有了reduce(),我们可以轻松的计算RDD中元素的和,元素的个数,以及其他类型的聚合(见示例3-32到3-34)。
示例3-32 Python中使用reduce()
sum = rdd.reduce(lambda x, y: x + y)
示例3-33 Scala中使用reduce()
val sum = rdd.reduce((x, y) => x + y)
示例3-32 Java中使用reduce()
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer x, Integer y) { return x + y; }
});
fold()函数和reduce()函数类似,也是带了一个和reduce()相同的函数参数,但是多了一个“零值”用于在每个分区调用时初始化。你提供的初值对你的操作来说是恒等值,也就是说,你的函数对其应用多次都不会改变该值(例如,0对于加法操作,1对于乘法操作,或者空列表对于连接操作)。
你可以在fold()中通过修改和返回两个参数中的第一个参数来最小化对象创建。但是你不能修改第二个参数。
fold()和reduce()都要求返回结果的类型和处理的RDD的类型相同。对于求和来说很好,但是有时候我们想返回不同的类型。比如,当计算一个运行时的平均值,我们需要同时记录总量和元素个数,这就要求我们返回一个对值(pair)。我们可以先用map()对每个元素做变换形成元素和数字1的对值,也就是我们要返回的类型,然后就能用reduce()进行处理。
aggregate()函数将我们从被约束只能返回处理的RDD的相同类型RDD中解脱了。aggregate()和fold()一样有一个初始的零值,但是可以是我们想要返回的类型。然后我们提供一个函数合并所有元素到累加器。最后,我们需要提供第二个函数来合并这些累加器,每个累加器都是它们本地结果数据的累积。
我们用aggregate()来计算RDD的平均值,避免用folder()前还要先map(),见示例3-35到3-37。
示例3-35 Python中使用aggregate()
sumCount = nums.aggregate((0, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))))
return sumCount[0] / float(sumCount[1])
示例3-36 Scala中使用aggregate()
val result = input.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble
示例3-37 Java中使用aggregate()
class AvgCount implements Serializable {
public AvgCount(int total, int num) {
this.total = total;
this.num = num;
}
public int total;
public int num;
public double avg() {
return total / (double) num;
}
}
Function2<AvgCount, Integer, AvgCount> addAndCount =
new Function2<AvgCount, Integer, AvgCount>() {
public AvgCount call(AvgCount a, Integer x) {
a.total += x;
a.num += 1;
return a;
}
};
Function2<AvgCount, AvgCount, AvgCount> combine =
new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) {
a.total += b.total;
a.num += b.num;
return a;
}
};
AvgCount initial = new AvgCount(0, 0);
AvgCount result = rdd.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());
有些RDD的动作会以常规的集合或值的形式返回部分或所有数据到驱动程序。
最简单最常用的返回数据到驱动程序的操作是collect(),返回整个RDD的数据。collect()通常用于单元测试,整个RDD的内容能放到内存中,这样就能轻易的比较RDD是否是我们期待的结果。collect()受限于所有的数据必须适合单机,因为所有的数据要复制到驱动程序所在机器上。
take(n)返回RDD中的n个元素,试图最小化访问的分区的数目。所以它返回的是有偏差的集合。重要的是知道这操作不会以你期待的顺序返回数据。
这些操作对于单元测试或者快速调试时很有用,但是处理大量数据时会有瓶颈。
如果是已经有序的数据集,我们可以用top()函数从RDD中提取前面的若干元素。top()使用数据的默认顺序,但是你可以提供一个比较函数来提取前面的元素。
有时在驱动程序中需要数据的样本。takeSample(withReplacement, num, seed)函数允许我们对数据采用,可以同时用随机数替换值或者不替换。
有时对RDD中所有元素都执行一个动作,但是不返回任何结果到驱动程序,也是有用的。一个不错的例子是发送JSON到webserver或者插入记录到数据库,这两种情况都能用foreach()这个动作对每个元素执行计算,但是不返回到本地。
对基本RDD的更多的标准操作的准确的行为你都能从它们的名字上想象的到。Count()返回元素的个数,countByValue()返回每个唯一值对应的个数的map。表3-4汇总了这些动作。
表格 3-4 对包含{1, 2, 3, 3}的RDD执行动作
RDD类型之间的转换
有些函数只对某种类型的RDD可用,比如mean()和variance()对数值类型的RDD可用,而join()对键值对类型的RDD可用。我们会在第六章涉及到数值RDD,第四章涉及键值对的RDD。在Scala和Java中,标准RDD没有定义这些方法。所以,要访问这些附加的方法,我们必须确保我们得到了正确的类型。
Scala
在Scala中转换有特定功能的RDD(比如对RDD[Double]暴露数值功能)是通过隐式转换自动处理的。在17页提到的“初始化SparkContext”中,我们需要添加import org.apache.spark.SparkContext._以便这些转换能工作。你可以看看SparkContext对象的Scala文档中列出的隐式转换。RDD被隐式的转换成各种封装类,比如DoubleRDDFunctions(数值数据的RDD)和PairRDDFunctions(键值对的RDD),以便暴露出类似mean()或者variance()等附加的功能。
隐式转换虽然很强大,但有时会让人混淆。如果你对RDD调用mean()类似的函数,可能你看到Scala的文档中的RDD类并没有mean()函数。这个调用能成功是因为从RDD[Double]到DoubleRDDFunctions之间的隐式转换。在Scala文档中查找RDD的这些函数时,确保看看这些封装类中可用的函数。
Java
在Java中,特定类型的RDD之间的转换要明显一些。特别是JavaDoubleRDD和JavaPairRDD这些对数据类型有额外的方法的类。好处是让你更好的理解转换时如何进行的,但是有一点点麻烦。
要构造这些特殊类型的RDD,而不是总使用函数类,我们需要使用特定的版本。如果我们想从一个类型为T的RDD创建DoubleRDD,我们使用DoubleFunction<T>而不是Function<T,Double>。这些特殊函数及用法见表3-5。
我们同样需要对RDD调用不同的函数(我们不能只是创建一个Double函数传递给map())。当我们想要一个DoubleRDD时,和下面的其他函数的模式一样,我们需要调用mapToDouble()而不是map()。
表格 3-5 特定类型函数的Java接口
我们修改一下示例3-28,在那里我们计算RDD中的数值的平方来生成一个新的JavaDoubleRDD,见示例3-38。这使得我们可以访问JavaDoubleRDD的额外的特殊函数,如mean()和variance()等。
示例3-38 Java中创建DoubleRDD
JavaDoubleRDD result = rdd.mapToDouble(
new DoubleFunction<Integer>() {
public double call(Integer x) {
return (double) x * x;
}
});
System.out.println(result.mean());
Python
Python API的结构跟Java和Scala不同。在Python中,所有的函数都实现在基本RDD中,但是如果运行时RDD中的数据类型不正确会失败。
持久化(缓存)
之前说过,Spark RDD是延迟求值的,有时候我们会想多次使用同一个RDD。如果我们这么天真的做了,那么每次对这个RDD执行动作时,Spark都会重新计算这个RDD和所有依赖的RDD。这对于迭代计算时尤其昂贵,它会查找这些数据很多次。另一个浅显的例子是对同一个RDD先计数然后输出,如示例3-39。
示例3-39 Scala中两次执行
val result = input.map(x => x*x)
println(result.count())
println(result.collect().mkString(","))
为避免多次计算同一个RDD,我们可以要求Spark缓存该数据。当我们要求Spark缓存该RDD时,计算该RDD的节点都会保存它们的分区。如果缓存了该数据的节点出错了,Spark会在需要的时候重新计算丢失的分区。如果我们想在节点失败是处理不会变慢,那么我们可以复制数据到多个节点。
基于我们的目的,Spark有多个级别的持久策略可选择,见表3-6。在Scala(示例3-40)和Java中,默认的persist()是存储数据在JVM中作为非序列化对象。Python中我们总是序列化数据持久保存,所以默认是在JVM中保存为序列化对象。当我们输出数据到磁盘或者堆外存储时,数据总是序列化的。
表格 3-6 org.apache.spark.storage.StorageLevel和pyspark.StorageLevel中的存储级别;如果需要的话还可以通过添加_2到存储级别末尾来复制数据到2台机器
堆外缓存正在测试,用的是Tachyon。如果你对Spark的堆外缓存有兴趣,可以看看Running Spark On Tachyon guide。
示例3-40 Scala中使用persist()
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
注意,我们是在第一个动作之前对RDD调用的persist()。persist()对其自身调用不会导致求值。
如果你试图缓存太多的数据,一致超出了内存,Spark会使用LRU缓存策略丢弃旧的分区。对于memory-only存储级别,Spark会在需要访问数据时重新计算;而对于memory-and-disk级别,会将数据写到磁盘。无论哪种方式,你都不用担心是否缓存态度数据会使任务停止。然而,不必要的缓存数据会导致有用的数据被丢弃而进行过多的计算。
最后,RDD还提供了unpersist()函数给你手动释放缓存。
总结
在本章中,我们讲到了RDD的执行模型和大量常见的RDD操作。如果你都掌握了,恭喜——你已经学到了所有Spark的核心概念。在下一章中,我们会讲一组针对键值对的RDD的特殊操作,这在聚合或分组并行计算时很常用。之后会讨论各种数据源的输入和输出,以及关于SparkContext的更进一步的主题。
Spark 第三章 RDD编程 已翻译整理完毕,PDF可下载
续啊续,我还续:上一篇:Learning
Spark - LIGHTNING-FAST DATA ANALYSIS 第三章 - (3)
动作
对于基本RDD,你最常用到的动作是reduce()。它传入一个函数,该函数对RDD中两个元素进行处理,并返回一个同类型的元素。这类函数的一个简单例子是+,用于计算RDD中元素的和。有了reduce(),我们可以轻松的计算RDD中元素的和,元素的个数,以及其他类型的聚合(见示例3-32到3-34)。
示例3-32 Python中使用reduce()
sum = rdd.reduce(lambda x, y: x + y)
示例3-33 Scala中使用reduce()
val sum = rdd.reduce((x, y) => x + y)
示例3-32 Java中使用reduce()
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer x, Integer y) { return x + y; }
});
fold()函数和reduce()函数类似,也是带了一个和reduce()相同的函数参数,但是多了一个“零值”用于在每个分区调用时初始化。你提供的初值对你的操作来说是恒等值,也就是说,你的函数对其应用多次都不会改变该值(例如,0对于加法操作,1对于乘法操作,或者空列表对于连接操作)。
你可以在fold()中通过修改和返回两个参数中的第一个参数来最小化对象创建。但是你不能修改第二个参数。
fold()和reduce()都要求返回结果的类型和处理的RDD的类型相同。对于求和来说很好,但是有时候我们想返回不同的类型。比如,当计算一个运行时的平均值,我们需要同时记录总量和元素个数,这就要求我们返回一个对值(pair)。我们可以先用map()对每个元素做变换形成元素和数字1的对值,也就是我们要返回的类型,然后就能用reduce()进行处理。
aggregate()函数将我们从被约束只能返回处理的RDD的相同类型RDD中解脱了。aggregate()和fold()一样有一个初始的零值,但是可以是我们想要返回的类型。然后我们提供一个函数合并所有元素到累加器。最后,我们需要提供第二个函数来合并这些累加器,每个累加器都是它们本地结果数据的累积。
我们用aggregate()来计算RDD的平均值,避免用folder()前还要先map(),见示例3-35到3-37。
示例3-35 Python中使用aggregate()
sumCount = nums.aggregate((0, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))))
return sumCount[0] / float(sumCount[1])
示例3-36 Scala中使用aggregate()
val result = input.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble
示例3-37 Java中使用aggregate()
class AvgCount implements Serializable {
public AvgCount(int total, int num) {
this.total = total;
this.num = num;
}
public int total;
public int num;
public double avg() {
return total / (double) num;
}
}
Function2<AvgCount, Integer, AvgCount> addAndCount =
new Function2<AvgCount, Integer, AvgCount>() {
public AvgCount call(AvgCount a, Integer x) {
a.total += x;
a.num += 1;
return a;
}
};
Function2<AvgCount, AvgCount, AvgCount> combine =
new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) {
a.total += b.total;
a.num += b.num;
return a;
}
};
AvgCount initial = new AvgCount(0, 0);
AvgCount result = rdd.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());
有些RDD的动作会以常规的集合或值的形式返回部分或所有数据到驱动程序。
最简单最常用的返回数据到驱动程序的操作是collect(),返回整个RDD的数据。collect()通常用于单元测试,整个RDD的内容能放到内存中,这样就能轻易的比较RDD是否是我们期待的结果。collect()受限于所有的数据必须适合单机,因为所有的数据要复制到驱动程序所在机器上。
take(n)返回RDD中的n个元素,试图最小化访问的分区的数目。所以它返回的是有偏差的集合。重要的是知道这操作不会以你期待的顺序返回数据。
这些操作对于单元测试或者快速调试时很有用,但是处理大量数据时会有瓶颈。
如果是已经有序的数据集,我们可以用top()函数从RDD中提取前面的若干元素。top()使用数据的默认顺序,但是你可以提供一个比较函数来提取前面的元素。
有时在驱动程序中需要数据的样本。takeSample(withReplacement, num, seed)函数允许我们对数据采用,可以同时用随机数替换值或者不替换。
有时对RDD中所有元素都执行一个动作,但是不返回任何结果到驱动程序,也是有用的。一个不错的例子是发送JSON到webserver或者插入记录到数据库,这两种情况都能用foreach()这个动作对每个元素执行计算,但是不返回到本地。
对基本RDD的更多的标准操作的准确的行为你都能从它们的名字上想象的到。Count()返回元素的个数,countByValue()返回每个唯一值对应的个数的map。表3-4汇总了这些动作。
表格 3-4 对包含{1, 2, 3, 3}的RDD执行动作
函数名 | 目的 | 示例 | 结果 |
collect() | 返回RDD中的所有元素 | rdd.collect( ) | {1, 2, 3, 3, 4, 5} |
count() | 返回RDD中元素个数 | rdd.count() | 4 |
countByValue() | RDD中每个元素出现的次数 | rdd.countByValue() | {(1, 1), (2, 1), (3, 2)} |
函数名 | 目的 | 示例 | 结果 |
take(num) | 返回RDD中的num个元素 | rdd.take(2) | {1, 2} |
top(num) | 返回RDD中前num个元素 | rdd.top(2) | {3, 3} |
takeOrdered(num)(ording) | 返回RDD中基于给定顺序的num个元素 | rdd.takeOrder(2)(myOrdering) | {3, 3} |
takeSample(withReplacement, num, [seed]) | 随机返回RDD中的num个元素 | rdd.takeSample(false, 1) | Nondeterministic |
reduce(func) | 并行合并RDD中的元素(比如求和) | rdd.reduce((x,y) => x+y) | 9 |
fold(func) | 和reduce()一样,但是提供了一个初值 | rdd.fold(0)((x,y)=>x+y) | 9 |
aggregate(zeroValue) (seqOp, combOp) | 类似reduce(),但是用于返回不同的类型 | rdd.aggregate((0,0) ((x, y) => (x._1 + y, x._2 +1), (x, y) => (x._1 + y._1, x._2 + y._2)) | (9, 4) |
foreach(func) | 对RDD中的每个元素应用函数func | rdd.foreach(func) | Noting |
RDD类型之间的转换
有些函数只对某种类型的RDD可用,比如mean()和variance()对数值类型的RDD可用,而join()对键值对类型的RDD可用。我们会在第六章涉及到数值RDD,第四章涉及键值对的RDD。在Scala和Java中,标准RDD没有定义这些方法。所以,要访问这些附加的方法,我们必须确保我们得到了正确的类型。
Scala
在Scala中转换有特定功能的RDD(比如对RDD[Double]暴露数值功能)是通过隐式转换自动处理的。在17页提到的“初始化SparkContext”中,我们需要添加import org.apache.spark.SparkContext._以便这些转换能工作。你可以看看SparkContext对象的Scala文档中列出的隐式转换。RDD被隐式的转换成各种封装类,比如DoubleRDDFunctions(数值数据的RDD)和PairRDDFunctions(键值对的RDD),以便暴露出类似mean()或者variance()等附加的功能。
隐式转换虽然很强大,但有时会让人混淆。如果你对RDD调用mean()类似的函数,可能你看到Scala的文档中的RDD类并没有mean()函数。这个调用能成功是因为从RDD[Double]到DoubleRDDFunctions之间的隐式转换。在Scala文档中查找RDD的这些函数时,确保看看这些封装类中可用的函数。
Java
在Java中,特定类型的RDD之间的转换要明显一些。特别是JavaDoubleRDD和JavaPairRDD这些对数据类型有额外的方法的类。好处是让你更好的理解转换时如何进行的,但是有一点点麻烦。
要构造这些特殊类型的RDD,而不是总使用函数类,我们需要使用特定的版本。如果我们想从一个类型为T的RDD创建DoubleRDD,我们使用DoubleFunction<T>而不是Function<T,Double>。这些特殊函数及用法见表3-5。
我们同样需要对RDD调用不同的函数(我们不能只是创建一个Double函数传递给map())。当我们想要一个DoubleRDD时,和下面的其他函数的模式一样,我们需要调用mapToDouble()而不是map()。
表格 3-5 特定类型函数的Java接口
函数名 | 等价函数*<A,B,...> | 用法 |
DoubleFlatMapFunction<T> | Function<T, Iterable<Double>> | 从flatMapToDouble()得到DoubleRDD |
DoubleFunction<T> | Function<T, double> | 从mapToDouble()得到DoubleRDD |
函数名 | 等价函数*<A,B,...> | 用法 |
PairFlatMapFunction<T, K, V> | Function<T, Iterable<Tuple2<K,V>>> | 从flatMapToPair()得到PairRDD<K,V> |
PairFunction<T, K, V> | Function<T, Tuple2<K,V>> | 从mapToPair()得到PairRDD<K,V> |
示例3-38 Java中创建DoubleRDD
JavaDoubleRDD result = rdd.mapToDouble(
new DoubleFunction<Integer>() {
public double call(Integer x) {
return (double) x * x;
}
});
System.out.println(result.mean());
Python
Python API的结构跟Java和Scala不同。在Python中,所有的函数都实现在基本RDD中,但是如果运行时RDD中的数据类型不正确会失败。
持久化(缓存)
之前说过,Spark RDD是延迟求值的,有时候我们会想多次使用同一个RDD。如果我们这么天真的做了,那么每次对这个RDD执行动作时,Spark都会重新计算这个RDD和所有依赖的RDD。这对于迭代计算时尤其昂贵,它会查找这些数据很多次。另一个浅显的例子是对同一个RDD先计数然后输出,如示例3-39。
示例3-39 Scala中两次执行
val result = input.map(x => x*x)
println(result.count())
println(result.collect().mkString(","))
为避免多次计算同一个RDD,我们可以要求Spark缓存该数据。当我们要求Spark缓存该RDD时,计算该RDD的节点都会保存它们的分区。如果缓存了该数据的节点出错了,Spark会在需要的时候重新计算丢失的分区。如果我们想在节点失败是处理不会变慢,那么我们可以复制数据到多个节点。
基于我们的目的,Spark有多个级别的持久策略可选择,见表3-6。在Scala(示例3-40)和Java中,默认的persist()是存储数据在JVM中作为非序列化对象。Python中我们总是序列化数据持久保存,所以默认是在JVM中保存为序列化对象。当我们输出数据到磁盘或者堆外存储时,数据总是序列化的。
表格 3-6 org.apache.spark.storage.StorageLevel和pyspark.StorageLevel中的存储级别;如果需要的话还可以通过添加_2到存储级别末尾来复制数据到2台机器
级别 | 空间占用 | CPU | 在内存 | 在磁盘 | 注释 |
MEMORY_ONLY | 高 | 低 | 是 | 否 | |
MEMORY_ONLY_SE | 低 | 高 | 是 | 否 | |
MEMORY_AND_DISK | 高 | 中 | 有时 | 有时 | 如果数据太多不能放在内存里,则溢出到磁盘 |
MEMORY_AND_DISK_SER | 低 | 高 | 有时 | 有时 | 如果数据太多不能放在内存里,则溢出到磁盘。内存中的数据表现为序列化。 |
DISK_ONLY | 低 | 高 | 否 | 是 | |
堆外缓存正在测试,用的是Tachyon。如果你对Spark的堆外缓存有兴趣,可以看看Running Spark On Tachyon guide。
示例3-40 Scala中使用persist()
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
注意,我们是在第一个动作之前对RDD调用的persist()。persist()对其自身调用不会导致求值。
如果你试图缓存太多的数据,一致超出了内存,Spark会使用LRU缓存策略丢弃旧的分区。对于memory-only存储级别,Spark会在需要访问数据时重新计算;而对于memory-and-disk级别,会将数据写到磁盘。无论哪种方式,你都不用担心是否缓存态度数据会使任务停止。然而,不必要的缓存数据会导致有用的数据被丢弃而进行过多的计算。
最后,RDD还提供了unpersist()函数给你手动释放缓存。
总结
在本章中,我们讲到了RDD的执行模型和大量常见的RDD操作。如果你都掌握了,恭喜——你已经学到了所有Spark的核心概念。在下一章中,我们会讲一组针对键值对的RDD的特殊操作,这在聚合或分组并行计算时很常用。之后会讨论各种数据源的输入和输出,以及关于SparkContext的更进一步的主题。
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- eclipse 开发 spark Streaming wordCount
- Spark弹性数据集
- Spark初探
- Spark Streaming初探
- 搭建hadoop/spark集群环境
- 整合Kafka到Spark Streaming——代码示例和挑战
- Spark 性能相关参数配置详解-任务调度篇
- 基于spark1.3.1的spark-sql实战-01
- 基于spark1.3.1的spark-sql实战-02
- 在 Databricks 可获得 Spark 1.5 预览版
- spark standalone模式 zeppelin安装
- Apache Spark 1.5.0正式发布
- Tachyon 0.7.1伪分布式集群安装与测试
- spark取得lzo压缩文件报错 java.lang.ClassNotFoundException
- tachyon与hdfs,以及spark整合