您的位置:首页 > 其它

Spark:sortBy和sortByKey的函数详解

2017-01-19 13:45 856 查看
在很多应用场景都需要对结果数据进行排序,Spark中有时也不例外。在Spark中存在两种对RDD进行排序的函数,分别是
sortBy和sortByKey函数。sortBy是对标准的RDD进行排序,它是从Spark0.9.0之后才引入的(可以参见
SPARK-1063
)。而sortByKey函数是对PairRDD进行排序,也就是有Key和Value的RDD。下面将分别对这两个函数的实现以及使用进行说明。

  


一、sortBy函数实现以及使用

  sortBy函数是在
org.apache.spark.rdd.RDD
类中实现的,它的实现如下:

01
/**
02
*
ReturnthisRDDsortedbythegivenkeyfunction.
03
*/
04
def
sortBy[K](
05
f
:
(T)
=
>
K,
06
ascending
:
Boolean
=
true
,
07
numPartitions
:
Int
=
this
.partitions.size)
08
(
implicit
ord
:
Ordering[K],
ctag
:
ClassTag[K])
:
RDD[T]
=
09
this
.keyBy[K](f)
10
.sortByKey(ascending,
numPartitions)
11
.values
  该函数最多可以传三个参数:

  第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的;

  第二个参数是ascending,从字面的意思大家应该可以猜到,是的,这参数决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序;

  第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等,即为
this.partitions.size


  从sortBy函数的实现可以看出,第一个参数是必须传入的,而后面的两个参数可以不传入。而且sortBy函数函数的实现依赖于sortByKey函数,关于sortByKey函数后面会进行说明。keyBy函数也是RDD类中进行实现的,它的主要作用就是将将传进来的每个元素作用于f(x)中,并返回tuples类型的

元素,也就变成了Key-Value类型的RDD了,它的实现如下:

1
/**
2
*
CreatestuplesoftheelementsinthisRDDbyapplying`f`.
3
*/
4
def
keyBy[K](f
:
T
=
>
K)
:
RDD[(K,
T)]
=
{
5
map(x
=
>
(f(x),x))
6
}
  那么,如何使用sortBy函数呢?

01
/**
02
*
User:过往记忆
03
*
Date:14-12-26
04
*
Time:上午10:16
05
*
bolg:http://www.iteblog.com
06
*
本文地址:http://www.iteblog.com/archives/1240
07
*
过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
08
*
过往记忆博客微信公共帐号:iteblog_hadoop
09
*/
10
scala>
val
data
=
List(
3
,
1
,
90
,
3
,
5
,
12
)
11
data
:
List[Int]
=
List(
3
,
1
,
90
,
3
,
5
,
12
)
12
13
scala>
val
rdd
=
sc.parallelize(data)
14
rdd
:
org.apache.spark.rdd.RDD[Int]
=
ParallelCollectionRDD[
0
]
atparallelizeat<console>
:
14
15
16
scala>
rdd.collect
17
res
0
:
Array[Int]
=
Array(
3
,
1
,
90
,
3
,
5
,
12
)
18
19
scala>
rdd.sortBy(x
=
>
x).collect
20
res
1
:
Array[Int]
=
Array(
1
,
3
,
3
,
5
,
12
,
90
)
21
22
scala>
rdd.sortBy(x
=
>
x,
false
).collect
23
res
3
:
Array[Int]
=
Array(
90
,
12
,
5
,
3
,
3
,
1
)
24
25
scala>
val
result
=
rdd.sortBy(x
=
>
x,
false
)
26
result
:
org.apache.spark.rdd.RDD[Int]
=
MappedRDD[
23
]
atsortByat<console>
:
16
27
28
scala>
result.partitions.size
29
res
9
:
Int
=
2
30
31
scala>
val
result
=
rdd.sortBy(x
=
>
x,
false
,
1
)
32
result
:
org.apache.spark.rdd.RDD[Int]
=
MappedRDD[
26
]
atsortByat<console>
:
16
33
34
scala>
result.partitions.size
35
res
10
:
Int
=
1
  上面的实例对rdd中的元素进行升序排序。并对排序后的RDD的分区个数进行了修改,上面的result就是排序后的RDD,默认的分区个数是2,而我们对它进行了修改,所以最后变成了1。

  


