第17课:RDD案例(join、cogroup等实战)
2016-05-13 22:57
507 查看
本节课通过代码实战演示RDD中最重要的两个算子,join和cogroup
join算子代码实战:
//通过代码演示join算子
val conf = new SparkConf().setAppName("RDDDemo").setMaster("local")
val sc = new SparkContext(conf)
val arr1 = Array(Tuple2(1, "Spark"), Tuple2(2, "Hadoop"), Tuple2(3, "Tachyon"))
val arr2 = Array(Tuple2(1, 100), Tuple2(2, 70), Tuple2(3, 90))
val rdd1 = sc.parallelize(arr1)
val rdd2 = sc.parallelize(arr2)
val rdd3 = rdd1.join(rdd2)
rdd3.collect().foreach(println)
运行结果:
(1,(Spark,100))
(3,(Tachyon,90))
(2,(Hadoop,70))
cogroup算子代码实战:
首先通过java的方式编写:
SparkConf conf = new SparkConf().setMaster("local").setAppName("Cogroup");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<Integer, String>> nameList = Arrays.asList(new Tuple2<Integer, String>(1, "Spark"),
new Tuple2<Integer, String>(2, "Tachyon"), new Tuple2<Integer, String>(3, "Hadoop"));
List<Tuple2<Integer, Integer>> ScoreList = Arrays.asList(new Tuple2<Integer, Integer>(1, 100),
new Tuple2<Integer, Integer>(2, 95), new Tuple2<Integer, Integer>(3, 80),
new Tuple2<Integer, Integer>(1, 80), new Tuple2<Integer, Integer>(2, 110),
new Tuple2<Integer, Integer>(2, 90));
JavaPairRDD<Integer, String> names = sc.parallelizePairs(nameList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(ScoreList);
JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> nameAndScores = names.cogroup(scores);
nameAndScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
System.out.println("ID:" + t._1);
System.out.println("Name:" + t._2._1);
System.out.println("Score:" + t._2._2);
}
});
sc.close();
运行结果:
ID:1
Name:[Spark]
Score:[100, 80]
ID:3
Name:[Hadoop]
Score:[80]
ID:2
Name:[Tachyon]
Score:[95, 110, 90]
通过Scala的方式:
val conf = new SparkConf().setAppName("RDDDemo").setMaster("local")
val sc = new SparkContext(conf)
val arr1 = Array(Tuple2(1, "Spark"), Tuple2(2, "Hadoop"), Tuple2(3, "Tachyon"))
val arr2 = Array(Tuple2(1, 100), Tuple2(2, 70), Tuple2(3, 90), Tuple2(1, 95), Tuple2(2, 65), Tuple2(1, 110))
val rdd1 = sc.parallelize(arr1)
val rdd2 = sc.parallelize(arr2)
val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect().foreach(println)
sc.stop()
运行结果:
(1,(CompactBuffer(Spark),CompactBuffer(100, 95, 110)))
(3,(CompactBuffer(Tachyon),CompactBuffer(90)))
(2,(CompactBuffer(Hadoop),CompactBuffer(70, 65)))
备注:资料来源于:DT_大数据梦工厂(Spark发行版本定制)更多私密内容,请关注微信公众号:DT_Spark如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580
join算子代码实战:
//通过代码演示join算子
val conf = new SparkConf().setAppName("RDDDemo").setMaster("local")
val sc = new SparkContext(conf)
val arr1 = Array(Tuple2(1, "Spark"), Tuple2(2, "Hadoop"), Tuple2(3, "Tachyon"))
val arr2 = Array(Tuple2(1, 100), Tuple2(2, 70), Tuple2(3, 90))
val rdd1 = sc.parallelize(arr1)
val rdd2 = sc.parallelize(arr2)
val rdd3 = rdd1.join(rdd2)
rdd3.collect().foreach(println)
运行结果:
(1,(Spark,100))
(3,(Tachyon,90))
(2,(Hadoop,70))
cogroup算子代码实战:
首先通过java的方式编写:
SparkConf conf = new SparkConf().setMaster("local").setAppName("Cogroup");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<Integer, String>> nameList = Arrays.asList(new Tuple2<Integer, String>(1, "Spark"),
new Tuple2<Integer, String>(2, "Tachyon"), new Tuple2<Integer, String>(3, "Hadoop"));
List<Tuple2<Integer, Integer>> ScoreList = Arrays.asList(new Tuple2<Integer, Integer>(1, 100),
new Tuple2<Integer, Integer>(2, 95), new Tuple2<Integer, Integer>(3, 80),
new Tuple2<Integer, Integer>(1, 80), new Tuple2<Integer, Integer>(2, 110),
new Tuple2<Integer, Integer>(2, 90));
JavaPairRDD<Integer, String> names = sc.parallelizePairs(nameList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(ScoreList);
JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> nameAndScores = names.cogroup(scores);
nameAndScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
System.out.println("ID:" + t._1);
System.out.println("Name:" + t._2._1);
System.out.println("Score:" + t._2._2);
}
});
sc.close();
运行结果:
ID:1
Name:[Spark]
Score:[100, 80]
ID:3
Name:[Hadoop]
Score:[80]
ID:2
Name:[Tachyon]
Score:[95, 110, 90]
通过Scala的方式:
val conf = new SparkConf().setAppName("RDDDemo").setMaster("local")
val sc = new SparkContext(conf)
val arr1 = Array(Tuple2(1, "Spark"), Tuple2(2, "Hadoop"), Tuple2(3, "Tachyon"))
val arr2 = Array(Tuple2(1, 100), Tuple2(2, 70), Tuple2(3, 90), Tuple2(1, 95), Tuple2(2, 65), Tuple2(1, 110))
val rdd1 = sc.parallelize(arr1)
val rdd2 = sc.parallelize(arr2)
val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect().foreach(println)
sc.stop()
运行结果:
(1,(CompactBuffer(Spark),CompactBuffer(100, 95, 110)))
(3,(CompactBuffer(Tachyon),CompactBuffer(90)))
(2,(CompactBuffer(Hadoop),CompactBuffer(70, 65)))
备注:资料来源于:DT_大数据梦工厂(Spark发行版本定制)更多私密内容,请关注微信公众号:DT_Spark如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- eclipse 开发 spark Streaming wordCount
- Understanding Spark Caching
- ClassNotFoundException:scala.PreDef$
- Windows 下Spark 快速搭建Spark源码阅读环境
- Spark中将对象序列化存储到hdfs
- 使用java代码提交Spark的hive sql任务,run as java application
- Spark机器学习(一) -- Machine Learning Library (MLlib)
- Spark机器学习(二) 局部向量 Local-- Data Types - MLlib
- Spark机器学习(三) Labeled point-- Data Types
- Spark弹性数据集
- Spark初探
- Spark Streaming初探
- Spark本地开发环境搭建
- 搭建hadoop/spark集群环境
- Spark HA部署方案