Spark分组二次排序
2017-06-14 21:42
288 查看
在运用Spark数据处理中,有时要对数据进行分组(二次)排序。数据存储在HDFS中。实现代码如下:
task进行处理的过程。如果rdd中某些key对于数据量特别大,就会造成数据倾斜,处理性能将收到影响。处理数据倾斜的方法很多,下次介绍解决方案之一:两阶段聚合
package com.ibeifeng.spark.core import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable.ArrayBuffer import scala.util.{Random, Try} object TopN { def main(args: Array[String]): Unit = { val hdfs = "hdfs://192.168.1.102:8020" //设置配置属性 val conf = SparkConf() .setMaster("dataNode1") .setAppName("Secnodary-Sort") .set("mapreduce.framework.name", "yarn") .set("spark.rdd.compress", "true") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .set("spark.storage.memoryFraction", "0.5") .set("spark.akka.frameSize", "100") .set("spark.default.parallelism", "1") val sc = SparkContext.getOrCreate(conf) //利用textFile方法创建RDD val fileRDD: RDD[String] = sc.textFile(s"hdfs://${hdfs}/Data/emp.data") val wordRDD: RDD[(String, Int)] = fileRDD.map(line => { val arr = line.split(" ") //排除数据异常和空格 (Try(arr(0).trim),Try(1).trim.toInt) }) .groupByKey() .sortByKey(true) .map(x => (x._1,x._2.sortWith(_ > _))) //结果数据输出到HDFS wordRDD.saveAsTextFile(s"${hdfs}/interviewData/resultData")groupByKey属于宽依赖RDD,会产生shuffle操作;Spark的shuffle过程,就是将各个节点上相同key的数据拉取到某个节点的一个
task进行处理的过程。如果rdd中某些key对于数据量特别大,就会造成数据倾斜,处理性能将收到影响。处理数据倾斜的方法很多,下次介绍解决方案之一:两阶段聚合
相关文章推荐
- spark分组统计及二次排序案例一枚
- Spark RDD 二次分组排序取TopK
- Hadoop Mapreduce分区、分组、二次排序过程详解[转]
- Hadoop Mapreduce分区、分组、二次排序过程详解[转]
- Spark 二次排序
- spark二次排序简单例子(JAVA)
- spark对分组后value值进行排序(JAVA)
- 大数据IMF传奇 第19课 spark 二次排序 使用JAVA自定义key 进行二次排序
- Spark的二次排序解决方案
- 十一、理解MapReduce的二次排序功能,包括自定义数据类型、分区、分组、排序
- Spark基础排序+二次排序(java+scala)
- spark二次排序简单例子(JAVA)
- Spark SQL函数之分组排序
- spark对分组后value值进行排序(JAVA)
- Spark Scala 二次排序
- spark简单二次排序
- 分别使用Hadoop和Spark实现二次排序
- spark二次排序简单例子(JAVA)
- spark对分组后value值进行排序(JAVA)
- Spark——二次排序(scala)