Spark Streaming: monitoring an HDFS file directory
2018-03-16 16:59
Cluster environment: CDH 5.8.0 / Spark 1.6.0 / Scala 2.10.4
Basic usage in Scala is as follows:
package com.egridcloud.sparkstreaming
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
/**
 * Created by LHX on 2018/3/7 at 8:06 PM.
 * Monitors a directory, runs a word count, and saves the results to HDFS.
 */
object SparkStreamingFile {
def main(args: Array[String]): Unit = {
val classes: Array[Class[_]] = Array[Class[_]](classOf[LongWritable], classOf[Text])
val conf = new SparkConf().setAppName("sparkstreamingfile") //.setMaster("local[2]")
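// minRememberDuration sets how far back in modification time fileStream looks
// when deciding which pre-existing files to pick up; 2592000s = 30 days, so
// files up to 30 days old are still processed on startup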
conf.set("spark.streaming.fileStream.minRememberDuration", "2592000s")
conf.set("spark.serialize", classOf[KryoSerializer].getName())
conf.registerKryoClasses(classes)
// Set the batch interval (one micro-batch every 30 seconds)
val streamingContext = new StreamingContext(conf, Durations.seconds(30))
// val inputPath = "C:/tmp/sparkstreamingfile"
val inputPath = args(0)
// val outputPath = "C:/tmp/sparkstreamingfile_save/"
val outputPath = args(1)
val hadoopConf = new Configuration()
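// fileStream[K, V, F] takes the input format's key/value types (here LongWritable
// offsets and Text lines from TextInputFormat), a filter deciding which files to
// process, and a newFilesOnly flag; false means remembered pre-existing files are read too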
val fileStream: InputDStream[(LongWritable, Text)] =
  streamingContext.fileStream[LongWritable, Text, TextInputFormat](
    inputPath,
    (path: Path) => { println(path.getName); path.getName.endsWith(".csv") },
    false,
    hadoopConf)
// Split each line on ","
val words: DStream[String] = fileStream.flatMap(_._2.toString.split(","))
// Pair each word with an initial count of 1
val pairs: DStream[(String, Int)] = words.map((_, 1))
// Sum the counts of identical words
val wordCounts: DStream[(String, Int)] = pairs.reduceByKey(_ + _)
// collect() pulls each batch's result to the driver; fine for a small demo
wordCounts.foreachRDD((rdd, time) => println(s"count time:${time},${rdd.collect().toList}"))
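// saveAsTextFiles writes one directory per batch, named "<prefix>-<batch time in ms>"
// (plus ".<suffix>" when a suffix is given), each containing part-* files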
// Write the results to HDFS
// wordCounts.saveAsTextFiles(outputPath, "suffix")
wordCounts.saveAsTextFiles(outputPath)
// Whether a job fires each batch depends on the Duration interval set above
streamingContext.start()
// Block until the streaming application terminates
streamingContext.awaitTermination()
}
}
Package the application, upload it to the cluster, and run it with the input and output paths as arguments; then drop files into the input directory. A sketch of the submit command is shown below.
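For example, submission might look like the following (a minimal sketch: the jar name sparkstreamingfile.jar and the HDFS paths are placeholders, and the master/deploy-mode flags depend on your cluster setup):
spark-submit \
  --class com.egridcloud.sparkstreaming.SparkStreamingFile \
  --master yarn \
  --deploy-mode cluster \
  sparkstreamingfile.jar \
  hdfs:///tmp/sparkstreamingfile \
  hdfs:///tmp/sparkstreamingfile_save/result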
Full project for reference: GitHub