大数据IMF传奇行动绝密课程第17课:RDD案例(join、cogroup、reduceByKey、groupByKey等)
2016-08-11 22:44
666 查看
RDD案例(join、cogroup、reduceByKey、groupByKey等)
join、cogroup、reduceByKey、groupByKey这些算子都是最常用的算子,都是lazy级别的。map作用:
map适用于任何类型的元素,且对其作用的集合中的每一个元素循环遍历并调用其作为参数的函数对每一个遍历的元素进行具体化处理
filter作用:
main方法里面一个调用的每一个功能都必须是模块化的,每个模块可以使用函数封装
join和cogroup是
join把两个集合的元素根据key聚合在一起
我理解cogroup应该是先进行一次groupByKey,然后再进行join
package com.tom.spark import org.apache.spark.{SparkConf, SparkContext} /** * 最常用、最重要的SparkTransformation案例实战 */ object Transformations { def main(args: Array[String]) { val sc = sparkContext("Transformation") //创建SparkContext mapTransformation(sc) //map案例 filterTransformation(sc) //filter案例 flatMapTransformation(sc) //flatMap案例 groupByKeyTransformation(sc) //groupByKey案例 reduceByKeyTransformation(sc) //reduceByKey案例 joinTransformation(sc) //join案例 coGroupTransformation(sc) //cogroup案例 sc.stop() //停止SparkContext,销毁相关的Driver对象,释放资源 } def sparkContext(name: String) = { val conf = new SparkConf().setAppName(name).setMaster("local") //创建SparkConf初始化程序的配置 val sc = new SparkContext(conf) //创建SparkContext,这是第一个RDD创建的唯一入口,也是Driver的灵魂,是通往集群的唯一通道 sc } def mapTransformation(sc: SparkContext){ val nums = sc.parallelize( 1 to 10) //根据集合创建RDD val mapped = nums.map(_ * 2) //map适用于任何类型的元素,且对其作用的集合中的每一个元素循环遍历并调用其作为参数的函数对每一个遍历的元素进行具体化处理 mapped.collect().foreach(println) //收集计算结果并通过foreach循环打印 } def filterTransformation(sc: SparkContext){ val nums = sc.parallelize( 1 to 10) //根据集合创建RDD val filtered = nums.filter(_ % 2 == 0) //根据filter中作为参数的函数的Bool值来判断符合条件的元素,并基于这些元素构成新的MapPartitionsRDD filtered.collect().foreach(println) //收集计算结果并通过foreach循环打印 } def flatMapTransformation(sc: SparkContext){ val bigData = Array("Scala Spark", "Java Hadoop", "Java Tachyon") //实例化字符串类型的Array val bigDataStrings = sc.parallelize(bigData) //创建以字符串为元素类型的parallelCollectionRDD val words = bigDataStrings.flatMap(_.split(" ")) //首先是通过传入的作为参数的函数来作用于RDD的每个字符串进行单词切分(是以集合的方式存在的),然后把切分后的结果合并为一个大的集合产生结果为{Scala, Spark, Java, Hadoop, Java, Tachyon} words.collect().foreach(println) //收集计算结果并通过foreach循环打印 } def groupByKeyTransformation(sc: SparkContext){ val data = Array(Tuple2(100, "Spark"), Tuple2(100, "Tackyon"), Tuple2(70, "Hadoop"), Tuple2(80, "Kafka"), Tuple2(80, "HBase")) val dataRDD = sc.parallelize(data) //创建RDD val grouped = dataRDD.groupByKey() //按照相同的Key对Value进行分组,分组后的value是一个集合 grouped.collect().foreach(println) //收集计算结果并通过foreach循环打印 } def reduceByKeyTransformation(sc: SparkContext){ val lines = sc.textFile("F:/helloSpark.txt",1)//读取本地文件并设置为1个Partition /** * 第四步:对初始的RDD进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算 * 第4.1步:将每一行的字符串拆分成单个的单词 */ val words = lines.flatMap(_.split(" ")) //对每一行的字符串进行单词拆分,并把所有行的拆分结果通过flat合并成一个大的单词集合 /** * 第4.2步:在单词拆分的基础上对每个单词实例计数为1,也就是word => (word, 1) */ val pairs = words.map( word => (word, 1)) /** * 第4.3步:在单词实例计数为1基础上,统计每个单词在文件中出现的总次数 */ val wordCounts = pairs.reduceByKey(_ + _) //对相同的key,进行Value的累加(包括Local和Reducer级别同时Reduce) wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2)) //打印reduceByKey之后的计算结果 } def joinTransformation(sc: SparkContext){ val studentName = Array( Tuple2(1, "Spark"), Tuple2(2, "Tachyon"), Tuple2(3, "Hadoop") ) val studentScores = Array( Tuple2(1, 100), Tuple2(2, 95), Tuple2(3, 65) ) val names = sc.parallelize(studentName) val scores = sc.parallelize(studentScores) val studentNameAndScore = names.join(scores) studentNameAndScore.collect().foreach(println) } def coGroupTransformation(sc: SparkContext){ val studentName = Array( Tuple2(1, "Spark"), Tuple2(2, "Tachyon"), Tuple2(3, "Hadoop"), Tuple2(1, "GOOD1"), Tuple2(2, "GOOD2"), Tuple2(3, "GOOD3") ) val studentScores = Array( Tuple2(1, 100), Tuple2(2, 95), Tuple2(3, 60), Tuple2(1, 110), Tuple2(2, 90), Tuple2(3, 65) ) val names = sc.parallelize(studentName) val scores = sc.parallelize(studentScores) val studentNameAndScore = names.cogroup(scores) studentNameAndScore.collect().foreach(println) } }
相关文章推荐
- 大数据IMF传奇行动绝密课程第93课:SparkStreaming updateStateByKey案例实战和内置源码解密
- day17:RDD案例(join、cogroup、reduceByKey、groupByKey, join cogroup
- Spark RDD/Core 编程 API入门系列 之rdd案例(map、filter、flatMap、groupByKey、reduceByKey、join、cogroupy等)(四)
- Spark RDD/Core 编程 API入门系列 之rdd案例(map、filter、flatMap、groupByKey、reduceByKey、join、cogroupy等)(四)
- 大数据IMF传奇行动绝密课程第22课:RDD的依赖关系彻底解密
- 大数据IMF传奇行动绝密课程第91课:SparkStreaming基于Kafka Direct案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第90课:SparkStreaming基于Kafka Receiver案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第14课:Spark RDD解密
- 大数据IMF传奇行动绝密课程第60课:使用Java和Scala在IDE中实战RDD和DataFrame动态转换操作
- 大数据IMF传奇行动绝密课程第59课:使用Java和Scala在IDE中实战RDD和DataFrame转换操作
- 大数据IMF传奇行动绝密课程第95课:通过SparkStreaming的window操作实战模拟新浪微博、百度、京东等热点搜索词案例实战
- 大数据IMF传奇行动绝密课程第87课:Flume推送数据到Spark Streaming案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第88课:SparkStreaming从Flume Poll数据案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第100-101课:使用Spark Streaming+Spark SQL+Kafka+FileSystem综合案例
- 大数据IMF传奇行动绝密课程第82课:Spark Streaming案例动手实战并在电光石火间理解其工作原理
- 大数据IMF传奇行动绝密课程第16课:RDD实战(RDD基本操作实战及Transformation流程图)
- 大数据IMF传奇行动绝密课程第15课:RDD创建内幕彻底解密
- 大数据IMF传奇行动绝密课程第86课:SparkStreaming数据源Flume实际案例分享
- 大数据IMF传奇行动绝密课程第67课:spark SQL案例综合实战