您的位置:首页 > 其它

spark算子:partitionBy对数据进行分区

2017-11-07 20:00 239 查看
def partitionBy(partitioner: Partitioner): RDD[(K, V)]

该函数根据传入的partitioner(分区器)生成新的ShuffledRDD,将原RDD重新分区。

scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[23] at makeRDD at <console>:21

scala> rdd1.partitions.size
res20: Int = 2

//查看rdd1中每个分区的元素
scala> rdd1.mapPartitionsWithIndex { (partIdx, iter) =>
|         // Gather this partition's elements by prepending each one, so the
|         // resulting list is in reverse encounter order.
|         val collected = iter.foldLeft(List.empty[(Int, String)]) { (acc, kv) => kv :: acc }
|         // An empty partition emits no entry; otherwise emit a single
|         // ("part_<index>", elements) pair for this partition.
|         if (collected.isEmpty) Iterator.empty
|         else Iterator.single(("part_" + partIdx, collected))
|       }.collect
res22: Array[(String, List[(Int, String)])] = Array((part_0,List((2,B), (1,A))), (part_1,List((4,D), (3,C))))
//(2,B),(1,A)在part_0中,(4,D),(3,C)在part_1中

//使用partitionBy重分区
scala> var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at partitionBy at <console>:23

scala> rdd2.partitions.size
res23: Int = 2

//查看rdd2中每个分区的元素
scala> rdd2.mapPartitionsWithIndex { (partIdx, iter) =>
|         // Gather this partition's elements by prepending each one, so the
|         // resulting list is in reverse encounter order.
|         val collected = iter.foldLeft(List.empty[(Int, String)]) { (acc, kv) => kv :: acc }
|         // An empty partition emits no entry; otherwise emit a single
|         // ("part_<index>", elements) pair for this partition.
|         if (collected.isEmpty) Iterator.empty
|         else Iterator.single(("part_" + partIdx, collected))
|       }.collect
res24: Array[(String, List[(Int, String)])] = Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A))))
//(4,D),(2,B)在part_0中,(3,C),(1,A)在part_1中


参考:http://lxw1234.com/archives/2015/07/356.htm
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: