您的位置:首页 > 其它

带有状态的SparkStreaming单词计数程序

2017-02-07 18:15 351 查看
在另外一篇《SparkStreaming的入门级程序:WordCount》文章中,只是统计每一个批次的数据,是不带状态的单词计数程序,使用的是reduceByKey()方法,它只能统计当前批次的单词个数,而不会累加上一个批次的单词个数;而带有状态的单词计数程序会累加上个批次的单词个数,它使用的则是updateStateByKey()方法。

在pom.xml文件中引入一下依赖:

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.6</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.2</version>
</dependency>
</dependencies>


StateFulSteamingWordCount.scala代码如下:

package streams.test

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf}
import streams.LoggerLevels

/**
* Created by SYJ on 2017/2/7.
*/
object TestStateFulStreamingWordCount {

/**
* 当前定义的函数,接收一个迭代器类型的参数。
* 对Iterator[(String ,Seq[Int], Option[Int])]中泛型的解释:
* SparkStreaming每隔一段时间就会产生一个批次的RDD,
* 在调用该函数之前,会对当前批次的数据安装key进行分组,
* 所以第一参数String指的就是分组的key,在本例中指的就是单词,
* 而第二个参数Seq[Int]就是key所对应的value,在本例中指的就是
* 由很多的数字1组成的集合。
* 而第三个参数Option[Int]表示历史数据,Option本身就表示可能有
* 数据,也可能没有数据,第一个批次是没有历史数据的,所以数据是0,
* 从第二个批次开始就有数据,也就是每两个批次进行叠加后的值,
* 所以Option[Int]表示初始值或者叠加后的值。
*/
val updateFunc = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
/**
* map方法接收一个元组t,
* 其中t._1表示分组的key,这个key不变,
* t._2.sum表示对Seq[Int]中的值求和,
* t._3.getOrElse(0)表示获取每两个批
* 次叠加后的值,初始值为0;
*/
//it.map(t => (t._1, t._2.sum, t._3.getOrElse(0)))

/**
* 上面的方式不太好,因为t._1、t._2和t._3不太好识别,
* 我们可以使用模式匹配,注意如果使用模式匹配的话,
* map后面就不能使用小括号了,必须使用大括号。
*/
it.map { case (x, y, z) => (x, y.sum + z.getOrElse(0)) }
}

def main(args: Array[String]) {
LoggerLevels.setStreamingLogLevels()
val conf = new SparkConf().setAppName("TestStateFulStreamingWordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))

/**
* 必须设置checkpoint目录,
* 是为了避免历史数据丢失而导致
* 新的数据和旧的数据无法做汇总或者聚合操作;
* 如果程序在Spark集群上跑,通常保存到HDFS中,
* 由于这里只是单机程序,所以就保存到本地磁盘上.
*/
ssc.checkpoint("c://ck3")

val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01", 8888)
val words: DStream[String] = lines.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))

/**
* 如果要实现对每个批次的单词数量进行累加,
* 可以使用updateStateByKey方法,它可以根据
* key来更新状态,可以累加以前的数据。
* 这个updateStateByKey方法要求传进去3个参数:
* 第一个参数是自定义的累加函数,告诉它我们的累加逻辑;
* 第二个参数是一个分区器,可以使用HashPartitioner,
* 也可以定义自己的分区器;定义分区器的目的是为了
* 避免数据倾斜(数据都集中到某些机器上面去了);
* 第三个参数是一个Boolean值,表示其他的计算也要使用该分区器;
*/
val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(
updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)

result.print()
ssc.start()
ssc.awaitTermination()
}
}


由于SparkStreaming程序在运行的时候会在控制台打印很多info日志,影响我们对于实时统计结果的观察,所以我们在程序中通过如下代码来设置日志的输出级别:

package streams

import org.apache.log4j.{Logger, Level}
import org.apache.spark.Logging

object LoggerLevels extends Logging {

def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
logInfo("Setting log level to [WARN] for streaming example." +
" To override add a custom log4j.properties to the classpath.")
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}


启动SparkStreaming程序。

使用nc启动一个SocketServer,监听8888端口,输入几行单词试试:

观察控制台的SparkStreaming程序实时输出的单词计数结果:
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: