Spark operator: combineByKey
2017-11-08 21:23
Suppose we have a set of personal records. We want to group them by gender and count the number of records in each group.
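combineByKey turns an RDD[(K, V)] into an RDD[(K, C)], where the combiner type C may differ from the value type V. Its simplest overload in PairRDDFunctions takes three functions (abridged signature, omitting the overloads that also accept a partitioner or a number of partitions):

def combineByKey[C](
    createCombiner: V => C,        // invoked the first time a key is seen in a partition
    mergeValue: (C, V) => C,       // folds a further value for an already-seen key into its combiner
    mergeCombiners: (C, C) => C    // merges combiners for the same key across partitions
): RDD[(K, C)]

The example below builds a combiner of type (List[String], Int): the list collects the names and the Int counts the records.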
scala> val people = List(("male", "Mobin"), ("male", "Kpop"), ("female", "Lucy"), ("male", "Lufei"), ("female", "Amy"))
people: List[(String, String)] = List((male,Mobin), (male,Kpop), (female,Lucy), (male,Lufei), (female,Amy))

scala> val rdd = sc.parallelize(people)
rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> val combinByKeyRDD = rdd.combineByKey(
     |   (x: String) => (List(x), 1),
     |   (peo: (List[String], Int), x: String) => (x :: peo._1, peo._2 + 1),
     |   (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))
combinByKeyRDD: org.apache.spark.rdd.RDD[(String, (List[String], Int))] = ShuffledRDD[1] at combineByKey at <console>:25

scala> combinByKeyRDD.foreach(println)
(female,(List(Lucy, Amy),2))
(male,(List(Mobin, Kpop, Lufei),3))
Execution steps:
Partition1:
K="male"   --> ("male","Mobin")  --> createCombiner("Mobin") => peo1 = ( List("Mobin"), 1 )
K="male"   --> ("male","Kpop")   --> mergeValue(peo1, "Kpop") => peo2 = ( "Kpop" :: peo1._1, peo1._2 + 1 )  // same key in this partition: mergeValue folds the value into the existing combiner
K="female" --> ("female","Lucy") --> createCombiner("Lucy") => peo3 = ( List("Lucy"), 1 )

Partition2:
K="male"   --> ("male","Lufei")  --> createCombiner("Lufei") => peo4 = ( List("Lufei"), 1 )
K="female" --> ("female","Amy")  --> createCombiner("Amy")  => peo5 = ( List("Amy"), 1 )

Merge partitions:
K="male"   --> mergeCombiners(peo2, peo4) => ( List("Lufei", "Kpop", "Mobin"), 3 )
K="female" --> mergeCombiners(peo3, peo5) => ( List("Amy", "Lucy"), 2 )
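The split of records across Partition1 and Partition2 shown above depends on how parallelize distributes the input list. A minimal sketch for checking the layout in the same spark-shell session, assuming a requested parallelism of 2 (the exact element-to-partition assignment is not guaranteed):

// Request two partitions explicitly and inspect their contents with glom()
val rdd2 = sc.parallelize(people, 2)
rdd2.glom().collect().foreach(part => println(part.mkString(", ")))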
In the example above, each person's record carries only a single value. If the value is itself a tuple, it is convenient to define a type alias:
scala> val people = List(("male", ("Mobin", 89)), ("male", ("Kpop", 98)), ("female", ("Lucy", 99)), ("male", ("Lufei", 77)), ("female", ("Amy", 97)))

scala> val rdd = sc.parallelize(people)
rdd: org.apache.spark.rdd.RDD[(String, (String, Int))] = ParallelCollectionRDD[2] at parallelize at <console>:23

scala> type MVType = (String, Int)
defined type alias MVType

scala> val combinByKeyRDD = rdd.combineByKey(
     |   (x: MVType) => (List(x), 1),
     |   (peo: (List[MVType], Int), x: MVType) => (x :: peo._1, peo._2 + 1),
     |   (sex1: (List[MVType], Int), sex2: (List[MVType], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))
combinByKeyRDD: org.apache.spark.rdd.RDD[(String, (List[(String, Int)], Int))] = ShuffledRDD[3] at combineByKey at <console>:27

scala> combinByKeyRDD.foreach(println)
(male,(List((Mobin,89), (Kpop,98), (Lufei,77)),3))
(female,(List((Lucy,99), (Amy,97)),2))
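Since the combiner already tracks a count alongside the collected (name, score) pairs, the result can be post-processed with mapValues. A minimal sketch, reusing combinByKeyRDD from the session above, that derives each group's average score:

// Average score per gender, computed from the (List[MVType], Int) combiner
val avgByGender = combinByKeyRDD.mapValues { case (members, count) =>
  members.map(_._2).sum.toDouble / count
}
avgByGender.foreach(println)

With the data above this yields 88.0 for male ((89 + 98 + 77) / 3) and 98.0 for female ((99 + 97) / 2).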