您的位置:首页 > 其它

Spark分组二次排序

2017-06-14 21:42 288 查看
在运用Spark数据处理中,有时要对数据进行分组(二次)排序。数据存储在HDFS中。实现代码如下:

package com.ibeifeng.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer
import scala.util.{Random, Try}

object TopN {
def main(args: Array[String]): Unit = {
val hdfs = "hdfs://192.168.1.102:8020"
//设置配置属性
val conf = SparkConf()
.setMaster("dataNode1")
.setAppName("Secnodary-Sort")
.set("mapreduce.framework.name", "yarn")
.set("spark.rdd.compress", "true")
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.set("spark.storage.memoryFraction", "0.5")
.set("spark.akka.frameSize", "100")
.set("spark.default.parallelism", "1")
val sc = SparkContext.getOrCreate(conf)
//利用textFile方法创建RDD
val fileRDD: RDD[String] = sc.textFile(s"hdfs://${hdfs}/Data/emp.data")
val wordRDD: RDD[(String, Int)] = fileRDD.map(line => {
val arr = line.split(" ")

   //排除数据异常和空格
(Try(arr(0).trim),Try(1).trim.toInt)
})
.groupByKey()
.sortByKey(true)
.map(x => (x._1,x._2.sortWith(_ > _)))

  //结果数据输出到HDFS
wordRDD.saveAsTextFile(s"${hdfs}/interviewData/resultData")
groupByKey属于宽依赖RDD,会产生shuffle操作;Spark的shuffle过程,就是将各个节点上相同key的数据拉取到某个节点的一个

task进行处理的过程。如果rdd中某些key对于数据量特别大,就会造成数据倾斜,处理性能将收到影响。处理数据倾斜的方法很多,下次介绍解决方案之一:两阶段聚合
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息