带有状态的SparkStreaming单词计数程序
2017-02-07 18:15
351 查看
在另外一篇《SparkStreaming的入门级程序:WordCount》文章中,只是统计每一个批次的数据,是不带状态的单词计数程序,使用的是reduceByKey()方法,它只能统计当前批次的单词个数,而不会累加上一个批次的单词个数;而带有状态的单词计数程序会累加上个批次的单词个数,它使用的则是updateStateByKey()方法。
在pom.xml文件中引入一下依赖:
StateFulSteamingWordCount.scala代码如下:
由于SparkStreaming程序在运行的时候会在控制台打印很多info日志,影响我们对于实时统计结果的观察,所以我们在程序中通过如下代码来设置日志的输出级别:
启动SparkStreaming程序。
使用nc启动一个SocketServer,监听8888端口,输入几行单词试试:
观察控制台的SparkStreaming程序实时输出的单词计数结果:
在pom.xml文件中引入一下依赖:
<dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.10.6</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.6.2</version> </dependency> </dependencies>
StateFulSteamingWordCount.scala代码如下:
package streams.test import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{HashPartitioner, SparkConf} import streams.LoggerLevels /** * Created by SYJ on 2017/2/7. */ object TestStateFulStreamingWordCount { /** * 当前定义的函数,接收一个迭代器类型的参数。 * 对Iterator[(String ,Seq[Int], Option[Int])]中泛型的解释: * SparkStreaming每隔一段时间就会产生一个批次的RDD, * 在调用该函数之前,会对当前批次的数据安装key进行分组, * 所以第一参数String指的就是分组的key,在本例中指的就是单词, * 而第二个参数Seq[Int]就是key所对应的value,在本例中指的就是 * 由很多的数字1组成的集合。 * 而第三个参数Option[Int]表示历史数据,Option本身就表示可能有 * 数据,也可能没有数据,第一个批次是没有历史数据的,所以数据是0, * 从第二个批次开始就有数据,也就是每两个批次进行叠加后的值, * 所以Option[Int]表示初始值或者叠加后的值。 */ val updateFunc = (it: Iterator[(String, Seq[Int], Option[Int])]) => { /** * map方法接收一个元组t, * 其中t._1表示分组的key,这个key不变, * t._2.sum表示对Seq[Int]中的值求和, * t._3.getOrElse(0)表示获取每两个批 * 次叠加后的值,初始值为0; */ //it.map(t => (t._1, t._2.sum, t._3.getOrElse(0))) /** * 上面的方式不太好,因为t._1、t._2和t._3不太好识别, * 我们可以使用模式匹配,注意如果使用模式匹配的话, * map后面就不能使用小括号了,必须使用大括号。 */ it.map { case (x, y, z) => (x, y.sum + z.getOrElse(0)) } } def main(args: Array[String]) { LoggerLevels.setStreamingLogLevels() val conf = new SparkConf().setAppName("TestStateFulStreamingWordCount").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(5)) /** * 必须设置checkpoint目录, * 是为了避免历史数据丢失而导致 * 新的数据和旧的数据无法做汇总或者聚合操作; * 如果程序在Spark集群上跑,通常保存到HDFS中, * 由于这里只是单机程序,所以就保存到本地磁盘上. */ ssc.checkpoint("c://ck3") val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01", 8888) val words: DStream[String] = lines.flatMap(_.split(" ")) val wordAndOne: DStream[(String, Int)] = words.map((_, 1)) /** * 如果要实现对每个批次的单词数量进行累加, * 可以使用updateStateByKey方法,它可以根据 * key来更新状态,可以累加以前的数据。 * 这个updateStateByKey方法要求传进去3个参数: * 第一个参数是自定义的累加函数,告诉它我们的累加逻辑; * 第二个参数是一个分区器,可以使用HashPartitioner, * 也可以定义自己的分区器;定义分区器的目的是为了 * 避免数据倾斜(数据都集中到某些机器上面去了); * 第三个参数是一个Boolean值,表示其他的计算也要使用该分区器; */ val result: DStream[(String, Int)] = wordAndOne.updateStateByKey( updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true) result.print() ssc.start() ssc.awaitTermination() } }
由于SparkStreaming程序在运行的时候会在控制台打印很多info日志,影响我们对于实时统计结果的观察,所以我们在程序中通过如下代码来设置日志的输出级别:
package streams import org.apache.log4j.{Logger, Level} import org.apache.spark.Logging object LoggerLevels extends Logging { def setStreamingLogLevels() { val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements if (!log4jInitialized) { logInfo("Setting log level to [WARN] for streaming example." + " To override add a custom log4j.properties to the classpath.") Logger.getRootLogger.setLevel(Level.WARN) } } }
启动SparkStreaming程序。
使用nc启动一个SocketServer,监听8888端口,输入几行单词试试:
观察控制台的SparkStreaming程序实时输出的单词计数结果:
相关文章推荐
- spark streaming 的wordcount程序,从hdfs上读取文件中的内容并计数
- spark-streaming 编程(二) word count单词计数统计
- DStream操作实战:1.SparkStreaming接受socket数据,实现单词计数WordCount
- 第18课:Spark Streaming中空RDD处理及流处理程序优雅的停止
- Spark Streaming快速状态流处理
- 【Storm】storm安装、配置、使用以及Storm单词计数程序的实例分析
- spark streaming测试之三有状态的接收数据
- hadoop2.5的第一个HelloWorld程序—单词计数(WordCount.)
- Spark Streaming揭秘 Day14 State状态管理
- 简单SparkRDD单词计数操作
- Spark Streaming 1.6 流式状态管理分析
- spark streaming的有状态例子
- 也写单词计数程序
- Spark Streaming 1.6 流式状态管理分析
- Flume Push数据到spark streaming或者接收Spark streaming的poll数据时实际运行程序总结
- java 通过Spark实现单词计数的功能
- 单词计数程序
- 用insert重写单词计数程序
- 第一章 flex单词计数程序
- SparkStreaming+Zookeeper+Kafka入门程序