Spark:用Java和Scala实现WordCount
2017-12-06 15:57
696 查看
Java版本
package cn.spark.study.core; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; /** * 本地测试的wordcount程序 * @author Administrator * */ public class WordCountLocal_1 { public static void main(String[] args) { // 编写spark应用程序 // 本地执行,是可以执行在eclipse中的mian方法中运行的。 //第一步 :常见sparkConf对象,设置spark应用的配置信息 //使用setMaster()可以设置spark应用程序要连接的spark集群的Master节点的URI;但是如果设置为local则代表在本地运行。 SparkConf conf = new SparkConf() .setAppName("WordCountLocal") .setMaster("local"); //第二步:串讲JavaSparkContext对象 //在spark中,SparkContext是spark所有功能的一个入口,无论使用java、scala还是python //都必须要有一个SparkContext,它的主要作用,包括初始化spark应用程序所需的一些核心组件,包括调度器, // 还会去到Spark Master节点上进行注册,等等 //一句话 SparkContext是spark应用中最重要的一个对象 //但是在spark汇总,编写不同类型的Spark应用程序,使用的SparkContext是不同的,如果使用Scala, //使用的就是原生的SparkContext对象,但是使用Java,那么就是JavaSparkContext对象, //如果是开始Spark Sql程序,那么就是SQLContext、HiveContext //如果是Spark Streaming程序,那么就是它独有的SparkContext JavaSparkContext sc = new JavaSparkContext(conf); //第三步:要针对输入源,创建一个初始的RDD //输入源中的数据会打散,分配到RDD的每个PARTITION中,从而形成了一个初始的分布式的数据集 //我们这里了,因为是本地测试,所以就针对本地文件 //SparkContext中,用于根据文件类型的输入源创建RDD的方法,叫做textFile()方法 //在Java中,创建的普通RDD,都叫做JavaRDD //在这里,RDD中有元素这种概念,如果是hdfs或者本地文件了?创建的RDD,每一个元素就相对于文件里的一行 JavaRDD<String> lines = sc.textFile("E://BigData//sparkdata//spark.txt"); //第四步:对初始RDD进行transformation操作,也就是一些计算操作 //通常操作会通过创建function,并配合rdd的map、flatmap等算子来执行 //function,通常如果比较简单,则创建指定function的匿名内部类 //但是如果function比较复杂,则会单独创建一个类,作为实现这个function接口的类 //先将每一行拆分成单个的单词 //FlatMapFunction,有两个泛型参数,分别代表了输入和输出 //输入 String 输出String JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); //接着,需要将每一个单词映射为(单词,1)的这种格式 //因为只有这样,后面才能根据单词作为key,来进行每个单词的出现次数累加、 //mapToPair 其实就是将每个元素映射为一个(v1,v2)这样的tuple2类型的元素 //如果大家还记得scala里面讲的tuple,那么没错,这里的tuple2就是scala类型,包含了两个值 //mapToPair这个算子,要求的是与PairFunction配合使用,第一个泛型参数代表了输入类型 //第二个和第三个泛型参数,代表的输入的Typle2的第一个值和第二个值的类型 //JavaPariRDD的两个泛型参数,分别代表了tuple元素的第一个值和第二个值的类型 JavaPairRDD<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { // TODO Auto-generated method stub return new Tuple2<String, Integer>(word, 1); } }) ; //接着需要以单词作为key,统计每个单词出现的次数 //这里要使用reduceByKey这个算子,对每个key对应的value都进行rudece操作 //比如JavaPariRDD中有几个元素,分别为(hello,1)(hello,1)(hello,1) //reduce操作,相当于是把第一个值和第二个值进行计算,然后再将结果与第三个值进行计算 //比如这里的hello,那么久相当于是1+1 = 2 ,然后2+1 = 3 //最后返回的JavaPairRDD中的元素,也就是tuple,但是第一个值就是每个key,第二个值 JavaPairRDD<String ,Integer> wordCounts = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); //到这里为止,我们通过几个Spark算子操作,已经统计出了单词的次数 //但是,之前我们使用的flatMap、mapToPair、reduceByKey这种操作,都叫做transformation操作 //一个spark应用中,光是有transformation操作是不行的,是不会执行的,必须有一种叫做action //接着,可以使用一种叫做action操作,比如说foreach来触发程序的执行 wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Integer> wordcount) throws Exception { System.out.println("\""+wordcount._1+"\""+ " appeared "+wordcount._2+" times."); } }); sc.close(); }
}
Scala 版本
package cn.spark.study.core import org.apache.spark.SparkConf import org.apache.spark.SparkContext class WordCount { def main(args: Array[String]){ val conf= new SparkConf().setAppName("WordCount"); val sc = new SparkContext(conf) val lines = sc.textFile("hdfs://spark1:9000/spark.txt", 1); val words = lines.flatMap(line =>line.split(" ")) val pairs = words.map( word => (word ,1) ) val wordCounts = pairs.reduceByKey{ _ + _ } wordCounts.foreach(wordCount => println(wordCount._1+" "+wordCount._2))
}
}
相关文章推荐
- Spark:用Scala和Java实现WordCount
- Spark:用Scala和Java实现WordCount
- Spark:用Scala和Java实现WordCount
- Spark:用Scala和Java实现WordCount
- python、scala、java分别实现在spark上实现WordCount
- Spark:用Scala和Java实现WordCount
- SparkStreaming实现HDFS的wordCount(java版)
- java8实现spark streaming的wordcount
- Spark wordcount - Python, Scala, Java
- Spark Streaming开发入门——WordCount(Java&Scala)
- Java实现Spark词配对Wordcount计数
- 分别用Java、Scala、spark-shell开发wordcount程序及测试代码
- java和scala分别实现WordCount
- java8实现spark wordcount并且按照value排序输出
- java实现kafka整合spark streaming完成wordCount,updateStateByKey完成实时状态更新
- maven构建Scala程序,实现spark的wordcount
- Spark 程序 WordCount实现 Scala、Python
- Spark:用Scala和Java实现WordCount
- 010-spark standalone模式JAVA版本WordCount代码
- 初试spark java WordCount