您的位置:首页 > 其它

Learning Spark笔记7-数据分组、连接、排序数据

2017-05-15 16:25 411 查看
数据分组

一个常见的使用方式是按键分组我们的数据-例如,查看所有的客户订单。

如果我们的数据已经有key了,那么groupByKey()会使用key来分组我们的数据。在一个RDD上key的类型是K,value的类型是V,我们得到一个RDD的类型是[K,Iterable[V]]。

groupBy()用在未成对的数据或者当前key除了相等之外的其他情况的数据。它提供一个函数应用到源RDD的每个元素上,用结果确定key。

如果你的代码使用groupByKey(),然后在value上使用reduce()或fold()。你可以通过使用每个key的聚合函数更有效地达到相同的结果。而不是降低RDD中内存的值,我们减少每个key的数据。例如,rdd.reduceByKey(func)与rdd.groupByKey().mapValues(value => value.reduce(func))生成相同的rdd,但是reduceBykey()有效地阻止了创建每个key的列表值的步骤。除了从一个单独的RDD中分组数据之外,我们还可以从多个RDDs中共享相同的key,使用cogroup()。cogroup共享的相同key是K,相对应的值是V,返回的RDD是W,[(K,(Iterable[V],Iterable[w]))]。如果其中一个RDD在另一个RDD中没有对应的key

,那么相应的Iterable将是空的。

cogroup()远远超过joins。我们使用cogroup()实现key的交叉,它也可以一次使用3个或更多RDDs。

连接

key/value数据与key/value数据在一起使用时会有一些很有用的功能。将数据连接起来对于key/value的RDD来说是最常用的操作,包括右外连接、左外连接,交叉连接和内连接。

最简单的连接操作是内连接。连个RDD中相同的key会输出。如果一个key有多个值,那么在结果RDD中每个key所有可能的value都会出现。

storeAddress = {

 (Store("Ritual"), "1026 Valencia St"), 

 (Store("Philz"), "748 Van Ness Ave"),

 (Store("Philz"), "3101 24th St"), 

 (Store("Starbucks"), "Seattle")

 }

storeRating = {

 (Store("Ritual"), 4.9), 

 (Store("Philz"), 4.8))

 }

storeAddress.join(storeRating) == {

 (Store("Ritual"), ("1026 Valencia St", 4.9)),

 (Store("Philz"), ("748 Van Ness Ave", 4.8)),

 (Store("Philz"), ("3101 24th St", 4.8))}

Example 4-18. leftOuterJoin() and rightOuterJoin()

storeAddress.leftOuterJoin(storeRating) ==

{(Store("Ritual"),("1026 Valencia St",Some(4.9))),

 (Store("Starbucks"),("Seattle",None)),

 (Store("Philz"),("748 Van Ness Ave",Some(4.8))),

 (Store("Philz"),("3101 24th St",Some(4.8)))}

 

storeAddress.rightOuterJoin(storeRating) ==

{(Store("Ritual"),(Some("1026 Valencia St"),4.9)),

 (Store("Philz"),(Some("748 Van Ness Ave"),4.8)),

 (Store("Philz"), (Some("3101 24th St"),4.8))}

排序数据

使用排序后的数据在很多情况下都非常有用,尤其是当你生成输出数据时。我们可以使用键值对RDD进行排序,只要键上定义了一个顺序就可以。一旦我们的数据是有序的,那么随后在任何情况下使用collect()或save()得到的结果就都是有序的。通常我们想要倒叙,sortByKey()函数的参数默认是true。

下面是自定义比较器:

Example 4-20. Custom sort order in Scala, sorting integers as if strings

val input: RDD[(Int, Venue)] = ...

implicit val sortIntegersByString = new Ordering[Int] {

 override def compare(a: Int, b: Int) = a.toString.compare(b.toString)

}

rdd.sortByKey()

Example 4-21. Custom sort order in Java, sorting integers as if strings

class IntegerComparator implements Comparator<Integer> {

 public int compare(Integer a, Integer b) {

 return String.valueOf(a).compareTo(String.valueOf(b))

 }

}

rdd.sortByKey(comp)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  数据分组 join 排序
相关文章推荐