Spark Foundations 01: A Spark HelloWorld in Eclipse
2016-09-21 12:39
Preface
Environment Setup
1 Scala version
2 Java version
Code
1 Scala - low-key version
2 Scala - show-off version
3 Java - classic version
4 Java - lambda version
Sample Run
1 Preface
Spark is hot these days, and more and more people are getting into big data. After consulting quite a few sources, I finally got this standalone Spark HelloWorld to run.
This HelloWorld is not the usual one: rather than simply printing "Hello World", it is a word-count program that counts how many times each word appears in a file and sorts the results.
The same functionality is implemented three ways: in native Scala, in classic Java, and in Java 8.
In fact, a standalone Spark program differs from one running on a cluster only in its runtime environment; the development workflow is identical.
A later article will cover how to set up a cluster.
This series will be updated on both CSDN and Jianshu as my spare time allows.
Corrections from fellow travelers are welcome.
2 Environment Setup
My environment is as follows:

```
C:\Users\hylexus>java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)

C:\Users\hylexus>scala -version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL
```
Eclipse for Scala:

```
Scala IDE build of Eclipse SDK
Build id: 4.4.1-vfinal-2016-05-04T11:16:00Z-Typesafe
```
Both the Scala and Java versions below use Maven to manage dependencies. For how to create a Scala project with Maven, see my other article: http://blog.csdn.net/hylexus/article/details/52602774

Note: spark-core_2.11 pulls in a frightening number of jar dependencies, so be patient while Maven downloads them… ^_^
2.1 Scala version
The relevant parts of pom.xml:

```xml
<properties>
  <maven.compiler.source>1.6</maven.compiler.source>
  <maven.compiler.target>1.6</maven.compiler.target>
  <encoding>UTF-8</encoding>
  <scala.version>2.11.5</scala.version>
  <scala.compat.version>2.11</scala.compat.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
  </dependency>
  <!-- Test -->
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.specs2</groupId>
    <artifactId>specs2-junit_2.11</artifactId>
    <version>2.4.16</version>
  </dependency>
</dependencies>

<build>
  <sourceDirectory>src/main/scala</sourceDirectory>
  <testSourceDirectory>src/test/scala</testSourceDirectory>
  <plugins>
    <plugin>
      <!-- see http://davidb.github.com/scala-maven-plugin -->
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.2.0</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
          <configuration>
            <args>
              <!-- <arg>-make:transitive</arg> -->
              <arg>-dependencyfile</arg>
              <arg>${project.build.directory}/.scala_dependencies</arg>
            </args>
          </configuration>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.5.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <encoding>UTF-8</encoding>
        <compilerArguments>
          <!-- <extdirs>src/main/webapp/plugins/ueditor/jsp/lib</extdirs> -->
        </compilerArguments>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-surefire-plugin</artifactId>
      <version>2.18.1</version>
      <configuration>
        <useFile>false</useFile>
        <disableXmlReport>true</disableXmlReport>
        <!-- If you have classpath issues like NoDefClassError, ... -->
        <!-- <useManifestOnlyJar>false</useManifestOnlyJar> -->
        <includes>
          <include>**/*Test.*</include>
          <include>**/*Suite.*</include>
        </includes>
      </configuration>
    </plugin>
  </plugins>
</build>
```
2.2 Java version
The pom.xml contents:

```xml
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.5.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <encoding>UTF-8</encoding>
        <compilerArguments>
          <!-- <extdirs>src/main/webapp/plugins/ueditor/jsp/lib</extdirs> -->
        </compilerArguments>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-surefire-plugin</artifactId>
      <version>2.18.1</version>
      <configuration>
        <useFile>false</useFile>
        <disableXmlReport>true</disableXmlReport>
        <!-- If you have classpath issues like NoDefClassError, ... -->
        <!-- <useManifestOnlyJar>false</useManifestOnlyJar> -->
        <includes>
          <include>**/*Test.*</include>
          <include>**/*Suite.*</include>
        </includes>
      </configuration>
    </plugin>
  </plugins>
</build>
```
3 Code
3.1 Scala - low-key version
```scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("WordCount")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties"
    // read the file contents
    val lines = sc.textFile(fileName, 1)
    // split into words; splitting on spaces only, as an example
    val words = lines.flatMap(line => line.split(" "))
    // String ===> (word, count), count == 1
    val pairs = words.map(word => (word, 1))
    // (word, 1) ==> (word, count)
    val result = pairs.reduceByKey((count, acc) => count + acc)
    // sort by count DESC
    val sorted = result.sortBy(e => e._2, false, 1)
    sorted.foreach(e => {
      println("[" + e._1 + "] appeared " + e._2 + " times")
    })
    sc.stop()
  }
}
```
3.2 Scala - show-off version
```scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.setAppName("rank test").setMaster("local")
    val sc = new SparkContext(conf)
    val fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties"
    sc.textFile(fileName, 1)        // lines
      .flatMap(_.split(" "))        // all words
      .map(word => (word, 1))       // to pair
      .reduceByKey(_ + _)           // count
      .map(e => (e._2, e._1))       // swap to (count, word)
      .sortByKey(false, 1)          // sort by count DESC
      .map(e => (e._2, e._1))       // swap back to (word, count)
      .foreach(e => {
        println("[" + e._1 + "] appeared " + e._2 + " times")
      })
    sc.stop()
  }
}
```
3.3 Java - classic version
The code is almost too ugly to look at… anonymous inner classes everywhere.
Luckily, Java 8's lambdas come to the rescue.
```java
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("WordCounter")
            .setMaster("local");
        String fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties";
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile(fileName, 1);

        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            // earlier versions apparently used Iterable here instead of Iterator
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        JavaPairRDD<String, Integer> result = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer e, Integer acc) throws Exception {
                return e + acc;
            }
        }, 1);

        result.map(new Function<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(Tuple2<String, Integer> v1) throws Exception {
                return new Tuple2<>(v1._1, v1._2);
            }
        }).sortBy(new Function<Tuple2<String, Integer>, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Tuple2<String, Integer> v1) throws Exception {
                return v1._2;
            }
        }, false, 1).foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> e) throws Exception {
                System.out.println("[" + e._1 + "] appeared " + e._2 + " times");
            }
        });

        sc.close();
    }
}
```
3.4 Java - lambda version
With Java 8 lambdas, it is actually quite tidy ^_^

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCountByJava8 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("WordCounter")
            .setMaster("local");
        String fileName = "src/main/java/hylexus/spark/test1/WordCountByJava8.java";
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile(fileName, 1);

        lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
            .mapToPair(word -> new Tuple2<>(word, 1))
            .reduceByKey((e, acc) -> e + acc, 1)
            .map(e -> new Tuple2<>(e._1, e._2))
            .sortBy(e -> e._2, false, 1)
            .foreach(e -> System.out.println("[" + e._1 + "] appeared " + e._2 + " times"));

        sc.close();
    }
}
```
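For intuition, the same flatMap → count → sort-descending pipeline can be sketched on plain Java 8 streams, with no Spark at all. This is only an analogy, not Spark's API: the class name and sample data below are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LocalWordCount {

    // Count words across all lines, like flatMap + reduceByKey on an RDD.
    public static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                // split each line into words, like RDD.flatMap
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // group identical words and count them, like reduceByKey
                .collect(Collectors.groupingBy(w -> w, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a b a", "b a c");
        count(lines).entrySet().stream()
                // sort by count descending, like sortBy(..., false, ...)
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .forEach(e -> System.out.println("[" + e.getKey() + "] appeared " + e.getValue() + " times"));
    }
}
```

The shape of the stream chain mirrors the RDD chain one-to-one, which is why the lambda version above reads so naturally to anyone who has used java.util.stream.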
4 Sample Run
```
//...............
[->] appeared 6 times
[+] appeared 5 times
[import] appeared 5 times
[new] appeared 4 times
[=] appeared 4 times
//...............
```
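The runs above were launched from inside Eclipse with the master set to local. Once packaged, the same program can also be launched from the command line with spark-submit. A sketch, assuming a Spark installation on the PATH; the jar file name here is an assumption, adjust it to your project's artifactId and version:

```shell
# build the jar from the project root
mvn clean package

# run locally; the class name matches the examples above,
# the jar name is an assumption based on this project's layout
spark-submit \
  --class WordCount \
  --master "local[*]" \
  target/spark-base-0.0.1-SNAPSHOT.jar
```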