您的位置:首页 > 其它

单独的应用程序(翻译自Learning.Spark.Lightning-Fast.Big.Data.Analysis)

2015-08-30 00:56 351 查看
在这次对Spark粗略的讲解过程中,我们还没有讲如何在单独的应用程序中使用Spark。撇开交互式运行来说,我们能在Java,Scala或这Python程序中连接Spark。与在shell中连接Spark相比,唯一的区别是,在程序中,你需要自己初始化SparkContext 。

连接Spark的过程因语言而异。在Java和Scala中,你在你的应用程序的Maven依赖中添加对spark-core 的依赖就可以了。到写这本书的时候,Spark的最新版是1.2.0,它对应的Maven坐标是:

groupId=org.apache.spark

artifactId=spark-core_2.10

version=1.2.0

Maven是流行的针对基于Java(原文:Java-based)语言的包管理工具,它让你能使用公共仓库中的包。你可以通过Maven构建你的项目,也可以通过其它能与Maven仓库对话的工作构建项目,包括Scala的sbt工具和Gradle。流行的集成开发环境(比如eclipse)都允许你直接往项目中添加Maven依赖。

在Python中,你只需要把应用程序写成Python脚本,但是你必须用Spark自带的bin/spark-submit脚本运行它们。bin/spark-submit脚本包含了Python中需要的Spark依赖。脚本为Spark的Python API设置环境变量到函数中。运行你的脚本,像例2-6一样。

例2-6.  运行Python脚本

bin/spark-submit my_script.py

(注意,在Windows中,你必须用反斜线代替正斜线)

初始化SparkContext

一旦你的应用程序连接到Spark,你需要在你的应用程序中导入Spark包并创建一个SparkContext对象。你需要先创建一个SparkConf对象去配置你的应用程序,然后用这个SparkConf对象创建SparkContext对象。例2-7到例2-9展示了各个Spark支持的语言创建SparkContext对象的过程。

例2-7.  在Python中初始化Spark

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster(“local”).setAppName(“My App”)

sc = SparkContext(conf = conf)

例2-8. 在Scala中初始化Spark

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

val conf = new SparkConf().setMaster(“local”).setAppName(“My App”)

val sc = new SparkContext(conf)

例2-9. 在Java中初始化Spark

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setMaster(“local”).

这些例子展示了初始化SparkContext对象的简单方法,我们只设置了两个参数:

1.集群URL(在这些例子中是“local”),它告诉Spark如何连接到集群,local是一个特殊的值,它使Spark运行在本地机器的一个线程上,而不会连接到集群。

2.应用程序名字(在这些例子中是“My App”),当你连接到集群上时,这个名字将在集群管理器的UI上标识你的应用程序。

除了这两个参数之外,还存在其它一些参数,可以用它们来配置你的应用程序的执行方式,或者添加传递到集群中去执行的代码,但是,这些在这本书之后的章节中才会讲到。

在你初始化你的SparkContext对象之后,你可以使用我们之前展示的所有方法去创建RDD(例如,从一个文件)或操作这些RDD。

最后,为了关闭Spark,你可以调用你的SparkContext对象上的stop()方法,或者简单地推出程序(用System.exit(0)或者sys.exit())。

这次对Spark简单的讲解应该足够让你在你的电脑上运行一个单独的Spark应用程序了。对于更多高级配置,第七章将会讲怎样连接你的程序到集群上,包括打包你的程序让它的代码被自动传送到工作节点上。目前,请参考官方Spark文档Quick Start Guide。



构建单独的应用程序

如果我们没有一个word count例子,这个大数据书籍的介绍章节将是不完整的。在单台机器上,实现统计单词是简单的,但是在分布式框架中,它是一个通常的例子,因为它涉及到从大量工作节点上读取数据和合并数据。我们将看看如果通过sbt和Maven构建和打包一个word  count程序。我们的所有示例都可以一起构建,但是为了说明一个拥有最少量以来的简单构建,在learning-spark-examples/mini-complete-example目录下,我们有一个更小的项目,正如你在例2-10(Java)和2-11(Scala)中看到的。

例2-10. Word count Java程序 —   现在不要担心细节

//  创建一个SparkConf

SparkConf conf = new SparkConf().setAppName(“wordCount”);

JavaSparkContext sc = new JavaSparkContext(conf);

//  加载我们的数据

JavaRDD<String> input = sc.textFile(inputFile);

//  分割行为单词集

JavaRDD<String> words = input.flatMap(

new FlatMapFunction<String, String>() {

public Iterable<String> call(String x) {

return Arrays.asList(x.split(” “));

}});

//   转化为单词:次数对,统计单词

JavaPairRDD<String, Integer> counts = words.mapToPair(

new PairFunction<String, String, Integer>(){

public Tuple2<String, Integer> call(String x){

return new Tuple2(x, 1);

}}).reduceByKey(new Function2<Integer, Integer, Integer>(){

public Integer call(Integer x, Integer y){ return x + y;}});

//  把单词计数保存会文本

counts.saveAsTextFile(outputFile);

例2-11. Word count Scala程序 —
现在不要担心细节

// 创建一个SparkConf

val conf = new SparkConf().setAppName(“wordCount”)

val sc = new SparkContext(conf)

// 加载我们的数据.

val input = sc.textFile(inputFile)

// 分割行为单词集

val words = input.flatMap(line => line.split(” “))

// 转化为单词:次数对,统计单词

val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}

// 把单词计数保存会文本

counts.saveAsTextFile(outputFile)

我们能用sbt(例2-12)或者Maven(例2-13)用非常简单的构建文件构建这些应用程序。我们已经标记Spark Core依赖已经是被提供了的,所以,之后,当我们使用一个assembly JAR的时候,我们不会包含spark-core jar,因为spark-core jar已经在工作节点的类路径下了。

例2-12. sbt构建文件

name := “learning-spark-mini-example”

version := “0.0.1”

scalaVersion := “2.10.4”

// additional libraries

libraryDependencies ++= Seq(

“org.apache.spark” %% “spark-core” % “1.2.0” % “provided”

)

例2-13. Maven构建文件



小提示

spark-core包被标记为provided,以防它跟着我们的应用程序被打包进assembly JAR中。第七章会讲更多相关细节。

一旦我们的构建文件定义好了,我们能很容易地打包我们的应用程序,然后用bin/spark-submit脚本运行它们。spark-submit需要设置一些Spark需要用到的环境变量。在mini-complete-example目录下,我们能构建Java(例2-14)和Scala(例2-15)程序。

例2-14. Scala  构建和运行

sbt clean package

$SPARK_HOME/bin/spark-submit \

—class com.oreilly.learningsparkexamples.mini.scala.WordCount \

./target/…(as above) \

./README.md ./wordcounts

例2-15. Maven 构建和运行

mvn clean && mvn compile && mvn package

$SPARK_HOME/bin/spark-submit \

—class com.oreilly.learningsparkexamples.mini.java.WordCount \

./target/learning-spark-mini-example-0.0.1.jar \

./README.md ./wordcounts

关于连接应用程序到集群的更详细的示例,请参考Spark官方文档Quick Start

Guide。第七章会将打包应用程序的更多细节。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Spark