您的位置:首页 > 其它

Spark常用算子练习

2018-01-25 15:28 323 查看
package cn.allengao.exercise

import org.apache.spark.{SparkConf, SparkContext}

/**
* class_name:
* package:
* describe: SparkRDD算子练习
* creat_user: Allen Gao
* creat_date: 2018/1/25
* creat_time: 10:04
**/
object SparkRDDTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkRDDTest").setMaster("local")
val sc = new SparkContext(conf)

//通过并行化生成rdd
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//对rdd1里面的每一个元素乘以2然后排序,true表示正序排序,false表示倒序排序
val res1 = rdd1.map(_ * 2).sortBy(x => x, true)
//过滤出大于等于10的元素
val res2 = res1.filter(_ >= 10)

val rdd2 = sc.parallelize(Array("a b c", "d e f", "h i j"))
//将rdd2里面的每一个元素先切分再压平
val res3 = rdd2.flatMap(_.split(" "))

val rdd3 = sc.parallelize(List(List("a b c", "a b b"), List("e f g", "a f g"), List("h i j", "a a b")))
//将rdd3里面的每一个元素先切分再压平
val res4 = rdd3.flatMap(_.flatMap(_.split(" ")))

val rdd4 = sc.parallelize(List(5, 6, 4, 3))
val rdd5 = sc.parallelize(List(1, 2, 3, 4))
//求并集
val res5 = rdd4.union(rdd5)
//求交集
val res6 = rdd4.intersection(rdd5)
//去重
val res7 = rdd4.union(rdd5).distinct()

val rdd6 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd7 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求join
val res8 = rdd6.join(rdd7)

//求左连接和右连接
val res9 = rdd6.leftOuterJoin(rdd7)
val res10 = rdd6.rightOuterJoin(rdd7)
//求并集,这里注意到不用 “.union” 也可以,非常强大
val res11 = rdd6 union (rdd7)
//按key进行分组
val res12 = res11.groupByKey()
//分别用groupByKey和reduceByKey实现单词计数,注意groupByKey与reduceByKey的区别
//groupByKey
val res13 = res11.groupByKey().mapValues(_.sum)
/*
groupByKey:groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),
此操作发生在reduce端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。
同时如果数据量十分大,可能还会造成OutOfMemoryError。
*/

//reduceByValue,先进行局部聚合,再进行全局聚合
val res14 = res11.reduceByKey(_ + _)
/*reduceByKey:reduceByKey会在结果发送至reducer之前会对每个mapper在本地进行merge,
有点类似于在MapReduce中的combiner。这样做的好处在于,在map端进行一次reduce之后,
数据量会大幅度减小,从而减小传输,保证reduce端能够更快的进行结果计算。
*/

val rdd8 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd9 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//cogroup 注意cogroup与groupByKey的区别
val res15 = rdd8.cogroup(rdd9)

val rdd10 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce聚合
val res16 = rdd10.reduce(_ + _)

val rdd11 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
val rdd12 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd13 = rdd11.union(rdd12)
//按key进行聚合
val res17 = rdd13.reduceByKey(_ + _)
//按value的降序排序
val res18 = rdd13.reduceByKey(_ + _).map(t=>(t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1))
/*
笛卡尔积:
笛卡尔乘积是指在数学中,两个集合X和Y的笛卡尓积(Cartesian product),又称直积,
表示为X×Y,第一个对象是X的成员而第二个对象是Y的所有可能有序对的其中一个成员[3]  。
假设集合A={a, b},集合B={0, 1, 2},则两个集合的笛卡尔积为{(a, 0), (a, 1), (a, 2), (b, 0), (b, 1), (b, 2)}。
*/
val res19 = rdd11.cartesian(rdd12)

//要通过action类型的算子才能显示出结果,将结果放到可变数组中,就可以看到输出结果,
// 如果不加toBuffer,则打印出来的是一个引用。
//执行结果:ArrayBuffer(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
//    println(res1.collect().toBuffer)
//执行结果:ArrayBuffer(10,12, 14, 16, 18, 20)
//    println(res2.collect().toBuffer)      //将元素以数组的方式打印出来
//执行结果:ArrayBuffer(a, b, c, d, e, f, h, i, j)
//    println(res3.collect().toBuffer)
//执行结果:ArrayBuffer(a, b, c, a, b, b, e, f, g, a, f, g, h, i, j, a, a, b)
//    println(res4.collect().toBuffer)
//执行结果:ArrayBuffer(5, 6, 4, 3, 1, 2, 3, 4)
//        println(res5.collect().toBuffer)
//执行结果:ArrayBuffer(4, 3)
//      println(res6.collect().toBuffer)
//执行结果:ArrayBuffer(4, 6, 2, 1, 3, 5)
//      println(res7.collect().toBuffer)
//执行结果:ArrayBuffer((tom,(1,1)), (jerry,(3,2)))
//       println(res8.collect().toBuffer)
//执行结果:ArrayBuffer((tom,(1,Some(1))), (jerry,(3,Some(2))), (kitty,(2,None)))
//          println(res9.collect().toBuffer)
//执行结果:ArrayBuffer((tom,(Some(1),1)), (jerry,(Some(3),2)), (shuke,(None,2)))
//          println(res10.collect().toBuffer)
//执行结果:ArrayBuffer((tom,1), (jerry,3), (kitty,2), (jerry,2), (tom,1), (shuke,2))
//          println(res11.collect().toBuffer)
//执行结果:ArrayBuffer((tom,CompactBuffer(1, 1)), (jerry,CompactBuffer(3, 2)), (shuke,CompactBuffer(2)), (kitty,CompactBuffer(2)))
//          println(res12.collect().toBuffer)
//执行结果:ArrayBuffer((tom,2), (jerry,5), (shuke,2), (kitty,2))
//    println(res13.collect().toBuffer)
//执行结果:ArrayBuffer((tom,2), (jerry,5), (shuke,2), (kitty,2))
//    println(res14.collect().toBuffer)
//执行结果:ArrayBuffer((tom,(CompactBuffer(1, 2),CompactBuffer(1))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())))
//    println(res15.collect().toBuffer)
//执行结果:15
//println(res16)
//执行结果:ArrayBuffer((tom,4), (jerry,5), (shuke,3), (kitty,7))
//    println(res17.collect().toBuffer)
//执行结果:ArrayBuffer((kitty,7), (jerry,5), (tom,4), (shuke,3))
//    println(res18.collect().toBuffer)
/*
执行结果:ArrayBuffer(((tom,1),(jerry,2)), ((tom,1),(tom,3)), ((tom,1),(shuke,2)),
((tom,1),(kitty,5)), ((jerry,3),(jerry,2)), ((jerry,3),(tom,3)), ((jerry,3),(shuke,2)),
((jerry,3),(kitty,5)), ((kitty,2),(jerry,2)), ((kitty,2),(tom,3)), ((kitty,2),(shuke,2)),
((kitty,2),(kitty,5)), ((shuke,1),(jerry,2)), ((shuke,1),(tom,3)), ((shuke,1),(shuke,2)),
((shuke,1),(kitty,5)))
*/
println(res19.collect().toBuffer)

}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: