spark01 - Operator Practice 03

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("ForeachDemo").setMaster("local")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(List("hello tom","hello jerry","hello tom","hello world"), 2)
// Word count
// Approach 1: reduceByKey

println(rdd1.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect().toBuffer)
//ArrayBuffer((tom,2), (hello,4), (jerry,1), (world,1))

/**
 * Approach 2: combineByKey
 *   def combineByKey[C](
 *        createCombiner: V => C,
 *        mergeValue: (C, V) => C,
 *        mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
 *        combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
 *   }
 */
/**
 * combineByKey(x => x, (a: Int, b: Int) => a + b, (c: Int, d: Int) => c + d)
 * createCombiner (x => x): applied to the first value seen for each key within a
 *   partition (here that value is 1); x => x leaves it unchanged
 * mergeValue ((a, b) => a + b): merges each further value for that key into the
 *   combiner, within a single partition
 * mergeCombiners ((c, d) => c + d): merges the per-partition combiners across partitions
 */
val rdd2=rdd1.flatMap(_.split(" ")).map((_,1)).combineByKey(x=>x,(a:Int,b:Int)=>(a+b),(c:Int,d:Int)=>(c+d))
println(rdd2.collect().toBuffer)
//ArrayBuffer((tom,2), (hello,4), (jerry,1), (world,1))
val rdd3=rdd1.flatMap(_.split(" ")).map((_,1)).combineByKey(x=>x+10,(a:Int,b:Int)=>(a+b),(c:Int,d:Int)=>(c+d))
println(rdd3.collect().toBuffer)
//ArrayBuffer((tom,22), (hello,24), (jerry,11), (world,11))
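// Why does "tom" become 22 rather than 12? createCombiner (x => x + 10) fires once per
// key per partition, and "tom" appears once in each of the two partitions. A small
// sketch to inspect the layout (mapPartitionsWithIndex is standard RDD API; the exact
// assignment below assumes parallelize's even split into 2 slices):
rdd1.flatMap(_.split(" ")).map((_, 1))
  .mapPartitionsWithIndex((i, it) => it.map(kv => (i, kv)))
  .collect()
  .foreach(println)
// (0,(hello,1)) (0,(tom,1)) (0,(hello,1)) (0,(jerry,1))
// (1,(hello,1)) (1,(tom,1)) (1,(hello,1)) (1,(world,1))
// so tom = (1 + 10) + (1 + 10) = 22, and hello = (1 + 10 + 1) + (1 + 10 + 1) = 24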

val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val rdd6 = rdd5.zip(rdd4)
//ArrayBuffer((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
println(rdd6.collect().toBuffer)
/**
 * Group words that share the same count into one collection:
 * List(_) wraps the first value for a key into a List
 * (x: List[String], y: String) => x :+ y appends further values to the list (per partition)
 * (m: List[String], n: List[String]) => m ++ n merges the per-partition lists
 */
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)
println(rdd7.collect().toBuffer)
//ArrayBuffer((1,List(dog, cat, turkey)), (2,List(gnu, salmon, rabbit, wolf, bear, bee)))
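// For comparison: groupByKey produces the same grouping; combineByKey is the general
// mechanism underneath it. A quick cross-check (element order inside each list may vary):
println(rdd6.groupByKey().mapValues(_.toList).collect().toBuffer)
// ArrayBuffer((1,List(dog, cat, turkey)), (2,List(gnu, salmon, rabbit, wolf, bear, bee)))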

// Repartitioning: repartition(n) is shorthand for coalesce(n, shuffle = true);
// without shuffle = true, coalesce can only reduce the partition count.
val rdd8 = rdd7.repartition(3)
val rdd9 = rdd7.coalesce(3, shuffle = true) // same effect as rdd8
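// A quick check with getNumPartitions (standard RDD API); rdd7 has 3 partitions here,
// so growing to 6 shows the difference between the shuffle and no-shuffle variants:
println(rdd7.repartition(6).getNumPartitions)              // 6
println(rdd7.coalesce(6, shuffle = true).getNumPartitions) // 6
println(rdd7.coalesce(6).getNumPartitions)                 // still 3: no shuffle, cannot grow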

// collectAsMap returns the result to the driver as a local Map
val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
val rdd10=rdd.collectAsMap
println(rdd10)
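// Caveat: because the result is a Map, duplicate keys collapse to a single entry.
// A minimal sketch (which duplicate survives is an implementation detail; here,
// running local with one partition, the later value wins):
val dup = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3)))
println(dup.collectAsMap()) // Map(a -> 2, b -> 3)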

val rdd11 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
// countByKey counts occurrences of each key
println(rdd11.countByKey)   // Map(a -> 1, b -> 2, c -> 2): the keys are a, b, c
println(rdd11.countByValue) // Map((b,2) -> 2, (c,2) -> 1, (a,1) -> 1, (c,1) -> 1)
// here "value" means the whole element, e.g. ("a", 1)
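// countByValue on a pair RDD is equivalent to counting each element as a whole;
// this reduceByKey formulation produces the same counts (entry order may vary):
println(rdd11.map((_, 1L)).reduceByKey(_ + _).collectAsMap())
// Map((b,2) -> 2, (c,2) -> 1, (a,1) -> 1, (c,1) -> 1)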
// -------------------------------------------------------------------------------------------
// -------------------------------------------------------------------------------------------
// filterByRange keeps only the elements whose key falls within the given range

val rdd12 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
val rdd13 = rdd12.filterByRange("b", "d")
println(rdd13.collect().toBuffer) // ArrayBuffer((c,3), (d,4), (c,2))
// Both bounds are inclusive; "b" is absent from the output only because no key equals "b"
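// To confirm both bounds are inclusive, try data that actually contains a "b" key:
val rangeDemo = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3), ("d", 4), ("e", 5)))
println(rangeDemo.filterByRange("b", "d").collect().toBuffer)
// ArrayBuffer((b,2), (c,3), (d,4))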
//  -------------------------------------------------------------------------------------------
//  -------------------------------------------------------------------------------------------
//  flatMapValues operates on the values only; each produced value is re-paired with its original key
val a = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
println(a.flatMapValues(_.split(" ")).collect().toBuffer)//ArrayBuffer((a,1), (a,2), (b,3), (b,4))
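// For contrast, mapValues keeps exactly one value per key instead of flattening:
println(a.mapValues(_.split(" ").toList).collect().toBuffer)
// ArrayBuffer((a,List(1, 2)), (b,List(3, 4)))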
//  -------------------------------------------------------------------------------------------
//  -------------------------------------------------------------------------------------------
// foldByKey is like reduceByKey but takes a zero (initial) value
val rdd14 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
val rdd15 = rdd14.map(x => (x.length, x))
println(rdd15.foldByKey("")(_+_).collect().toBuffer)//ArrayBuffer((4,wolfbear), (3,dogcat))
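// Note the zero value is applied once per key per partition, not once per key overall.
// A sketch with a visible zero value "@" (output order may vary):
println(rdd15.foldByKey("@")(_ + _).collect().toBuffer)
// ArrayBuffer((4,@wolf@bear), (3,@dog@cat)): one "@" per partition per key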

//  -------------------------------------------------------------------------------------------
//  -------------------------------------------------------------------------------------------
// foreachPartition runs a function over each partition; it is an action and produces no new RDD
val rdd16 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd16.foreachPartition(x => println(x.reduce(_ + _))) // prints 6, 15, 24 (one sum per partition)
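// The usual reason to prefer foreachPartition over foreach is to pay a setup cost once
// per partition rather than once per element. A sketch of the pattern; openConnection()
// and conn.send(...) are hypothetical placeholders, not a real API:
rdd16.foreachPartition { it =>
  // val conn = openConnection()   // hypothetical: create one connection per partition
  // it.foreach(x => conn.send(x)) // reuse it for every element in the partition
  // conn.close()
  println(it.sum)                  // here we just print each partition's sum: 6, 15, 24
}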

//  -------------------------------------------------------------------------------------------
//  -------------------------------------------------------------------------------------------
// keyBy builds a tuple from each element: the function's result becomes the key, the original element the value

val rdd17 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
println(rdd17.keyBy(_.length).collect().toBuffer)//ArrayBuffer((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
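// keyBy(f) is just shorthand for map(x => (f(x), x)); this produces the same result:
println(rdd17.map(x => (x.length, x)).collect().toBuffer)
// ArrayBuffer((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))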
//  -------------------------------------------------------------------------------------------
//  -------------------------------------------------------------------------------------------
// keys and values extract the key side / value side of a pair RDD
val rdd18 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val rdd19 = rdd18.map(x => (x.length, x))
println(rdd19.keys.collect.toList)//List(3, 5, 4, 3, 7, 5)
println(rdd19.values.collect.toList)//List(dog, tiger, lion, cat, panther, eagle)