您的位置:首页 > 其它

Learning Spark - LIGHTNING-FAST DATA ANALYSIS 第三章 - (1)

2015-09-24 20:06 351 查看

更新,第三章完整版PDF可下载:Learning Spark 第三章 RDD编程 已翻译整理完毕,PDF可下载

最近抱恙,在医院割了一刀,静养中,闲着也是闲着,一边看书一边翻译,word写的copy过来,格式稍微有点乱了,回头整章再出个word的排版好的版本吧。

第三章 RDD编程

  

本章介绍Spark处理数据的核心抽象:弹性分布式数据集(RDD)。RDD简单来说就是元素的分布式集合。在Spark中,所有的工作都被表达为创建新RDD,对已存在的RDD做变换,或者对RDD调用某些操作来计算得到一个结果。在底层,Spark将包含在RDD中的数据自动分布到你的整个集群,并将你对其执行的操作并行化。

 

数据科学家和工程师都应该阅读本章,因为RDD是Spark的核心概念。我们强烈建议你在交互式Shell中尝试一些示例(见本书第11页的“Spark的Python和Scala Shell简介”)。另外,本章所有的代码都在本书的github库有下载。

 

RDD基础

Spark中的RDD,简单来说就是所有对象的一个不可变的分布式集合。每个RDD都被分割为多个分区,这就可以在集群的不同节点上进行计算。RDD可以包含任何Python,Java,Scala对象类型,包括用户自定义类型。

 

用户可以用两种方式创建RDD:通过加载一个外部数据集,或者在驱动程序中分发一个对象集合(如list或set)。如同示例3-1展示的,我们知道了使用SparkContext.textFile()函数加载一个文本文件作为一个字符串RDD。

 

示例3-1:在Python中用textFile()函数创建一个字符串RDD

>>> lines = sc.textFile(“README.md”)

 

RDD一旦创建好了,可以提供两种不同类型的操作:变换(transformation)和动作(action)。变换是从前一个RDD构造出一个新的RDD。例如,有一个常见的变换是用谓词匹配来过滤数据。在我们之前的文本文件的示例中,我们可以用这个变换来创建一个新的RDD,这个RDD容纳的数据是只包含了单词“Python”的字符串。如示例3-2所示:

 

示例3-2:调用filter()变换

>>> pythonlines = lines.filter(lamda line: “Python” in line)

 

动作,另一方面来看,是基于RDD来计算某个结果,并将结果返回给驱动程序或者保存结果到一个外部的存储系统(比如HDFS)。更早之前我们调用过一个动作的例子是first()。它返回RDD中的第一个元素,示例3-3展示了这点:

 

示例3-3:调用first()动作

>>> pythonlines.first()

u'## Interactive Python Shell'

 

变换和动作不相同是因为Spark计算RDD的方式。虽然,在任何时候你都可以定义一个新的RDD,但是Spark总是以一种lazy的方式计算它们,也就是它们被第一次用于动作的时候。这种方式最初可能觉得不太寻常,但是当你开始处理大数据时,你就会有同感了。举例来说,考虑下前面的示例3-2和示例3-3,我们定义了一个文本文件RDD然后过滤出了包含“Python”的行。如果当我们一写完lines = sc.textFile(...)语句,Spark就立刻加载和保存整个文件的所有行的话,考虑到我们马上就要过虑掉很多的行,这会导致浪费很多存储空间。反过来说,一旦Spark知道了整个变换链,它就能只计算结果需要的数据。实际上,对于first()动作来说,Spark只需要扫描文件直到它找到第一个符合条件的行就可以了,这甚至不需要读整个文件。

 

最后,每次你执行个动作,Spark的RDD默认会被重新计算。如果你想在多个动作中重用RDD,你可以用RDD.persist()要求Spark对RDD持久化。我们可以用一些不同的方式要求Spark对我们的数据持久化,详见表3-6。在初次计算之后,Spark可以保存RDD的内容到内存中(在你的集群中跨机器分区),并在未来的动作中重用。持久化RDD到磁盘上,而不是内存中,也是可能的。默认不持久化的行为看起来也有点奇怪,但是对大数据集来说就该这样:如果你不会重用这个RDD,那就没有理由浪费存储空间。相反的,一旦Spark流过数据,只是计算结果就好了。1

实际上,你会经常使用persist()来加载你的数据子集到内存并反复查询。比如,如果我们知道我们想要计算关于README文件中包含“Python”的行的多个结果,我们会写示例3-4那样的脚本。

示例3-4:持久化RDD到内存

>>> pythonLines.persist

>>> pythonLines.count()

2

>>> pythonLines.first()

u'## Interactive Python Shell'

 

总之,每个Spark程序或者shell会话都是像这样工作:

1. 从外部数据创建一些作为输入的RDD

2. 使用类似filter()之类的变换来定义出新的RDD

3. 要求Spark对需要重用的任何中间RDD进行persist()

