
HDP 2.5.0 + Spark 1.6.2: Remotely Submitting Spark Jobs on YARN from IDEA (Win64)

2016-12-12
This article uses Apache Ambari to set up an HDP 2.5.0 cluster with Spark 1.6.2, the latest Spark shipped with HDP. Submitting jobs with spark-submit works without problems in local, standalone, and yarn-client modes.

However, development usually happens on Windows. Standalone mode requires starting a separate Spark cluster (which inevitably ties up resources), submitting in yarn-client mode comes with plenty of problems, and the cycle of editing code, packaging, uploading, and running spark-submit is tedious. The goal is therefore to submit jobs remotely straight from the IDE and cut out some of this repetitive development and testing work.

Development environment: Win7 x64, ideaIU-2016.3, Scala 2.10.5

1. Create the Maven project for Spark 1.6.2: New Project, choose a Maven project, leave "Create from archetype" unchecked, click Next, fill in the GroupId, project location, and so on, and finish the wizard.



2. Modify the pom.xml file:

① aws-java-sdk-core-1.10.6.jar
② aws-java-sdk-kms-1.10.6.jar
③ aws-java-sdk-s3-1.10.6.jar
④ spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar

These four files come from the Spark 1.6.2 installation directory "/xxx/hdp/2.5.0.0-1245/hadoop/lib". Several attempts to pull them in automatically as Maven dependencies failed, so a lib folder was created under the project's src directory to hold the four jars, which are then referenced as system-scoped (external) Maven dependencies. The Scala version used is 2.10.5.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>com.xxx.yyy</groupId>
<artifactId>spark</artifactId>
<version>1.0.0</version>

<dependencies>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.5</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<scope>system</scope>
<version>1.10.60</version>
<systemPath>${pom.basedir}\src\lib\aws-java-sdk-core-1.10.6.jar</systemPath>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kms</artifactId>
<scope>system</scope>
<version>1.10.60</version>
<systemPath>${pom.basedir}\src\lib\aws-java-sdk-kms-1.10.6.jar</systemPath>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<scope>system</scope>
<version>1.10.60</version>
<systemPath>${pom.basedir}\src\lib\aws-java-sdk-s3-1.10.6.jar</systemPath>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<scope>system</scope>
<version>1.6.2.2.5.0.0-1245</version>
<systemPath>${pom.basedir}\src\lib\spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar</systemPath>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<includes>
<include>**/*.scala</include>
</includes>
</configuration>
</execution>
<execution>
<id>scala-test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>


Add the YARN cluster XML configuration files to the project. Note that the net.topology.script.file.name property in core-site.xml must be commented out, since the rack-topology script it points to exists only on the cluster nodes, not on the local Windows machine.
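As a minimal sketch, the commented-out section of the copied core-site.xml would look roughly like this (the property name is from the article; the script path in the value is only an illustrative placeholder, not necessarily the one on your cluster):

<!--
<property>
<name>net.topology.script.file.name</name>
<value>/etc/hadoop/conf/topology_script.py</value>
</property>
-->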





3. Test code:

① Create the Scala source directory /src/main/scala and mark it as a sources root (Mark Directory as → Sources Root).



② Create a Scala object and add the following code:



import org.apache.spark.{SparkConf, SparkContext}

object xxx {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Spark WordCount")
    conf.setMaster("yarn-client")
    // Whether this is set only affects whether the "hdp.version is not found" warning appears
    conf.set("spark.yarn.am.extraJavaOptions", "-Dhdp.version=2.5.0.0-1245")
    // This setting is important: point it at the assembly jar already stored on HDFS,
    // so it does not have to be uploaded on every test submit (uploading ~180 MB takes a while)
    conf.set("spark.yarn.jar",
      "hdfs://mycluster/hdp/apps/2.5.0.0-1245/spark/spark-hdp-assembly.jar")

    val sc = new SparkContext(conf)
    // The test program itself must be packaged and registered via SparkContext.addJar.
    // Here the local path of the packaged project jar is used, so after each code change
    // only re-packaging is needed, not another upload to HDFS.
    //sc.addJar("hdfs://mycluster/user/linxu/spark-1.0.0.jar")
    sc.addJar("E:\\IdeaProjects\\spark\\target\\spark-1.0.0.jar")

    val textFile = sc.textFile("hdfs://mycluster/user/linxu/README.txt")
    val counts = textFile.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.collect().foreach(println)

    sc.stop()
  }
}
Once these steps are done, the project has the file structure shown in the screenshot below:
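Since the screenshot is not reproduced here, an approximate sketch of the layout, with names taken from the pom and the steps above (the exact location of the copied YARN xml files is an assumption):

spark/
├── pom.xml
└── src/
    ├── lib/
    │   ├── aws-java-sdk-core-1.10.6.jar
    │   ├── aws-java-sdk-kms-1.10.6.jar
    │   ├── aws-java-sdk-s3-1.10.6.jar
    │   └── spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar
    └── main/
        ├── resources/    <- core-site.xml and the other YARN cluster xml files (assumed location)
        └── scala/
            └── xxx.scala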



4. Package with Maven and run:
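The packaging step can be triggered from the Maven tool window in IDEA, or equivalently from the command line with the standard Maven build:

mvn clean package

Given the artifactId and version in the pom above, this produces target\spark-1.0.0.jar inside the project directory, which matches the local path passed to sc.addJar (E:\IdeaProjects\spark\target\spark-1.0.0.jar).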



Run "xxx" and check the result:

2016-12-12 16:54:32.074 [org.apache.hadoop.yarn.client.api.impl.YarnClientImpl] [main] [INFO] - Submitted application application_1481274074996_0077
2016-12-12 16:54:38.830 [YarnTest$] [main] [INFO] - SparkContext created.
2016-12-12 16:54:39.657 [org.apache.hadoop.mapred.FileInputFormat] [main] [INFO] - Total input paths to process : 1
2016-12-12 16:54:39.688 [YarnTest$] [main] [INFO] - Result:
(under,1)
(this,3)
(distribution,2)
(Technology,1)
(country,1)
(is,1)
(Jetty,1)
(currently,1)
(permitted.,1)
(check,1)
(have,1)
(Security,1)
(U.S.,1)
(with,1)
(BIS,1)
(This,1)
(mortbay.org.,1)
((ECCN),1)
(using,2)
2016-12-12 16:54:41.802 [akka.remote.RemoteActorRefProvider$RemotingTerminator] [sparkDriverActorSystem-akka.actor.default-dispatcher-4] [INFO] - Shutting down remote daemon.
2016-12-12 16:54:41.802 [akka.remote.RemoteActorRefProvider$RemotingTerminator] [sparkDriverActorSystem-akka.actor.default-dispatcher-4] [INFO] - Remote daemon shut down; proceeding with flushing remote transports.


There were quite a few pitfalls along the way, but after investigating and testing together with colleagues it finally worked. If you have a similar development need and run into problems, feel free to get in touch.

If you reprint this article, please include a link to the original.