Spark Streaming: Learning and Practice (1)
2016-01-31 21:48
This post records my study and practice of Spark Streaming. Our company cluster currently runs Spark 1.3.1.
1. Input
1.1. StreamingContext.textFileStream
val ssc = new StreamingContext(sc, Seconds(10))
val streamPath = "dir"
val lines: DStream[String] = ssc.textFileStream(streamPath)
If streamPath is set to a local folder, IDEA reports no problem and no error shows up while debugging, but the folder is never monitored. The reason is in the API documentation:
def textFileStream(directory: String): DStream[String]
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files (using key as LongWritable, value as Text and input format as TextInputFormat). Files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored.
directory — HDFS directory to monitor for new files
Note 1: streamPath must point to an HDFS directory.
1.1.1. Does it accept nested directories?
1. If you first run hadoop fs -mkdir hdfs://dir/dirOther and then -put a file into dirOther, textFileStream does not detect the new file.
2. If you run hadoop fs -put file://dirOther hdfs://dir and dirOther contains files, textFileStream detects the new files.
3. If you run hadoop fs -put file://dirOther hdfs://dir and dirOther itself contains subdirectories, an error is reported, but the streaming job does not stop.
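Since textFileStream picks up everything that lands in the monitored directory, one way to cope with mixed content is the lower-level ssc.fileStream, which takes a custom Path filter. A minimal sketch, where the HDFS URL and the ".txt" filter are illustrative assumptions, not taken from the original post:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamWithFilter {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FileStreamWithFilter")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Only accept files whose names end in ".txt"; anything else
    // under the directory is skipped by the filter.
    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
      "hdfs://namenode:8020/dir",                     // assumed HDFS path
      (path: Path) => path.getName.endsWith(".txt"),  // custom path filter
      newFilesOnly = true
    ).map(_._2.toString)                              // drop the byte-offset key

    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The filter runs on the driver for each candidate path, so it is a cheap way to keep unwanted files from ever reaching the batch.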
1.1.2. Does it accept wildcards in the directory?
Wildcards are not accepted in dir: the wildcard is treated as a literal directory name, so a directory-not-found error is reported, but the streaming job does not stop.
1.1.3. Can multiple DStreams run at once, monitoring multiple directories?
val ssc = new StreamingContext(sc, Seconds(10))
val streamPath = "dir1"
val streamPath2 = "dir2"
val wordCounts = ssc.textFileStream(streamPath).flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
val wordCounts2 = ssc.textFileStream(streamPath2).flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
wordCounts2.print()
ssc.start()
ssc.awaitTermination()
Yes: multiple DStreams can run at the same time, monitoring streamPath and streamPath2 simultaneously.
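Instead of keeping the two streams separate, they can also be merged with StreamingContext.union so a single set of transformations covers both directories. A sketch assuming the same dir1/dir2 layout as above (directory names are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UnionFileStreams {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UnionFileStreams")
    val ssc = new StreamingContext(conf, Seconds(10))

    // One DStream per monitored directory, then merge them into one.
    val streams = Seq("dir1", "dir2").map(ssc.textFileStream)
    val merged = ssc.union(streams)

    // A single word count now covers files from both directories.
    val wordCounts = merged
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Merging avoids duplicating the transformation pipeline and produces one combined result per batch rather than two separate printouts.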