
Spark Notes 1: Startup, Basic RDDs, Submitting, Testing

2016-05-04 08:10
./spark-shell --master local[2]   # start a Spark application in local mode; the 2 means two worker threads

Start/stop Hadoop 2.x: start-dfs.sh, start-yarn.sh / stop-dfs.sh, stop-yarn.sh

Creating a Spark project (in IntelliJ IDEA)

New Project > Scala > Scala, enter a project name, then keep clicking Next until finished.

File > Project Structure > Modules: create the relevant source packages (main, test)

Libraries > + > Java: add the required Spark jars

Packaging

1. File > Project Structure > Artifacts > + > JAR > From modules with dependencies, choose the main class, OK (use - to drop unnecessary jars), OK

2. Build > Build Artifacts > Build, then look for the jar in the output directory

/opt/modules/spark-1.6.0-bin-hadoop2.6/bin

./spark-submit /opt/tools/sparkApp2.jar

./bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  /opt/tools/sparkApp2.jar

In Spark, a single application contains multiple jobs (one per action).

In MapReduce, by contrast, one job is one application.
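A small spark-shell sketch of this (reusing the a.txt path from later in these notes): both actions below run inside the same application, but each shows up as a separate job in the 4040 web UI.

val lines = sc.textFile("hdfs://hadoop-00:9000/home910/liyuting/input/a.txt")
lines.count()                               // action 1 -> job 0
lines.filter(_.contains("hello")).count()   // action 2 -> job 1, same application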

RDD: Resilient Distributed Dataset

An RDD is characterized by three things (inspected in the sketch after this list):

1) partitions (how the data is split)

2) a compute function, applied to each partition

3) dependencies on parent RDDs
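A spark-shell sketch for inspecting these three pieces on a concrete RDD (path reused from the examples below):

val rdd = sc.textFile("hdfs://hadoop-00:9000/home910/liyuting/input/a.txt")
rdd.partitions.length   // 1) how many partitions the data is split into
rdd.toDebugString       // 3) prints the lineage, i.e. this RDD's dependencies on its parents
// 2) the compute function runs once per partition when an action such as rdd.count() executes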

RDD operations in detail

1. RDD creation

a) From an external data source

HDFS: var rdd=sc.textFile("hdfs://hadoop-00:9000/home910/liyuting/input/a.txt")

var rdd=sc.textFile("hdfs://hadoop-00:9000/home910/liyuting/input/a.txt",1)   // read the file as a single partition

b) Parallelized collections (mostly used for testing)

sc.parallelize(Array(1, 2, 3, 4, 5)) sc.parallelize(List(1, 2, 3, 4, 5))

RDD partitions

sc.textFile("",4): the resulting RDD has four partitions; the second argument manually specifies the partition count.
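A quick check of the partition count in the spark-shell (a sketch; the second argument to textFile is a minimum, so a large file with more HDFS blocks may end up with more partitions):

val rdd4 = sc.textFile("hdfs://hadoop-00:9000/home910/liyuting/input/a.txt", 4)
rdd4.partitions.length   // 4 for a small file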

2. RDD transformations

A transformation turns one RDD into another RDD.

WordCount:

map, reduceByKey

text -> RDD[String] (wordRDD) -> RDD[(String,Int)] (kvRDD) -> RDD[(String,Int)] (resultRDD) -> collect

The first RDD holds the data line by line; every later RDD is derived from that first RDD and records only the operations applied to it.

Transformations are lazy: they do not run immediately; the actual work only happens when an action triggers it (see the sketch after the list of transformations below).

lineage: each RDD records the chain of transformations it was derived from.

map(func): transform each element into a new element, e.g. map(x => (x,1))

filter(func): keep only the elements for which func returns true, e.g. rdd.filter(_.contains("hello"))

repartition(numPartitions): repartition the RDD, e.g. rdd.repartition(1)

For the other transformations, see the "Transformations" section of http://spark.apache.org/docs/latest/programming-guide.html
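A sketch of the lazy behaviour described above, assuming the same a.txt: filter and map only record lineage; nothing is read from HDFS until the action at the end runs.

val lines = sc.textFile("hdfs://hadoop-00:9000/home910/liyuting/input/a.txt")
val hello = lines.filter(_.contains("hello"))   // transformation: recorded, not executed
val pairs = hello.map(x => (x, 1))              // transformation: recorded, not executed
pairs.collect()                                 // action: the whole chain executes now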

3. RDD actions

Actions trigger the computation and perform the actual data processing.

rdd.reduce(_+_): simple summation of all elements

wordcount.saveAsTextFile("hdfs://hadoop-00:9000/home910/liyuting/output2")

foreach(func): run func on every element, e.g. rdd.foreach(x=>println(x+",spark"))

Counting elements with an accumulator: val nErrors=sc.accumulator(0.0); rdd.foreach(x=>nErrors+=1); nErrors.value — a plain var num=0 updated inside foreach is not reliable (see the sketch below).
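A sketch of why the accumulator is needed: a plain driver-side var captured by the foreach closure is updated only in the executors' copies, so on a cluster the driver still sees the old value; an accumulator merges the updates back on the driver (Spark 1.x accumulator API, matching the 1.6.0 build used here).

val nums = sc.parallelize(List(1, 2, 3, 4, 5))
var num = 0
nums.foreach(x => num += 1)        // unreliable: only the closure's copies of num change
println(num)                       // still 0 on a cluster
val counter = sc.accumulator(0)    // driver-side accumulator
nums.foreach(x => counter += 1)    // executor updates are sent back to the driver
counter.value                      // 5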

4. RDD cache

rdd.cache

The cache method is lazy: the RDD is only actually cached after an action has been executed.

cache is the special case of persist that stores the RDD in memory (MEMORY_ONLY).
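A sketch of the cache-then-action pattern (same a.txt path): cache() only marks the RDD; the first action fills the in-memory copy, and later actions reuse it instead of rereading HDFS.

val logs = sc.textFile("hdfs://hadoop-00:9000/home910/liyuting/input/a.txt")
logs.cache()                                // marks the RDD for MEMORY_ONLY storage; nothing happens yet
logs.count()                                // first action: reads HDFS and populates the cache
logs.filter(_.contains("hello")).count()    // served from the cached partitions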

Monitoring a running Spark application: the web UI at http://<driver-node>:4040; each Spark application gets its own UI port.

Spark job history: monitoring applications after they have finished, to review the state of past applications.

Configure conf/spark-env.sh:

SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://hadoop-00:9000/home910/liyuting/sparkhistory"   # the directory must be created in advance

Configure conf/spark-defaults.conf:

spark.eventLog.enabled true

spark.eventLog.dir hdfs://hadoop-00:9000/home910/liyuting/sparkhistory

spark.eventLog.compress true   # whether to compress the event logs

./sbin/start-history-server.sh
http://<driver-node>:18080
When exiting an application, call sc.stop() first and then exit, so the event log is closed and the application is recorded as finished.

Spark on YARN

bin/spark-submit --help   # show the available options

# client mode: the driver is started locally on the submitting machine
bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  lib/spark-examples-1.3.0-hadoop2.6.0-cdh5.4.0.jar \
  10

# cluster deploy mode (standalone): the driver runs on one of the cluster's nodes
bin/spark-submit \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  lib/spark-examples-1.3.0-hadoop2.6.0-cdh5.4.0.jar \
  10

===========================================================

bin/spark-submit \
  --master yarn \
  --class org.apache.spark.examples.SparkPi \
  lib/spark-examples-1.6.0-hadoop2.6.0.jar \
  10

spark-submit \
  --master yarn \
  --class org.apache.spark.examples.SparkPi \
  lib/spark-examples-1.3.0-hadoop2.6.0.jar \
  10

Submitting a jar locally

./bin/spark-submit \
  --class wordcount \
  --master local \
  /opt/tools/workspace.jar \
  hdfs://192.168.192.137:9000/data/t1.csv

spark-submit \
  --class topk \
  --master local \
  /opt/tools/workspace.jar \
  hdfs://192.168.192.137:9000/data/a.txt \
  hdfs://192.168.192.137:9000/out2

Submitting to YARN (error still to be resolved)

./bin/spark-submit \
  --class wordcount \
  --master yarn \
  --deploy-mode client \
  --num-executors 1 \
  --executor-memory 1g \
  --executor-cores 1 \
  --queue thequeue \
  /opt/tools/workspace.jar \
  hdfs://192.168.192.137:9000/data/t1.csv

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 1g \
  --executor-memory 1g \
  --executor-cores 1 \
  --queue thequeue \
  /opt/modules/spark-1.6.0-bin-hadoop2.6/lib/spark-examples-1.6.0-hadoop2.6.0.jar \
  10

Configuration

conf/spark-env.sh

HADOOP_CONF_DIR must be set (see the example line below).
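For example, add a line like the following to conf/spark-env.sh (the Hadoop path is only an illustration; point it at your own Hadoop configuration directory):

export HADOOP_CONF_DIR=/opt/modules/hadoop-2.6.0/etc/hadoop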

========================Hadoop YARN=======================

yarn-site.xml:

<property>
  <name>yarn.resourcemanager.hostname</name>
  <value>hadoop-yarn.cloudyhadoop.com</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle</value>
</property>

slaves:

hadoop-yarn.cloudyhadoop.com

Spark internals

1. Programming model

SparkContext: create the SparkContext

val rdd = sc.textFile("/").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)   // create the RDDs, build the DAG

rdd.collect   // executing this action runs the whole RDD chain

2. DAGScheduler

3. TaskScheduler

4. RDD and SparkContext source-code walkthrough

The core steps of SparkContext initialization (observed by installing IDEA on Linux and stepping through a Spark job in the debugger):

1) Create a SparkEnv from the SparkConf passed to the SparkContext constructor

2) Initialize the Spark UI so the user can monitor the application while it runs; the default port is 4040

3) Create and start the schedulers

a) create the TaskScheduler and SchedulerBackend

b) create the DAGScheduler

c) start the TaskScheduler and DAGScheduler

4) Start the executors
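A minimal sketch mapping those steps onto the user-facing API (the spark-shell does this once at startup and binds the result to sc; in a standalone program you write it yourself; the app name "InitDemo" is just a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("InitDemo").setMaster("local[2]")   // 1) settings used to build the SparkEnv
val sc = new SparkContext(conf)   // 2)-4) starts the 4040 UI, creates and starts the TaskScheduler,
                                  //        SchedulerBackend and DAGScheduler, and launches the executors
sc.appName   // "InitDemo"
sc.stop()    // shut the context down again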

WordCount test

var rdd=sc.textFile("hdfs://192.168.192.128:9000/data/input/danc.data")

var rdd=sc.textFile("file:///opt/tools/a.txt")

var rdd=sc.textFile("hdfs://hadoop-00:9000/home910/liyuting/input/a.txt")

var wordcount = rdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => a+b)

wordcount.collect()

var wordsort=wordcount.sortByKey(false)   // sort by key in descending order (false = descending)

wordsort.collect()

val file = sc.textFile("hdfs://hadoop.master:9000/data/intput/wordcount.data")

val wordcount=file.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

wordcount.collect()

val wordsort = wordcount.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1))   // sort by count, descending

wordsort.collect()

val count = file.flatMap(line=>(line.split(" "))).map(word=>(word,1)).reduceByKey(_+_)

count.collect()

count.saveAsTextFile("hdfs://hadoop.master:9000/data/output")