4. 启动类似于count()和first()这样的动作开始并行计算,然后Spark会优化并执行。

 

 

cache()和在默认存储级别上和调用persist()的效果一样。

 

 

在本章接下来的部分,我们将从头到尾的详细讨论每个步骤,包括Spark中一些最常用的操作。

创建RDD

Spark提供两种方式创建RDD:通过加载一个外部数据集,或者在你的驱动程序中并行化一个集合。

 

1总是重新计算一个RDD的能力事实上就是为什么RDD被称为“弹性”的原因。当拥有RDD数据的机器发生故障,Spark就利用这个能力重新计算丢失的分区,这对用户来说是透明的。

最简单的创建RDD的方式就是将你程序中已存在的集合传递给SparkContext的parallelize()方法,见示例3-5到3-7。当你在学习Spark的时候,这种方法非常有用。你可以在Shell中快速创建你自己的RDD并对其进行操作。然而,请记住在原型和测试之外,这种方式并不常用。因为这要求你所有的数据都在一台机器上的内存中。

示例3-5:Python的parallelize()方法

lines = sc.parallelize([“pandas”, “i like pandas”])

示例3-6:Scala的parallelize()方法

val lines = sc.parallelize(List(“pandas”, “i like pandas”))

示例3-7:Java的parallelize()方法

JavaRDD<String> lines = sc.parallelize(Arrays.asList(“pandas”, “i like pandas”))

 

更常见的方式是从外部存储加载数据,详见第5章。然而我们之前已经见过加载文本文件为字符串RDD的方法:SparkContext.textFile(),见示例3-8到3-10。

示例3-8:Python的textFile()方法

lines = sc.textFile(“/path/to/README.md”)

示例3-9:Scala的textFile()方法

val lines = sc.textFile(“/path/to/README.md”)

示例3-10:Java的textFile()方法

JavaRDD<String> lines = sc.textFile(“/path/to/README.md”);

RDD操作

我们已经提到过,RDD支持两种类型的操作:变换和动作。变换是对一个RDD进行操作得到一个新的RDD,如map()和filter()。动作是返回一个结果到驱动程序或者写入到存储并开始计算的操作,如count()和first()。Spark对待变换和动作很不一样,所以理解你要执行的是何种操作十分重要。如果你对一个给定的函数是变换还是动作还有些混淆,那就看它的返回类型。返回RDD的就是变换,反之,动作是返回其他类型。

 
变换

变换是对一个RDD进行操作得到一个新的RDD。如29页的“延迟计算”中讨论的一样,RDD的变换计算是会延迟的,直到你在一个动作中用到。大多数变换都是元素级的,也就是说,每次处理一个元素,但不是所有变换都这样。

看个例子。假设我们有个日志文件log.txt,里面有些日志消息。我们想仅选择出错误消息。可以用之前见过的filter()变换。这次我们用Spark的所有三种语言API来展示filter的用法(见示例3-11到3-13)。

 

示例3-11:Python的filter()方法

inputRDD = sc.textFile(“log.txt”)
errsRDD = inputRDD.filter(lambda x: “error” in x)
示例3-12:Scala的filter()方法
val inputRDD = sc.textFile(“log.txt”)
va l errsRDD = inputRDD.filter(line => line.contains(“error”))
示例3-13:Java的filter()方法

JavaRDD<String> lines = sc.textFile(“log.txt”);
JavaRDD<String> errsRDD = inputRDD.filter({
  New Function<String, Boolean>(){
Public Boolean call(string x) { return x.contains(“error”); }
  }
});
 
注意到filter操作并没改变已存在的RDD,相反,它返回了整个新RDD的指针。InputRDD仍然可以在后面的程序中重用-比如要查找别的词。实际上,让我们再来用这个RDD找包含词语”warning”的行。然后我们用另一个变换union来打印这些包含了”error”或者”warning”的行。在示例3-14中我们是用Python展示,但是union在所有的三种语言中都支持。

 

 

示例3-14:Python中的union()变换

errorsRDD = inputRDD.filter(lamda x: “error” in x)
warningsRDD = inputRDD.filter(lamda x: “warning” in x)
badLinesRDD = errorsRDD.union(warningsRDD)
union和filter变换有一点点不同,它是对两个RDD操作,而不是一个。实际上变换可以对任意个输入RDD进行操作。

 

达到示例3-14同样结果的更好的方式是只过滤inputRDD一次,查找”error”或者”warning”。

 

最终,从RR的各个变换形成了新的RDD,Spark跟踪了这些不同RDD之间的依赖关系的集合,称之为血统图(lineage graph)。Spark用这个信息来根据需要计算每个RDD,并恢复丢失的数据,如果持久化的RDD的一部分丢失了的话。图3-1展示了示例3-14的血统图。

 



图 3-1 日志分析中创建的RDD血统图
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Learning Spark Spark RDD