您的位置:首页 > 其它

使用IDE开发Spark应用程序

2016-08-12 23:23 302 查看
使用IDE开发Spark应用程序有两种方式,一种是在本地(Local)运行,另一种是在集群中运行。下面分别介绍两种方式:

一、使用Scala开发本地测试的Spark应用程序。

1、安装IDE集成开发环境(前提是要在Windows中安装了Java和Scala)。

2、使用集成开发环境创建工程WordCount,并修改Scala Library Container为2.10.x

(工程名WordCount右键,点击properties,选择Scala complier,点击library,选择2.10.x)

3、添加spark依赖。也就是将spark-1.6.2-bin-hadoop2.6解压,找到lib中的spark-assembly-1.6.2-hadoop2.6.0.jar.(WordCount右键,点击BulidPath,)

4、创建Scala包,并创建Scala object,将class WorlCount,改为object
WordCount。

5、编写main函数
package com.dt.spark

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

/**

 * 使用Scala开发本地测试的spark WordCount程序

 */

object WordCount {

  def main(args : Array[String]){

    /**

     * 第一步,创建Spark的配置对象SparkConf,设置spark程序的运行时

     * 的配置信息。  例如说通过setMaster来设置程序要连接的spark集群的Master的URL

     * 如果设置为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常

     * 差(例如只有1G的内存)的初学者

     * 

     */

    val conf = new SparkConf()//创建spark项目对象(Ctrl+shift+o)

    conf.setAppName("My first spark name")//设置应用程序的名称,在程序运行的监控界面可以看到名称

    conf.setMaster("local")//此时(local)程序在本地运行,不需要安装spark集群

    

    /**

     * 第二步,创建sparkcontext对象,

     * SparkContext对象是Spark程序所有功能的唯一入口,无论是采用Java,Scala,Python,R等都需要一个Sparkcontext

     * Spark从text核心作用:初始化Spark应用程序运行所需要的核心组件,包括DAGScheduler、TaskScheduler

     * SchedulerBack,同时还会负责Spark程序往Master注册程序等

     * SparkContext是整个应用程序中最为至关重要的一个对象

     *  

     * 

     */

    

    val sc = new SparkContext(conf) //通过创建SparkContext对象通过传入SparkCOnf对象实例来定制Spark运行的具体信息

    

    /**

     * 第三步:根据具体的数据来源(HDFS、HBase、local FS、DB、S3)

     * 通过SparkCOntext来创建RDD

     * RDD的创建有3中方式:根据外部的数据来源(例如HDFS)、根据Scala集合(Range),由其他的RDD操作产生

     * 数据会被RDD划分成为一系列的Partitions(分区),分配到每个分区的数据属于一个Task的处理范畴

     */

    

    //val lines: RDD[String]= sc.textFile("E:\\大数据\\大数据软件\\spark-1.6.2-bin-hadoop2.6\\spark-1.6.2-bin-hadoop2.6\\README.MD", 1)//读取本地文件,并设置为一个partition

    val lines = sc.textFile("E:\\大数据\\大数据软件\\spark-1.6.2-bin-hadoop2.6\\spark-1.6.2-bin-hadoop2.6\\README.MD", 1)//读取本地文件,并设置为一个partition

    

    /**

     * 第四步:对初始的RDD进行transformation级别的处理,例如Map,FIlter等高阶函数的编程,来进行具体的数据计算

     * 1、强每一行的字符串拆分成单个的单词

     * 

     */

    

    //val words = lines.flatMap(_.split(" "))

    val words = lines.flatMap{line => line.split(" ")}//对每一行的字符串进行单词的切分,并把所有行的拆分结果合并成为一个大的单词集合(words)

    /**

     * 2、对每个单词进行实例计数为1,

     */

    val pairs = words.map{word => (word,1)}

    /**

     * 3、单词实例计数后,统计每个单词在文件中出现的总次数

     */

    val wordCounts = pairs.reduceByKey(_ + _)//对相同的key进行Value的累加(包括local和Reduce级别同时Reduce)

    

    //val sortWord = wordCounts.map{(name,value) => (value,name)}

    wordCounts.foreach(wordNumberPair => println(wordNumberPair._1+" : "+wordNumberPair._2))

    

    sc.stop()

  }

}6、编写完成之后再开发环境界面右键Run As 点击Scala Application,则会生成运行结果。

**************************************************************************************************

**************************************************************************************************

**************************************************************************************************

二、使用Scala开发集群运行的spark WordCount程序

1、代码有一下几句变动:

    //conf.setMaster("local")//此时(local)程序在本地运行,不需要安装spark集群

    //local可以用Spark://Matser:7077代替,此处也可以不写,运行的时候手动输入

    

    /**

     * 第三步:根据具体的数据来源(HDFS、HBase、local FS、DB、S3)

     * 通过SparkCOntext来创建RDD

     * RDD的创建有3中方式:根据外部的数据来源(例如HDFS)、根据Scala集合(Range),由其他的RDD操作产生

     * 数据会被RDD划分成为一系列的Partitions(分区),分配到每个分区的数据属于一个Task的处理范畴

     */

    

    //val lines: RDD[String]= sc.textFile("E:\\大数据\\大数据软件\\spark-1.6.2-bin-hadoop2.6\\spark-1.6.2-bin-hadoop2.6\\README.MD", 1)//读取本地文件,并设置为一个partition

    //val lines = sc.textFile("hdfs://Master:9000/user/hadoop/input")//读取HDFS文件并切分成不同的partition,不用设置partition

    val lines = sc.textFile("/user/hadoop/input")//读取HDFS文件并切分成不同的partition,不用设置partition

   

    val wordCounts = pairs.reduceByKey(_ + _)//对相同的key进行Value的累加(包括local和Reduce级别同时Reduce)

    

    wordCounts.collect.foreach(wordNumberPair => println(wordNumberPair._1+" : "+wordNumberPair._2))

2、生成jar包,项目名称右键--export--JAR file -- finish

将jar包复制到Linux文件夹下。

3、启动Hadoop集群,启动spark集群。

在spark的bin目录下运行,./spark-1.6.2-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.WordCount_Cluster --master spark://Master:7077 /文件夹/WordCount.jar

4、显示结果。

转载请注明出处,谢谢
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: