您的位置:首页 > 其它

Local模式下开发第一个Spark程序并运行于集群环境

2016-01-10 08:09 676 查看
第一阶段(1-3月):会从浅入深,基于大量案例实战,深度剖析和讲解Spark,并且会包含完全从企业真实复杂业务需求中抽取出的案例实战。课程会涵盖Scala编程详解、Spark核心编程、Spark SQL和Spark Streaming、Spark GraphX、SparkR、Machine Learning、Spark内核以及源码剖析、性能调优、企业级案例实战等部分

第二阶段(Spark超大规模大数据案例实战):使用了Spark技术生态栈中的Spark Core、Spark SQL、Spark Streaming、SparkR、Machine Learning,进行离线计算和实时计算业务模块的开发、数据的关联性分析、用户行为模式和特征的训练与应用、用户网络的社区发现、用户影响力、能量传播、标签传播、标签推理、人群划分、年龄段预测、商品交易时序跳转

作业:实现广告点击排序(尾部)

本期内容:

1 使用IDE开发Spark分析

2 使用IDE开发Spark实战

3,使用IDE开发Spark的Local和Cluster

开发Spark(IDE、eclipse)

1、下载IDE scala-ide.org(本地win上开发)

安装Java和Scala-SDK

2、 设置eclipse

导入Spark依赖包jar文件(spark-assembly-1.6.0-hadoop2.6.0)这边Spark版本为1.6.0,Hadoop版本为2.6.0



/**
* 第一步:创建Spark的配置对象SparkConf,设置Spark程序的运行时
* 的配置信息,例如通过SetMaster来设置程序的连接Spark集群的Master的
* URL,若设置为local,则代表Spark程序在本地运行,特别适合于
* 机器配置特别差的初学者。
*
*/
val conf = new SparkConf()
conf.setAppName("Wow,My First Spark App!")
//conf.setMaster("local")  //若将改行注释,可以运行于集群环境
//conf.setMaster("spark://Master:7077")  //绑定了程序必须运行于集群上
//此时,程序运行于本地模式,无需Spark集群

/**
*第二步:创建SparkCon对象
* SparkContext是Spark程序所有功能的唯一入口,无论采用Scala
* Java、Pytho等不同语言;
* SparkCon核心作用:初始化Spark应用程序运行所需要的核心组件,包括
* DAGSchedular、 taskSchedular、SchedularBacked,
* 同时还负责Spark应用程序中最为重要的一个对象。
*/
val sc = new SparkContext(conf)
//创建SparkConte对象,通过传入SparkContext实例来定制Spark运行的具体参数和配置信息。

/**
*第三步:根据具体的数据来源(HDFS、Hbase、Local FS、DB、S3等)通过SparkContext创建RDD
* RDD的创建基本有三种方式:
*   1、根据外部的数据来源(HDFS)
*   2、Scala集合
*   3、有其他的RDD操作
* 数据会被RDD划分为一系列的Partition、分配到每个Partition的数据属于一个Task的处理范围
*/

val lines = sc.textFile("C://Hello",1)   //并行度为1(partition),读取本地文件
//根据类型推断,这边的lines是RDD[String]

/**
* 第四部:对初始的RDD进行Transformation级别的处理(高阶函数:map、filter、flatmap等)
* 来进行具体的数据计算;
*  4.1、将每一行的字符串拆分成单个的单词
*/
val words = lines.flatmap(line => line.split(" "))
//对每一行的字符串进行单词拆分并把所以行的拆分结果通过flat合并成一个大的单词集合

/**
* 第四部:对初始的RDD进行Transformation级别的处理(高阶函数:map、filter、flatmap等)
* 来进行具体的数据计算;
*  4.2、在单词拆分的基础上对每个单词实例计数为1,也就是word =>(word,1)
*/

val pairs = words.map{ word => (word,1)}

/**
* 第四部:对初始的RDD进行Transformation级别的处理(高阶函数:map、filter、flatmap等)
* 来进行具体的数据计算;
*  4.3、在每个单词实例计数为1的基础上统计每个单词在文件中出现的总次数
*/
val wordCounts.pairs.reduceBykey(_+_)
//对相同的Key,进行value的累积(包括local和Reduce级别同时进行Reduce)

wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + ":" +wordNumberPair._2))

sc.stop()
/**
* 改程序没有Hadoop,运行会报错,属于正常,不影响运行
* Spark prebult with Hadoop
* 非程序错误
*/


第一步:修改依赖的Scala版本为Scala 2.10.x;
第二步:加入Spark 1.6.0的jar文件依赖
第三步:找到依赖的Spark Jar文件并导入到Eclipse中的Jar依赖;

第四步:在src下建立Spark工程包;

第五步:创建Scala入口类;

第六步:把class变成object并编写main入口方法。

导出文件为Jar






将该jar文件上传到集群上



运行该jar文件
./spark-aubmit

--class com.dt.spark.WordCount_cluster //运行包下的类

--master spark://Master:7077

//root/Documents/SparkApps/WordCount.jar //WordCount存放的地址






运行完成后,将结果打印出来



写Shell脚本,生成Wordcount.sh脚本,运行脚本自动执行,在History Web界面查看运行任务状态
chmod 755 Wordcount.sh



作业:实现广告点击排序

package spark.kong.dt.job
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
* @author css-kxr
* DT_大数据梦工厂
* HomeWord:广告点击排序
*/
object rankHit {
def main(args:String){
/**
* first:create new object of SparkConf,There is none Parameters
*/
val conf = new SparkConf()
conf.setAppName("That is so cool Rank the Hit rates advertisements")

/**
*Second:create new object of SparkContext that is a only program Entrance to run
*/
val sc = new SparkContext(conf)

/**
* third:According to the data resource HDFS,S3& DB to create RDD
* textFile("Path of files",1)  number 1 = one Partition
*/
val lines = sc.textFile("C://spark.kong.dt.job//spark-1.6.0-bin-hadoop-2.6.0//README.MD", 1)

/**
* fourth:Ececute the lines like map or filter and so on
*/

<p> val rank = lines.flatMap(line => line.split(" ")).map(word =>(word,1)).reduceByKey(_+_).map{word =>(word._2 ,word._1)}.sortByKey().collect().reverse.map{word =>(word._2 ,word._1)}foreach(println) </p><p>   Sc.stop()</p>
}
}


DT大数据梦工厂

新浪微博:www.weibo.com/ilovepains/

微信公众号:DT_Spark

博客:http://.blog.sina.com.cn/ilovepains

TEL:18610086859

Email:18610086859@vip.126.com
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: