
Spark Advanced Operators Practice (Part 2)

package cn.allengao.exercise

import org.apache.spark.{SparkConf, SparkContext}

object SparkRDDtest3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkRDDTest3").setMaster("local")
    val sc = new SparkContext(conf)
    /*
    combineByKey: achieves the same effect as reduceByKey.
    First argument (createCombiner): takes the first value for a key as-is and turns it into the initial combined value.
    Second argument (mergeValue): a function that merges further values into the combiner within a partition.
    Third argument (mergeCombiners): a function that merges the per-partition results across partitions.
    x is the first value of each key in each partition: (hello,1)(hello,1)(good,1) --> (hello,(1,1)), (good,(1)) --> x corresponds to hello's first 1 and good's 1.
    */
    val rdd1 = sc.parallelize(List(("hello", 1), ("hello", 1), ("good", 1)))
    val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
    // Result: ArrayBuffer((hello,1), (hello,1), (good,1))
    //    println(rdd1.collect.toBuffer)
    // Result: ArrayBuffer((hello,2), (good,1))
    //    println(rdd2.collect.toBuffer)
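    // Cross-check (an addition, not from the original post): reduceByKey(_ + _)
    // yields the same counts, since reduceByKey is built on combineByKey with an
    // identity createCombiner.
    val rdd2b = rdd1.reduceByKey(_ + _)
    // Expected: ArrayBuffer((hello,2), (good,1))
    //    println(rdd2b.collect.toBuffer)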

    // When the first argument becomes x + 10, hello's initial combined value is 10 + 1 = 11, and good's is also 10 + 1 = 11.
    val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
    // Result: ArrayBuffer((hello,12), (good,11))
    //    println(rdd3.collect.toBuffer)
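    // Sketch (an addition): createCombiner runs once per key *per partition*. With
    // the same data spread over 2 partitions, "hello" lands in both, so the +10 is
    // applied twice for it: 11 + 11 = 22.
    val rdd3b = sc.parallelize(List(("hello", 1), ("hello", 1), ("good", 1)), 2)
      .combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
    // Expected: ArrayBuffer((hello,22), (good,11))
    //    println(rdd3b.collect.toBuffer)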

    val rdd4 = sc.parallelize(List("dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee"), 3)
    val rdd5 = sc.parallelize(List(1, 1, 2, 2, 2, 1, 2, 2, 2), 3)
    // zip: pair up the two RDDs element by element, producing an RDD of (count, word) tuples.
    val rdd6 = rdd5.zip(rdd4)
    // Collect the words tagged 1 into one list and the words tagged 2 into another.
    val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)
    // Result: ArrayBuffer((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
    //    println(rdd6.collect().toBuffer)
    // Result: ArrayBuffer((1,List(dog, cat, turkey)), (2,List(gnu, salmon, rabbit, wolf, bear, bee)))
    //    println(rdd7.collect().toBuffer)
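    // For comparison (an addition): groupByKey expresses the same grouping more
    // directly, at the cost of shuffling every individual value.
    val rdd7b = rdd6.groupByKey().mapValues(_.toList)
    // Expected: the same two lists as rdd7 (element order within the lists may differ)
    //    println(rdd7b.collect().toBuffer)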

    // repartition
    val rdd8 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)
    val func1 = (index: Int, iter: Iterator[Int]) => {
      iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
    }
    val res1 = rdd8.mapPartitionsWithIndex(func1)
    /*
    Result: ArrayBuffer([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4],
    [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])
    */
    //    println(res1.collect().toBuffer)
    val rdd9 = rdd8.repartition(3)
    // Result: 3 (the number of partitions becomes 3)
    //    println(rdd9.partitions.length)
    val res2 = rdd9.mapPartitionsWithIndex(func1)
    /*
    Result: ArrayBuffer([partID:0, val: 3], [partID:0, val: 7], [partID:1, val: 1], [partID:1, val: 4],
    [partID:1, val: 5], [partID:1, val: 8], [partID:2, val: 2], [partID:2, val: 6], [partID:2, val: 9])
    */
    //    println(res2.collect().toBuffer)
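    // Note (an addition): repartition(n) is implemented as coalesce(n, shuffle = true),
    // so the coalesce call below does exactly the same work as repartition(3) above.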

    // coalesce: by default it does not shuffle, so it can only merge partitions and
    // never increase their number; passing true enables a shuffle, and the partition
    // count then follows the requested value.
    val rdd10 = rdd8.coalesce(3, true)
    // Result: 3
    //    println(rdd10.partitions.length)
    val res3 = rdd10.mapPartitionsWithIndex(func1)
    /*
    Result: ArrayBuffer([partID:0, val: 3], [partID:0, val: 7], [partID:1, val: 1], [partID:1, val: 4],
    [partID:1, val: 5], [partID:1, val: 8], [partID:2, val: 2], [partID:2, val: 6], [partID:2, val: 9])
    */
    //    println(res3.collect().toBuffer)
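    // Sketch (an addition): without a shuffle, coalesce can only merge partitions,
    // never split them, so asking for more partitions than exist is a no-op.
    val rdd10b = rdd8.coalesce(3) // shuffle defaults to false
    // Expected: 2 (rdd8's original partition count is unchanged)
    //    println(rdd10b.partitions.length)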

    // collectAsMap: collects a pair RDD to the driver as a Map, e.g. Map(b -> 2, a -> 1)
    val rdd11 = sc.parallelize(List(("a", 1), ("b", 2)))
    val res4 = rdd11.collectAsMap
    // Result: ArrayBuffer((a,1), (b,2))
    //    println(rdd11.collect().toBuffer)
    // Result: Map(b -> 2, a -> 1)
    //    println(res4)
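    // Sketch (an addition): collectAsMap keeps a single value per key, so duplicate
    // keys are silently collapsed.
    val res4b = sc.parallelize(List(("a", 1), ("a", 9), ("b", 2))).collectAsMap
    // Expected: one entry per key, e.g. Map(a -> 9, b -> 2)
    //    println(res4b)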

    // countByKey: counts how many times each key occurs.
    val rdd12 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
    val res5 = rdd12.countByKey
    // countByValue: counts how many times each (key, value) pair occurs.
    val res6 = rdd12.countByValue
    // Map(a -> 1, b -> 2, c -> 2)
    //    println(res5)
    // Map((b,2) -> 2, (c,2) -> 1, (a,1) -> 1, (c,1) -> 1)
    //    println(res6)
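    // Note (an addition): countByValue treats each whole element as the value, so on
    // a plain (non-pair) RDD it simply counts occurrences of each element.
    val res6b = sc.parallelize(List("a", "b", "b")).countByValue
    // Expected: Map(a -> 1, b -> 2)
    //    println(res6b)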

    // filterByRange: filters a pair RDD by key range; both bounds are inclusive.
    val rdd13 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1), ("b", 6)))
    val res7 = rdd13.filterByRange("b", "d").collect()
    // Result: ArrayBuffer((c,3), (d,4), (c,2), (b,6))
    //    println(res7.toBuffer)
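    // Sketch (an addition): the inclusive bounds are why (b,6) and (d,4) survive
    // above. On an RDD that has been sorted with sortByKey, filterByRange can skip
    // whole partitions instead of scanning every element.
    val res7b = rdd13.sortByKey().filterByRange("b", "d").collect()
    // Expected: the same pairs in key order, e.g. ArrayBuffer((b,6), (c,3), (c,2), (d,4))
    //    println(res7b.toBuffer)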

    // flatMapValues: applies a function to each value and flattens the results, keeping the key.
    val rdd14 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
    val res8 = rdd14.flatMapValues(_.split(" ")).collect()
    // Result: ArrayBuffer((a,1), (a,2), (b,3), (b,4))
    //    println(res8.toBuffer)
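    // For contrast (an addition): mapValues keeps one output pair per input pair, so
    // splitting without flattening leaves the lists nested.
    val res8b = rdd14.mapValues(_.split(" ").toList).collect()
    // Expected: ArrayBuffer((a,List(1, 2)), (b,List(3, 4)))
    //    println(res8b.toBuffer)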

    // foldByKey: folds the values of each key, starting from a zero value.
    val rdd15 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
    val rdd16 = rdd15.map(x => (x.length, x))
    // _ + _ performs string concatenation here.
    val rdd17 = rdd16.foldByKey(" ")(_ + _)
    // Result: ArrayBuffer((3,dog), (4,wolf), (3,cat), (4,bear))
    //    println(rdd16.collect().toBuffer)
    // Result: ArrayBuffer((4, wolf bear), (3, dog cat))
    //    println(rdd17.collect().toBuffer)
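    // Note (an addition): the zero value " " is prepended once per per-partition fold,
    // which is where the spaces between (and before) the words come from. With an
    // empty zero value the words run together:
    val rdd17b = rdd16.foldByKey("")(_ + _)
    // Expected (merge order may vary): ArrayBuffer((4,wolfbear), (3,dogcat))
    //    println(rdd17b.collect().toBuffer)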

    // foreachPartition is an action; it does not produce a new RDD.
    val rdd18 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
    val res9 = rdd18.foreachPartition(x => println(x.reduce(_ + _)))
    // Output: 6 15 24 (printed as a side effect; res9 itself is Unit)
    //    print(res9)
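    // Note (an addition): to get the per-partition sums back as an RDD instead of
    // printing them, use the mapPartitions transformation:
    val res9b = rdd18.mapPartitions(it => Iterator(it.sum))
    // Expected: ArrayBuffer(6, 15, 24)
    //    println(res9b.collect().toBuffer)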

    // keyBy: uses the result of the given function as the key.
    val rdd19 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
    // use the word's length as the key
    val res10 = rdd19.keyBy(_.length).collect()
    // use the first letter as the key
    val res11 = rdd19.keyBy(_ (0)).collect()
    // Result: ArrayBuffer((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
    //    println(res10.toBuffer)
    // Result: ArrayBuffer((d,dog), (s,salmon), (s,salmon), (r,rat), (e,elephant))
    //    println(res11.toBuffer)
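    // Note (an addition): keyBy(f) is just shorthand for map(x => (f(x), x)).
    val res11b = rdd19.map(x => (x(0), x)).collect()
    // Expected: same as res11 above.
    //    println(res11b.toBuffer)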

    // keys, values
    val rdd20 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
    val rdd21 = rdd20.map(x => (x.length, x))
    val res12 = rdd21.keys.collect
    val res13 = rdd21.values.collect
    // Result: ArrayBuffer(3, 5, 4, 3, 7, 5)
    println(res12.toBuffer)
    // Result: ArrayBuffer(dog, tiger, lion, cat, panther, eagle)
    println(res13.toBuffer)
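    // Note (an addition): keys and values are shorthand for map(_._1) and map(_._2),
    // projecting one side of each pair.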
  }
}