Spark:sortBy和sortByKey的函数详解
2017-01-19 13:45
856 查看
在很多应用场景都需要对结果数据进行排序,Spark中有时也不例外。在Spark中存在两种对RDD进行排序的函数,分别是
sortBy和sortByKey函数。sortBy是对标准的RDD进行排序,它是从Spark0.9.0之后才引入的(可以参见
sortBy函数是在
该函数最多可以传三个参数:
第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的;
第二个参数是ascending,从字面的意思大家应该可以猜到,是的,这参数决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序;
第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等,即为
从sortBy函数的实现可以看出,第一个参数是必须传入的,而后面的两个参数可以不传入。而且sortBy函数函数的实现依赖于sortByKey函数,关于sortByKey函数后面会进行说明。keyBy函数也是RDD类中进行实现的,它的主要作用就是将将传进来的每个元素作用于f(x)中,并返回tuples类型的
元素,也就变成了Key-Value类型的RDD了,它的实现如下:
那么,如何使用sortBy函数呢?
上面的实例对rdd中的元素进行升序排序。并对排序后的RDD的分区个数进行了修改,上面的result就是排序后的RDD,默认的分区个数是2,而我们对它进行了修改,所以最后变成了1。
sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序。它是在
从函数的实现可以看出,它主要接受两个函数,含义和sortBy一样,这里就不进行解释了。该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。下面对sortByKey的使用进行说明:
上面对Key进行了排序。细心的读者可能会问,soryKy函数中的第一个参数可以对排序方式进行重写。为什么sortByKey没有呢?难道只能用默认的排序规则。不是,是有的。其实在OrderedRDDFunctions类中有个变量ordering它是隐形的:
例子中的sortIntegersByString就是修改了默认的排序规则。这样将默认按照Int大小排序改成了对字符串的排序,所以12会排序在3之前。
本博客文章除特别声明,全部都是原创!
尊重原创,转载请注明:转载自过往记忆(http://www.iteblog.com/)
本文链接地址:《Spark:
sortBy和sortByKey函数详解》(http://www.iteblog.com/archives/1240)
sortBy和sortByKey函数。sortBy是对标准的RDD进行排序,它是从Spark0.9.0之后才引入的(可以参见
SPARK-1063)。而sortByKey函数是对PairRDD进行排序,也就是有Key和Value的RDD。下面将分别对这两个函数的实现以及使用进行说明。
一、sortBy函数实现以及使用
sortBy函数是在org.apache.spark.rdd.RDD类中实现的,它的实现如下:
01 | /** |
02 | * |
03 | */ |
04 | def sortBy[K]( |
05 | f : (T) = > K, |
06 | ascending : Boolean = true , |
07 | numPartitions : Int = this .partitions.size) |
08 | ( implicit ord : Ordering[K], ctag : ClassTag[K]) : RDD[T] = |
09 | this .keyBy[K](f) |
10 | .sortByKey(ascending, numPartitions) |
11 | .values |
第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的;
第二个参数是ascending,从字面的意思大家应该可以猜到,是的,这参数决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序;
第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等,即为
this.partitions.size。
从sortBy函数的实现可以看出,第一个参数是必须传入的,而后面的两个参数可以不传入。而且sortBy函数函数的实现依赖于sortByKey函数,关于sortByKey函数后面会进行说明。keyBy函数也是RDD类中进行实现的,它的主要作用就是将将传进来的每个元素作用于f(x)中,并返回tuples类型的
元素,也就变成了Key-Value类型的RDD了,它的实现如下:
1 | /** |
2 | * |
3 | */ |
4 | def keyBy[K](f : T = > K) : RDD[(K, = { |
5 | map(x = > |
6 | } |
01 | /** |
02 | * |
03 | * |
04 | * |
05 | * |
06 | * |
07 | * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 |
08 | * 过往记忆博客微信公共帐号:iteblog_hadoop |
09 | */ |
10 | scala> val data = List( 3 , 1 , 90 , 3 , 5 , 12 ) |
11 | data : List[Int] = List( 3 , 1 , 90 , 3 , 5 , 12 ) |
12 |
13 | scala> val rdd = sc.parallelize(data) |
14 | rdd : org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[ 0 ] : 14 |
15 |
16 | scala> rdd.collect |
17 | res 0 : Array[Int] = Array( 3 , 1 , 90 , 3 , 5 , 12 ) |
18 |
19 | scala> = > x).collect |
20 | res 1 : Array[Int] = Array( 1 , 3 , 3 , 5 , 12 , 90 ) |
21 |
22 | scala> = > false ).collect |
23 | res 3 : Array[Int] = Array( 90 , 12 , 5 , 3 , 3 , 1 ) |
24 |
25 | scala> val result = rdd.sortBy(x = > false ) |
26 | result : org.apache.spark.rdd.RDD[Int] = MappedRDD[ 23 ] : 16 |
27 |
28 | scala> result.partitions.size |
29 | res 9 : Int = 2 |
30 |
31 | scala> val result = rdd.sortBy(x = > false , 1 ) |
32 | result : org.apache.spark.rdd.RDD[Int] = MappedRDD[ 26 ] : 16 |
33 |
34 | scala> result.partitions.size |
35 | res 10 : Int = 1 |
二、sortByKey函数实现以及使用
sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序。它是在org.apache.spark.rdd.OrderedRDDFunctions中实现的,实现如下
1 | def sortByKey(ascending : Boolean = true , numPartitions : Int = self.partitions.size) |
2 | : RDD[(K, = |
3 | { |
4 | val part = new RangePartitioner(numPartitions, |
5 | new ShuffledRDD[K, |
6 | .setKeyOrdering( if (ascending) else ordering.reverse) |
7 | } |
01 | /** |
02 | * |
03 | * |
04 | * |
05 | * |
06 | * |
本文地址: |
07 | * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 |
08 | * 过往记忆博客微信公共帐号:iteblog_hadoop |
09 | */ |
10 | scala> val a = sc.parallelize(List( "wyp" , "iteblog" , "com" , "397090770" , "test" ), 2 ) |
11 | a : org.apache.spark.rdd.RDD[String] = |
12 | ParallelCollectionRDD[ 30 ] : 12 |
13 |
14 | scala> val b = sc. 1 to 2 ) |
15 | b : org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[ 31 ] : 14 |
16 |
17 | scala> val c = a.zip(b) |
18 | c : org.apache.spark.rdd.RDD[(String, = ZippedPartitionsRDD 2 [ 32 ] : 16 |
19 |
20 | scala> c.sortByKey().collect |
21 | res 11 : Array[(String, = Array(( 397090770 , 4 ), (com, 3 ), (iteblog, 2 ), (test, 5 ), (wyp, 1 )) |
private。他就是默认的排序规则,我们可以对它进行重写,如下:
valordering=implicitly[Ordering[K]]
01 | scala> val b = sc.parallelize(List( 3 , 1 , 9 , 12 , 4 )) |
02 | b : org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[ 38 ] : 12 |
03 |
04 | scala> val c = b.zip(a) |
05 | c : org.apache.spark.rdd.RDD[(Int, = ZippedPartitionsRDD 2 [ 39 ] : 16 |
06 |
07 | scala> c.sortByKey().collect |
08 | res 15 : Array[(Int, = Array(( 1 ,iteblog), ( 3 ,wyp), ( 4 ,test), ( 9 ,com), ( 12 , 397090770 )) |
09 |
10 | scala> implicit val sortIntegersByString = new Ordering[Int]{ |
11 | | override def compare(a : Int, b : Int) = |
12 | | a.toString.compare(b.toString)} |
13 | sortIntegersByString : Ordering[Int] = $iwC$$iwC$$iwC$$iwC$$iwC$$anon$ 1 @ 5 d 533 f 7 a |
14 |
15 | scala> c.sortByKey().collect |
16 | res 17 : Array[(Int, = Array(( 1 ,iteblog), ( 12 , 397090770 ), ( 3 ,wyp), ( 4 ,test), ( 9 ,com)) |
本博客文章除特别声明,全部都是原创!
尊重原创,转载请注明:转载自
本文链接地址:《Spark:
sortBy和sortByKey函数详解》(http://www.iteblog.com/archives/1240)
相关文章推荐
- Spark: sortBy和sortByKey函数详解
- Spark: sortBy和sortByKey函数详解
- Spark: sortBy和sortByKey函数详解
- Spark: sortBy和sortByKey函数详解
- Spark核心RDD:combineByKey函数详解
- Spark核心RDD:combineByKey函数详解
- Spark核心RDD:combineByKey函数详解
- Spark RDD操作:combineByKey函数详解
- Spark核心RDD:foldByKey函数详解
- Spark核心RDD:combineByKey函数详解
- Spark核心RDD:combineByKey函数详解
- Spark算子[13]:sortByKey、sortBy、二次排序 源码实例详解
- Spark算子详解之reduceByKey_sample_take_takeSample_distinct_sortByKey_saveAsTextFile_intersection
- SPARK:sortByKey和sortBy 函数讲解
- Spark核心RDD:combineByKey函数详解
- spark RDD算子(六)之键值对聚合操作reduceByKey,foldByKey,排序操作sortByKey
- spark中的sortBy和sortByKey
- Spark 使用sortByKey进行二次排序
- 【spark】sortByKey实现二次排序
- spark 的一些常用函数 filter,map,flatMap,lookup ,reduce,groupByKey