您的位置:首页 > 编程语言

Learning Spark - LIGHTNING-FAST DATA ANALYSIS 第四章 - (1)

2015-09-28 20:54 381 查看
续第三章:Learning Spark 第三章 RDD编程 已翻译整理完毕,PDF可下载

PS:今天去换药,一上午就没了,坑爹啊~~ 加油加油,第四章!!

第四章

处理键值对(Key/Value Pairs)

 

 

 

本章介绍如何处理键值对,这是Spark中常见的一种数据类型。键值对RDD通常用于聚合操作,也经常会将一些初始ETL(提取,转换,加载)获取的数据保存为键值对的格式。键值对的RDD也暴露了一些新的操作(比如每个产品的评价计数,按相同的键对数据进行分组,对两个不同的RDD分组)。

 

我们也会讨论一个键值对RDD的高级特征:分区(partitioning),使用户可以跨节点控制RDD的布局。通过可控的分区,应用程序有时可确保数据在同一个机器上,可以集中访问,就可以大量的减少通信的开销,以此获得显著的提速。我们会用一个PageRand算法的例子来阐述分区。选择正确的分区对于分布式系统来说就和本地程序选择正确的数据结构类似,这两者都说明数据的布局对性能的影响非常大。

 

动机

Spark为包含键值对的RDD提供了一些特殊的操作。这种RDD被称之为pair RDD。Pair RDD在许多程序中都是很有用的组件,因为它们对外的操作可以让你并行的处理每个键,或者跨网络重组数据。例如,Pair RDD有一个reduceByKey()的方法,它可以对每个键的数据分别进行聚合;join()方法可以通过对两个RDD中相同的元素进行分组合并。从RDD中抽取字段(例如事件的事件,客户ID或者其他标识)并用这些字段作为pair RDD的键进行处理是很常见的。

 

 

 

创建Pair RDD

Spark中有多种方式能得到pair RDD。在第五章中我们要探索的很多格式加载时都可以直接的返回其键值数据为pair RDD。另外,我们有一个普通RDD想要转换为pair RDD,可以通过map()操作来返回键值对。通过代码来看个例子,从一个包含文本行的RDD开始,用每一行的第一个单词作为key。

 

这种方式构造键值RDD会根据编程语言有些不同。在Python中,为了处理有key的数据,我们需要返回tuple组成的RDD(见示例4-1)。

 

示例4-1 Python中使用第一个单词做key来创建pair RDD
pairs = lines.map(lambda x: (x.split(" ")[0], x))

 

在Scala中,为了处理有key的数据,我们同样需要返回tuple(见示例4-2)。tuple类型的RDD存在隐式转换,可以提供附加的键值函数。

 

示例4-2 Scala中使用第一个单词做key来创建pair RDD
val pairs = lines.map(x => (x.split(" ")(0), x))

Java没有内置的tuple类型,所以Spark的Java API有一个用户创建的scala.Tuple2类。该类很简单:Java用户可以编写new Tuple2(elem1, elem2)来创建一个新tuple,然后用._1()和._2()方法来访问tuple中的元素。

 

Java用户在创建pair RDD时同样需要调用特殊版本的Spark函数。比如用mapToPair()替换基本函数map(),在43页的“JAVA”部分有更多讨论。不过可以看一个简单的示例4-3。

 

示例4-3 Java中使用第一个单词做key来创建pair RDD
PairFunction<String, String, String> keyData =
    new PairFunction<String, String, String>() {
        public Tuple2<String, String> call(String x) {
        return new Tuple2(x.split(" ")[0], x);
    }
};
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);

在Scala和Python中,当从内存中的的集合创建pair RDD,我们只需要对集合调用SparkContext.parallelize()即可。而在Java中,要从内存中的集合创建pair RDD,需要调用SparkContext.parallelizePairs()函数。

 

对Pair RDD的变换

Pair RDD允许使用标准RDD的所有转换。30页中“传入函数到Spark”的规则同样适用。由于pair RDD包含tuple,我们需要传入操作tuple而不是单个元素的函数。表4-1和4-2汇总了对pair RDD的变换,在后面的章节中会深入这些变换的细节。

 

表格 4-1 对单个pair RDD的变换(例子:{(1,2), (3,4), (3,6)})

函数名

目的

示例

结果

reduceByKey(func)

按相同的键合并

rdd.reduceByKey(

(x, y) => x + y )

{(1,2), (3,10)}

groupByKey()

按相同的键分组

rdd.groupByKey()

{(1,[2]),(3,[4,6])}

combineByKey(

createCombiner, mergeValue, mergeCombiners, partitioner)

按相同的键合并,返回不同的结果类型

见示例4-12到4-14

 

mapValues(func)

应用函数到pair RDD的每个值,但是不改变键

rdd.mapValues(

x => x+1)

{(1,3), (3, 5), (3, 7)}

flatMapValues(func)

应用一个返回pair RDD中每个值的迭代器的函数,并对每个返回的元素以原来的键生成键值对,通常用于分词

rdd.flatMapValues(

x => (x to 5) )

{(1, 2), (1, 3), (1, 4), (1, 5), (3, 4), (3, 5)}

keys()

只返回RDD中的所有键

rdd.keys()

{1, 3, 3}

 

函数名

目的

示例

结果

values()

只返回RDD中的所有值

rdd.values()

{2, 4, 6}

sortByKey()

返回按键排序的RDD

rdd.sortByKey()

{(1, 2), (3, 4), (3, 6)}

 

表格 4-2 对两个pair RDD的变换(rdd={(1,2), (3,4), (3,6)},other={(3,9)})

函数名

目的

示例

结果

subtractByKey

去除另一个RDD中存在键的元素

rdd.subtractByKey(other)

{(1, 2)}

join

两个RDD执行内连接

rdd.join(other)

{(3, (4, 9)), (3, (6, 9)}

rightOutJoin

两个RDD执行连接操作,但是other RDD中的key必须存在

rdd.rightOutJoin(other)

{(3, (Some(4), 9)), (3, (Some(6), 9))}

leftOutJoin

两个RDD执行连接操作,但是第一个RDD中的key必须存在

rdd.leftOutJoin(other)

{(1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9)))}

cogroup

对两个RDD的数据共享相同的键分组

rdd.cogroup(other)

{(1, ([2], [])),  (3,

([4, 6], [9]))}

 

我们将在接下来的章节讨论每个pair RDD函数簇的更多细节。

 

Pair RDD也仍然是RDD(在Java/Scala中是Tuple2类型的RDD,在Python中是Tuple类型的RDD),所以也支持RDD的相同功能。例如,我们可以用前一节中的pair RDD来过滤掉大于20字节的行。见示例4-4到4-6和图4-1。

 

示例4-4 Python中对第二个元素简单过滤
result = pairs.filter(lambda keyValue: len(keyValue[1]) < 20)

 

示例4-5 Scala中对第二个元素简单过滤
pairs.filter{case (key, value) => value.length < 20}

示例4-6 Java中对第二个元素简单过滤
Function<Tuple2<String, String>, Boolean> longWordFilter =
    new Function<Tuple2<String, String>, Boolean>() {
        public Boolean call(Tuple2<String, String> keyValue) {
        return (keyValue._2().length() < 20);
    }
};
JavaPairRDD<String, String> result = pairs.filter(longWordFilter);

 



图 4-1 按值过滤

 

有时我们只是想访问pair RDD的值那部分,处理pair就会有些不灵活。由于这是很常见的模式,所以Spark提供了mapValues(func)函数,等同于map{case (x, y) => (x, func(y))}。在我们的示例中会大量使用该函数。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息