RDD Basics [4]: PairRDD Aggregation Functions
2016-12-06 00:46
Introduction
[1] aggregateByKey is the pair-RDD counterpart of aggregate: each key's values are folded into a zero value within each partition (seqOp), and the per-partition results are then merged across partitions (combOp).

    def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
    def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
    def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

[2] combineByKey also operates on a pair RDD. Compared with aggregateByKey, instead of a zero value it takes a function (createCombiner) that converts the first value seen for a key in each partition into the accumulator type C, and then aggregates.

    def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C,
        partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
    def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C,
        numPartitions: Int): RDD[(K, C)]

[3] reduceByKey merges the values of each key with a (V, V) => V function, first within each partition and then across partitions, yielding an RDD[(K, V)].

    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
    def reduceByKey(func: (V, V) => V): RDD[(K, V)]

[4] groupByKey: (K, V) ---> (K, Iterable[v1, v2, ...])

    def groupByKey(): RDD[(K, Iterable[V])]

A short sketch of how the first three operators relate follows; the full demo program comes after it.
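As a minimal sketch (assuming a live SparkContext named sc; the pairs data here is hypothetical, not from the demo program below), reduceByKey can be expressed through combineByKey, which makes the role of each of the three callbacks concrete:

    // Hypothetical input, for illustration only.
    val pairs = sc.parallelize(List(("a", 1), ("a", 3), ("b", 3)))

    // pairs.reduceByKey(_ + _) behaves like this combineByKey call:
    val sums = pairs.combineByKey[Int](
      (v: Int) => v,                  // createCombiner: the first value of a key in a partition
      (c: Int, v: Int) => c + v,      // mergeValue: fold the next value into the accumulator
      (c1: Int, c2: Int) => c1 + c2   // mergeCombiners: merge per-partition accumulators
    )
    sums.collect().foreach(println)   // (a,4) (b,3)

aggregateByKey sits in between: like reduceByKey it starts from a fixed zeroValue, but like combineByKey its accumulator type U may differ from the value type V.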
package com.dt.spark.main.RDDLearn.PairRDDAggrFunAPI

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by on 16/7/17.
  */
object PairRDDAggrFunAPI {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("test")
    conf.setMaster("local")
    val sc = new SparkContext(conf)

    //==========================================
    // Build an RDD from a Scala collection with parallelize().
    val KVPairRDD = sc.parallelize(List(("中国", "山东"), ("中国", "北京"), ("中国", "上海"),
      ("美国", "洛杉矶"), ("美国", "德克萨斯"), ("韩国", "首尔")))

    //==========================================
    // [1] aggregateByKey: fold each key's values into a List[String].
    val res = KVPairRDD.aggregateByKey[List[String]](Nil)(
      (listInit, value) => value :: listInit, // per partition: aggregate values with the same key
      (listA, listB) => listA ::: listB       // across partitions: merge the per-partition lists
    ).collect()
    println("Aggregation result: ")
    res.foreach(print(_))
    // Aggregation result:
    // (美国,List(德克萨斯, 洛杉矶))(中国,List(上海, 北京, 山东))(韩国,List(首尔))

    //==========================================
    // [2] combineByKey: like aggregateByKey, but createCombiner turns the first
    // value seen for a key in each partition into the accumulator type C.
    val res2 = KVPairRDD.combineByKey[List[String]](
      List(_: String),                                               // createCombiner
      (list: List[String], v: String) => v :: list,                  // mergeValue
      (listA: List[String], listB: List[String]) => listA ::: listB  // mergeCombiners
    ).collect()
    println("Aggregation result: ")
    res2.foreach(print(_))
    // Aggregation result:
    // (美国,List(德克萨斯, 洛杉矶))(中国,List(上海, 北京, 山东))(韩国,List(首尔))

    //==========================================
    // combineByKey example: the average value of each key.
    // (K, V) ---> (K, (sum, count)) ---> average.
    // createCombiner maps V ---> (V, 1); mergeValue folds the next value in as
    // (sum + v, count + 1); mergeCombiners adds the two (sum, count) pairs.
    val KVPairRDD2 = sc.parallelize(List(("a", 1), ("a", 3), ("b", 3), ("b", 4)))
    val res3 = KVPairRDD2.combineByKey(
      (v) => (v, 1),
      (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
      // Sums and counts are merged separately: (sum1 + sum2, count1 + count2).
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    ).map {
      case (key, value) => (key, value._1 / value._2.toDouble)
    }
    println("\nAverage value per key: ")
    res3.collect().foreach(println)
    // (a,2.0)   (a,(1+3, 1+1)) ---> (a, (1+3)/(1+1))
    // (b,3.5)

    //==========================================
    // [3] reduceByKey: merge the values of each key with (V, V) => V,
    // yielding an RDD[(K, V)]. Here: sum the values of each key.
    println("reduceByKey, sum per key: ")
    KVPairRDD2.reduceByKey(_ + _).collect().foreach(println(_))
    // (a,4)
    // (b,7)

    // The reducer can also be written as a pattern-matching function.
    val KVPairRDD3 = sc.parallelize(List((1, "I"), (1, "Love"), (2, "You"), (2, "!")))
    KVPairRDD3.reduceByKey {
      case ("", b) => b
      case (a, b)  => s"$a$b"
    }.collect().foreach(println(_))
    // (1,ILove)
    // (2,You!)

    //==========================================
    // [4] groupByKey: (K, V) ---> (K, Iterable[v1, v2, ...]).
    // Top-N example: the top 2 values of each key.
    val KVPairRDD4 = sc.parallelize(List(("a", 1), ("a", 3), ("a", 4),
      ("b", 3), ("b", 4), ("b", 1), ("b", 4)))
    val groupRDD = KVPairRDD4.groupByKey().map(
      iter => {
        val key = iter._1
        val valuesIterable = iter._2
        val sortValues = valuesIterable.toList.sortWith(_ > _).take(2)
        (key, sortValues)
      }
    )
    // Flatten back to (K, V) pairs.
    val flatgroupRDD = groupRDD.flatMap {
      case (key, sortValues) => sortValues.map(key -> _)
    }
    flatgroupRDD.foreach(println)
    // (a,4)
    // (a,3)
    // (b,4)
    // (b,4)

    sc.stop()
  }
}
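A side note on the operator choice in the last example: groupByKey ships and materializes every value of a key and does no map-side combining, so on large inputs the same top-2 is usually written with aggregateByKey, which keeps at most two values per key in each partition. A minimal sketch, assuming the KVPairRDD4 defined in the program above:

    val top2 = KVPairRDD4.aggregateByKey(List.empty[Int])(
      (acc, v) => (v :: acc).sorted.reverse.take(2), // within a partition: keep only the two largest
      (a, b) => (a ::: b).sorted.reverse.take(2)     // across partitions: merge, keep the two largest
    )
    top2.collect().foreach(println)
    // (a,List(4, 3))
    // (b,List(4, 4))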