您的位置:首页 > 编程语言

如何构建第一个Spark项目代码

2016-04-28 16:52 288 查看

如何构建第一个Spark项目代码

环境准备

本地环境

操作系统

Window7/Mac

IDE

IntelliJ IDEA Community Edition 14.1.6

下载地址

JDK 1.8.0_65

下载地址

Scala 2.11.7

下载地址

其它环境

Spark:1.4.1

下载地址

Hadoop Yarn:Hadoop 2.5.0-cdh5.3.2

IDE项目创建

新建一个项目

New Project



使用Maven模型创建一个Scala项目



填写自己的GroupId、ArtifactId,Version不需要修改,Maven会根据GroupId生成相应的目录结构,GroupId的取值一般为a.b.c 结构,ArtifactId为项目名称。之后点击next,填写完项目名称和目录,点击finish就可以让maven帮你创建Scala项目



项目创建完成后,目录结构如下



4.为项目添加JDK以及Scala SDK

点击File->Project Structure,在SDKS和Global Libraries中为项目配置环境。



至此整个项目结构、项目环境都搭建好了

编写主函数

主函数的编写在 projectName/src/main/scala/…/下完成,如果按照上述步骤完成代码搭建,将在目录最后发现

MyRouteBuild
MyRouteMain


这两个文件为模块文件,删除
MyRouteBuild
,重命名
MyRouteMain
DirectKafkaWordCount
。这里,我使用Spark Streaming官方提供的一个代码为实例代码,代码如下

package org.apache.spark.examples.streaming

import kafka.serializer.StringDecoder

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

object DirectKafkaWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("...")
System.exit(1)
}
//StreamingExamples.setStreamingLogLevels()

val Array(brokers, topics) = args

val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)

// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()

// Start the computation
ssc.start()
ssc.awaitTermination()
}
}


将代码最上面的
package org.apache.spark.examples.streaming
,替换为
DirectKafkaWordCount
里的
package
部分即可。并覆盖
DirectKafkaWordCount
文件。

至此Spark处理代码已经编写完成。

修改
pom.xml
,为项目打包做准备

pom.xml
中编写了整个项目的依赖关系,这个项目中我们需要导入一些
Spark Streaming
相关的包。

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.4.1</version>
</dependency>

<!-- scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>


除此之外,如果需要把相关依赖打包到最终
JAR
包中,需要在
pom.xml
bulid标签中写入以下配置:

<plugins>
<!-- Plugin to create a single jar that includes all dependencies -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>

<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>


pom.xml
文件修改完成后,即可开始maven打包,操作如图:



点击右侧弹出窗口的Execute Maven Goal,在
command line
中输入
clean package




Spark作业提交

在项目
projectname/target
目录下即可找到两个
jar
包,其中一个仅包含Scala代码,另一个包含所有依赖的包。

jar
包导到Spark服务器,运行Spark作业,运行操作如下

../bin/spark-submit –master yarn-client –jars ../lib/kafka_2.10-0.8.2.1.jar –class huochen.spark.example.DirectKafkaWordCount sparkExample-1.0-SNAPSHOT-jar-with-dependencies.jar kafka-broker topic

利用
spark-submit
把任务提交到Yarn集群,即可看到运行结果。

Q&A

带有依赖的jar包有80+M,似乎加上了许多没用的依赖包,这是正常情况还是由于
pom.xml
配置错误导致的?

Reference

Spark Streaming + Kafka Integration Guide

http://spark.apache.org/docs/1.4.1/streaming-kafka-integration.html

Running Spark on YARN

http://spark.apache.org/docs/1.4.1/running-on-yarn.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: