spark算子:partitionBy对数据进行分区
2017-11-07 20:00
239 查看
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
该函数根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区。
参考:http://lxw1234.com/archives/2015/07/356.htm
该函数根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区。
scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2) rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[23] at makeRDD at :21 scala> rdd1.partitions.size res20: Int = 2 //查看rdd1中每个分区的元素 scala> rdd1.mapPartitionsWithIndex{ | (partIdx,iter) => { | var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]() | while(iter.hasNext){ | var part_name = "part_" + partIdx; | var elem = iter.next() | if(part_map.contains(part_name)) { | var elems = part_map(part_name) | elems ::= elem | part_map(part_name) = elems | } else { | part_map(part_name) = List[(Int,String)]{elem} | } | } | part_map.iterator | | } | }.collect res22: Array[(String, List[(Int, String)])] = Array((part_0,List((2,B), (1,A))), (part_1,List((4,D), (3,C)))) //(2,B),(1,A)在part_0中,(4,D),(3,C)在part_1中 //使用partitionBy重分区 scala> var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2)) rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at partitionBy at :23 scala> rdd2.partitions.size res23: Int = 2 //查看rdd2中每个分区的元素 scala> rdd2.mapPartitionsWithIndex{ | (partIdx,iter) => { | var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]() | while(iter.hasNext){ | var part_name = "part_" + partIdx; | var elem = iter.next() | if(part_map.contains(part_name)) { | var elems = part_map(part_name) | elems ::= elem | part_map(part_name) = elems | } else { | part_map(part_name) = List[(Int,String)]{elem} | } | } | part_map.iterator | } | }.collect res24: Array[(String, List[(Int, String)])] = Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A)))) //(4,D),(2,B)在part_0中,(3,C),(1,A)在part_1中
参考:http://lxw1234.com/archives/2015/07/356.htm
相关文章推荐
- Spark Structured Streaming:将数据落地按照数据字段进行分区方案
- Hadoop API:遍历文件分区目录,并根据目录下的数据进行并行提交spark任务
- 《Spark机器学习》笔记——基于MovieLens数据集使用Spark进行电影数据分析
- 大数据IMF传奇 第19课 spark 二次排序 使用JAVA自定义key 进行二次排序
- 一起学Spark(13) -- 数据 Shuffle 与分区器
- 延云YDB基于spark进行数据分析的一种新方式
- 用Apache Spark进行大数据处理 - 第六部分: 用Spark GraphX进行图数据分析
- Spark算子--partitionBy
- Spark算子:RDD键值转换操作(1)–partitionBy、mapValues、flatMapValues
- 一起学Spark(13) -- 数据 Shuffle 与分区器
- 延云YDB基于spark进行数据分析的一种新方式
- 大数据算子(spark)
- 硬盘分区后能进行数据恢复
- Spark基础:使用维基百科数据集来用Spark进行原型实验
- 从零开始,手把手教会你5分钟用SPARK对PM2.5数据进行分析(包括环境准备和SPARK代码)
- 大数据:Spark Shuffle(一)ShuffleWrite:Executor如何将Shuffle的结果进行归并写到数据文件中去
- 一起学Spark(13) -- 数据 Shuffle 与分区器
- 大数据日志分析系统-spark进行日志计算
- 延云YDB基于spark进行数据分析的一种新方式
- SQLServer 2005 海量数据解决方案(分区表)与对已存在的表进行分区