
Spark Streaming: Learning and Practice (1)

2016-01-31 21:48
This post records my learning and practice with Spark Streaming. Our company cluster currently runs Spark 1.3.1.

1. Input

1.1. StreamingContext.textFileStream

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

val ssc = new StreamingContext(sc, Seconds(10))  // 10-second batch interval
val streamPath = "dir"
val lines: DStream[String] = ssc.textFileStream(streamPath)


If streamPath is set to a local directory, IDEA reports no problem, and no error appears while debugging either, but the directory is not monitored. The reason is in the API documentation:

def textFileStream(directory: String): DStream[String]

Create a input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files (using key as LongWritable, value as Text and input format as TextInputFormat). Files must be written to the monitored directory by “moving” them from another location within the same file system. File names starting with . are ignored.

directory HDFS directory to monitor for new file

Note 1: streamPath must point to a directory on HDFS (or another Hadoop-compatible filesystem).
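The quoted documentation also requires that files be "moved" into the monitored directory rather than written there directly, so the stream never sees a half-written file. The sketch below illustrates the same atomic-rename principle on a local filesystem with plain Java NIO (the paths and the helper name `publishAtomically` are made up for illustration):

```scala
import java.nio.file.{Files, Path, Paths, StandardCopyOption}

// Write the file somewhere outside the watched directory first, then move it
// in with a single rename, so it appears in the watched directory all at once.
def publishAtomically(content: String, stagingDir: String, watchedDir: String, name: String): Path = {
  val staged = Paths.get(stagingDir, name)
  Files.write(staged, content.getBytes("UTF-8"))              // write outside the watched dir
  val target = Paths.get(watchedDir, name)
  Files.move(staged, target, StandardCopyOption.ATOMIC_MOVE)  // one rename, visible atomically
}
```

On HDFS the equivalent is to `hadoop fs -put` the file to a temporary path first and then `hadoop fs -mv` it into the monitored directory.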

1.1.1. Does it handle nested directories?

1. After creating a subdirectory with hadoop fs -mkdir hdfs://dir/dirOther and then using -put to place a file in dirOther, textFileStream does not detect the new file.

2. With hadoop fs -put file://dirOther hdfs://dir, if dirOther contains files, textFileStream does detect the new files.

3. With hadoop fs -put file://dirOther hdfs://dir, if dirOther itself contains further subdirectories, an error is reported, but the streaming job does not stop.

1.1.2. Does the directory argument accept wildcards?

No. A wildcard in dir is not expanded; it is treated as a literal directory name, so a directory-not-found error is reported, but the streaming job does not stop.

1.1.3. Can multiple DStreams monitor multiple directories at once?

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
val streamPath = "dir1"
val streamPath2 = "dir2"
// One word-count pipeline per monitored directory
val wordCounts = ssc.textFileStream(streamPath).flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
val wordCounts2 = ssc.textFileStream(streamPath2).flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
wordCounts2.print()
ssc.start()
ssc.awaitTermination()


Yes: multiple DStreams can be created in the same StreamingContext, monitoring streamPath and streamPath2 simultaneously.
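Independent of Spark, the per-batch word-count transformation above can be expressed over an ordinary Scala collection; this sketch (the helper name `wordCount` is made up) shows what flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _) computes for one batch of lines:

```scala
// reduceByKey on a plain collection ≈ groupBy followed by summing per key
def wordCount(lines: Seq[String]): Map[String, Int] =
  lines
    .flatMap(_.split(" "))  // split each line into words
    .map(w => (w, 1))       // pair each word with a count of 1
    .groupBy(_._1)          // collect the pairs for each distinct word
    .map { case (w, pairs) => (w, pairs.map(_._2).sum) }
```

In Spark the same chain runs distributed across partitions each batch interval, but the result per batch is the same mapping from word to count.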
Tags: spark streaming