Spark Top N
2016-05-13 23:35
441 查看
基本TopN(Scala版)
topN分组(Java版):
RangePartitioner主要是依赖的RDD的数据划分成不同的范围,关键的地方是不同的范围是有序的
RangePartitioner除了是结果有序的基石以外,最为重要的是尽量保证每个Partition中的数据量是均匀的!!!
HashPartition会产生数据倾斜,极端情况下某(几)个分区拥有RDD的所有数据!!!
作业:使用Scala写TopN分组程序并且对Key排序
package com.dt.spark.cores.scala import org.apache.spark.{SparkContext, SparkConf} object TopNBasic { def main (args: Array[String]) { val conf = new SparkConf() conf.setAppName("TopNBasic") conf.setMaster("local") val sc = new SparkContext(conf) val data = sc.textFile("E:\\workspases\\data\\basicTopN.txt") val pairs = data.map(line => (line.toInt,line)) val sortedPairs = pairs.sortByKey(false) val sortedData = sortedPairs.map(pair => pair._2) val top5 = sortedData.take(5) sc.setLogLevel("OFF") top5.foreach(println) sc.stop() } }
topN分组(Java版):
package com.dt.spark.cores.java; import com.sun.corba.se.spi.legacy.connection.GetEndPointInfoAgainException; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; public class TopNGroup { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("TopNGroup").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD data = sc.textFile("E:\\workspases\\data\\GroupTopN.txt"); JavaPairRDD,Integer> pairs = data.mapToPair(new PairFunction, String, Integer>() { @Override public Tuple2, Integer> call(String line) throws Exception { String[] splitedline = line.split(" "); return new Tuple2, Integer>(splitedline[0], Integer.valueOf(splitedline[1])); } }); JavaPairRDD,Iterable> groupPairs = pairs.groupByKey(); JavaPairRDD,Iterable> top5 =groupPairs.mapToPair(new PairFunction, Iterable>, String, Iterable>() { @Override public Tuple2, Iterable> call(Tuple2, Iterable> groupedData) throws Exception { Integer[] top5 = new Integer[5]; String groupedKey = groupedData._1(); Iterator groupValue = groupedData._2().iterator(); while (groupValue.hasNext()) { Integer value = groupValue.next(); for (int i = 0; i < 5; i++) {//具体实现内部的topN if (top5[i] == null) { top5[i] = value; break; } else if (value > top5[i]) { for (int j = 4; j > i; j--) { top5[j] = top5[j - 1]; } top5[i] = value; break; } } } return new Tuple2, Iterable>(groupedKey, Arrays.asList(top5)); } }); sc.setLogLevel("OFF"); //打印分组内容 top5.foreach(new VoidFunction, Iterable>>() { @Override public void call(Tuple2, Iterable> topped) throws Exception { System.out.println("groupKey:" + topped._1());//获取groupKey Iterator toppedValue = topped._2().iterator();//获取groupvalue while (toppedValue.hasNext())//具体打印出每组Top N { Integer value = toppedValue.next(); System.out.println(value); } System.out.println("***********************************************"); } }); } }
RangePartitioner主要是依赖的RDD的数据划分成不同的范围,关键的地方是不同的范围是有序的
RangePartitioner除了是结果有序的基石以外,最为重要的是尽量保证每个Partition中的数据量是均匀的!!!
HashPartition会产生数据倾斜,极端情况下某(几)个分区拥有RDD的所有数据!!!
作业:使用Scala写TopN分组程序并且对Key排序
package com.dt.spark.cores.scala import org.apache.spark.{SparkContext, SparkConf} object TopNGroup { def main (args: Array[String]) { val conf = new SparkConf() conf.setAppName("TopNBasic") conf.setMaster("local") val sc = new SparkContext(conf) val data = sc.textFile("E:\\workspases\\data\\GroupTopN.txt") val groupRDD = data.map(line => (line.split(" ")(0),line.split(" ")(1).toInt)).groupByKey() val top5 = groupRDD.map(pair=> (pair._1,pair._2.toList.sortWith(_>_).take(5))).sortByKey() top5.collect().foreach(pair =>{ println(pair._1+":") pair._2.foreach(println) println("***********************") }) sc.stop() } }
相关文章推荐
- linux下不按回车如何读取字符&&读取到字符不回显
- linux 驱动函数简介
- CentOS 6使用iostat
- nginx学习笔记之二:nginx作为web server
- opencv_滑动条的使用
- Linux磁盘无密码共享给Winddows
- CentOS上使用sendmail发送邮件
- 给学Linux的同学们发几本电子书
- OpenGL 绘制一个三角型解析
- 更换本地的yum源为阿里云提供的镜像
- CentOS 7 安装 fcgiwrap时提示 FastCGI library is missing 的问题
- Windows Server 2008 R2 Acitve Directory域服务器安装与配置
- shell script 交互式脚本,可以读取命令行参数、选项,用户输入数据
- opecv2.4.9 samples编译问题解决
- linux修改成静态Ip 后无法联网
- CentOS 6.7安装MySQL
- Linux堆管理实现原理学习笔记 (上半部)
- 查看 linux系统硬件和环境信息
- 第二节windows系统下Xshell 5软件远程访问虚拟机 Linux系统
- SM2算法第七篇:Windows下Openssl安装与配置