Spark Streaming基于状态、窗口的实时数据流
2017-10-03 21:04
423 查看
与前两篇Spark Streaming的实时处理案例,原理基本一致,这里不再演示。最大的不同是,这两种方式必须设置checkpoint。
(注:模拟器前面已给出)
步骤1. 定义状态state
步骤2. 定义状态更新函数func
步骤3. 对DStream进行checkpoint
步骤4. 以func为参数,在DStream上调用updateStateByKey
分别执行模拟器和解析器。
补充一点:如果是本地执行解析器,需在IDE中手动给定args(0)和args(1)参数,IntelliJ参数添加路径:Run -> Edit Configurations -> + ->Application
步骤1. 定义聚合函数func
步骤2. 对DStream进行checkpoint
步骤3. 确定窗口长度、滑动时间间隔
步骤4. 在DStream上调用window相关操作
这里的解析器在spark下提交时,执行以下命令:
30和10的含义为:每隔10s统计过去30s内的数据流,时间单位与执行模拟器时是不一样的。
Tom 3200
Jerry 21.5
Tom 123.7
Lucy 259
Ben 125
John 546
John 125.8
(注:模拟器前面已给出)
基于状态的实时数据分析
使用updateStateByKey(func)步骤:步骤1. 定义状态state
步骤2. 定义状态更新函数func
步骤3. 对DStream进行checkpoint
步骤4. 以func为参数,在DStream上调用updateStateByKey
package spark import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.streaming.{Milliseconds,Seconds, StreamingContext} import org.apache.log4j.{Level, Logger} object StatefulWordCount { def main(args:Array[String]): Unit ={ Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) //定义状态函数 val updateFunc=(values: Seq[Int],state:Option[Int])=>{ val currentCount=values.foldLeft(0)(_+_) val previousCount=state.getOrElse(0) Some(currentCount+previousCount) } //创建ssc val conf=new SparkConf(). setAppName("StatefulWordCount"). //setMaster("spark://192.168.71.129:7077") setMaster("local[2]") val sc=new SparkContext(conf) val ssc=new StreamingContext(sc, Seconds(5)) //基于状态的操作需要进行checkpoint,输出路径如果是在HDFS上的必须原先不存在 //ssc.checkpoint("hdfs://node01:9000/streamingdata/StatefulWordCountlog") ssc.checkpoint(".") //当前路径 //处理数据流 val lines=ssc.socketTextStream(args(0),args(1).toInt) /* hello, hello, hello, spark, spark, !!!!!!!!!!!!! hello, hbase, hello, spark, hbase, wordcounts={ RDD1={(hello,1),(hello,1),(hello,1),(spark,1),(spark,1)}, RDD2={(hello,1),(hbase,1),(hello,1),(spark,1),(hbase,1)}....} RDD1={(hello,3),(spark,2)} RDD2={(hello,5),(spark,3),(hbase,2)}*/ val words=lines.flatMap(_.split(",")) val wordcounts=words.map(x=>(x,1)) val stateDstream=wordcounts.updateStateByKey[Int](updateFunc) stateDstream.print() ssc.start() ssc.awaitTermination() } }
分别执行模拟器和解析器。
补充一点:如果是本地执行解析器,需在IDE中手动给定args(0)和args(1)参数,IntelliJ参数添加路径:Run -> Edit Configurations -> + ->Application
基于窗口的实时数据分析
构建模拟器,模拟网络环境下的数据流;编辑Spark Streaming应用程序,在node01提交以集群模式运行,获取node02上端口9999中的文本数据流,每隔10s统计过去30s内数据流中各单词累计出现次数。步骤1. 定义聚合函数func
步骤2. 对DStream进行checkpoint
步骤3. 确定窗口长度、滑动时间间隔
步骤4. 在DStream上调用window相关操作
package spark import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.streaming._ import org.apache.spark.storage.StorageLevel object WindowWordCount { def main(args: Array[String]) = { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) val conf = new SparkConf(). setAppName("WindowWordCount"). setMaster("spark://192.168.71.129:7077") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(5)) //设置checkpoint路径 ssc.checkpoint("hdfs://node01:9000/WindowWordCountlog") val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER) val words = lines.flatMap(_.split(",")) val wordcounts = words. map(x => (x, 1)). reduceByKeyAndWindow((a: Int, b: Int) => a+b, Seconds(args(2).toInt), Seconds(args(3).toInt)) wordcounts.print() ssc.start() ssc.awaitTermination() } }
这里的解析器在spark下提交时,执行以下命令:
bin/spark-submit ~/WindowWordCount.jar node02 9999 30 10
30和10的含义为:每隔10s统计过去30s内的数据流,时间单位与执行模拟器时是不一样的。
SQL语句在Spark Streaming上的简单应用
数据源:Tom 3200
Jerry 21.5
Tom 123.7
Lucy 259
Ben 125
John 546
John 125.8
package spark import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext } import org.apache. spark.sql.SQLContext import org.apache. spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds ,StreamingContext } /*使用lazy加载的单件模式(singleton pattem)的方式来构建SQLContext实例,可以避免在foreachRDD中重复构建*/ object SQLContextSingleton { @transient private var instance: SQLContext = null //lazy方式实例化 def getInstance (sparkContext:SparkContext):SQLContext= synchronized{ if (instance == null){ instance = new SQLContext(sparkContext) } instance //用以返回SQLContext的对象 } } //样本类,用于构建RDD对应的DataFrame实例 //可以根据实际的数据格式,给出对应的解析样本类 case class Item(name:String,amount:Double) object Streaming_SQL{ def main( args : Array [ String] ){ //减少控制台输出信息 import org. apache. log4j. { Level, Logger } Logger. getLogger("org.apache.spark" ).setLevel(Level.WARN) Logger. getLogger("org.apache.spark.sql").setLevel(Level.WARN) Logger. getLogger("org.apache.spark.streaming").setLevel(Level.WARN) //创建StreamingContext实例 val conf = new SparkConf( ).setAppName("Streaming_SQL").setMaster("spark://192.168.71.129:7077")//.setMaster("local[2]") val ssc=new StreamingContext(conf,Seconds(10)) val words=ssc.socketTextStream("192.168.71.129",9999,StorageLevel.MEMORY_AND_DISK) words.foreachRDD { rdd => //将每个rdd转换为DataFrame即wordsDF val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) import sqlContext.implicits._ val wordsDF = rdd.map(x => x.split(" ")).map(x => Item(x(0),x(1).toDouble)).toDF() //将每个wordsDF注册成临时表tb_wordsDF wordsDF.registerTempTable("tb_wordsDF") //对tb_wordsDF执行SQL查询 sqlContext.sql("select name, sum(amount) from tb_wordsDF group by name").show() } ssc.start() ssc.awaitTermination() } }
相关文章推荐
- spark streaming测试之四设置窗口大小接收数据
- 潘国庆:基于 Spark Streaming 构建实时计算平台实战解析
- 基于spark的实时流数据需跟历史数据进行对比时所遇到的问题
- 第107课: Spark Streaming电商广告点击综合案例底层数据层的建模和编码实现(基于MySQL)
- SparkStreaming+Kafka 处理实时WIFI数据
- 自己标注(不注意坑不少)-Spark+Kafka构建实时分析Dashboard案例——步骤三:Spark Streaming实时处理数据
- kafka->spark->streaming->mysql(scala)实时数据处理示例
- Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统
- kafka->spark->streaming->mysql(scala)实时数据处理示例
- Spark-Streaming与Spark-Sql整合实现实时股票排行---通过kafka列队数据
- 使用 Kafka 和 Spark Streaming 构建实时数据处理系统
- Kafka和Spark Streaming Java版本集成并将数据实时写入HBase
- 使用Flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【公安大数据】
- Sparkstreaming基于kafka以Receiver方式获取数据原理和案例实战
- 实时流Streaming大数据:Storm,Spark和Samza
- 大数据IMF传奇行动绝密课程第92课:SparkStreaming中Transformations和状态管理解密
- 基于spark-streaming实时推荐系统(一)
- 实时流Streaming大数据:Storm,Spark和Samza
- 基于Spark的公安大数据实时运维技术实践
- 大数据IMF传奇行动绝密课程第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密