二、sortByKey函数实现以及使用

sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序。它是在
org.apache.spark.rdd.OrderedRDDFunctions
中实现的,实现如下

1
def
sortByKey(ascending
:
Boolean
=
true
,
numPartitions
:
Int
=
self.partitions.size)
2
:
RDD[(K,
V)]
=
3
{
4
val
part
=
new
RangePartitioner(numPartitions,
self,ascending)
5
new
ShuffledRDD[K,
V,V](self,part)
6
.setKeyOrdering(
if
(ascending)
ordering
else
ordering.reverse)
7
}
  从函数的实现可以看出,它主要接受两个函数,含义和sortBy一样,这里就不进行解释了。该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。下面对sortByKey的使用进行说明:

01
/**
02
*
User:过往记忆
03
*
Date:14-12-26
04
*
Time:上午10:16
05
*
bolg:http://www.iteblog.com
06
*
本文地址:http://www.iteblog.com/archives/1240
07
*
过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
08
*
过往记忆博客微信公共帐号:iteblog_hadoop
09
*/
10
scala>
val
a
=
sc.parallelize(List(
"wyp"
,
"iteblog"
,
"com"
,
"397090770"
,
"test"
),
2
)
11
a
:
org.apache.spark.rdd.RDD[String]
=
12
ParallelCollectionRDD[
30
]
atparallelizeat<console>
:
12
13
14
scala>
val
b
=
sc.
parallelize(
1
to
a.count.toInt,
2
)
15
b
:
org.apache.spark.rdd.RDD[Int]
=
ParallelCollectionRDD[
31
]
atparallelizeat<console>
:
14
16
17
scala>
val
c
=
a.zip(b)
18
c
:
org.apache.spark.rdd.RDD[(String,
Int)]
=
ZippedPartitionsRDD
2
[
32
]
atzipat<console>
:
16
19
20
scala>
c.sortByKey().collect
21
res
11
:
Array[(String,
Int)]
=
Array((
397090770
,
4
),
(com,
3
),
(iteblog,
2
),
(test,
5
),
(wyp,
1
))
  上面对Key进行了排序。细心的读者可能会问,soryKy函数中的第一个参数可以对排序方式进行重写。为什么sortByKey没有呢?难道只能用默认的排序规则。不是,是有的。其实在OrderedRDDFunctions类中有个变量ordering它是隐形的:
private
valordering=implicitly[Ordering[K]]
。他就是默认的排序规则,我们可以对它进行重写,如下:

01
scala>
val
b
=
sc.parallelize(List(
3
,
1
,
9
,
12
,
4
))
02
b
:
org.apache.spark.rdd.RDD[Int]
=
ParallelCollectionRDD[
38
]
atparallelizeat<console>
:
12
03
04
scala>
val
c
=
b.zip(a)
05
c
:
org.apache.spark.rdd.RDD[(Int,
String)]
=
ZippedPartitionsRDD
2
[
39
]
atzipat<console>
:
16
06
07
scala>
c.sortByKey().collect
08
res
15
:
Array[(Int,
String)]
=
Array((
1
,iteblog),
(
3
,wyp),
(
4
,test),
(
9
,com),
(
12
,
397090770
))
09
10
scala>
implicit
val
sortIntegersByString
=
new
Ordering[Int]{
11
|
override
def
compare(a
:
Int,
b
:
Int)
=
12
|
a.toString.compare(b.toString)}
13
sortIntegersByString
:
Ordering[Int]
=
$iwC$$iwC$$iwC$$iwC$$iwC$$anon$
1
@
5
d
533
f
7
a
14
15
scala>
c.sortByKey().collect
16
res
17
:
Array[(Int,
String)]
=
Array((
1
,iteblog),
(
12
,
397090770
),
(
3
,wyp),
(
4
,test),
(
9
,com))
  例子中的sortIntegersByString就是修改了默认的排序规则。这样将默认按照Int大小排序改成了对字符串的排序,所以12会排序在3之前。
本博客文章除特别声明,全部都是原创!

尊重原创,转载请注明:转载自过往记忆(http://www.iteblog.com/)

本文链接地址:《Spark:
sortBy和sortByKey函数详解》(http://www.iteblog.com/archives/1240)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: