Spark Streaming学习与实践(2)
2016-02-04 09:43
309 查看
2. 运算
2.1. StreamingContext.checkPoint
val ssc = new StreamingContext(sc, Seconds(10)) val checkpointDir = "hdfs://dir/checkpoint" ssc.checkpoint(checkpointDir)
1、为Spark Streaming设置checkpoint,在使用DStream.updateByKey前必须先设置checkpoint
2、在checkpointDir目录下会生成形如checkpoint-1454403510000和checkpoint-1454403510000.bk的文件和1f355a78-5404-4b47-9755-8529d15e9037的文件夹
3、checkpoint-1454403510000和checkpoint-1454403510000.bk内容一样,很明显1454403510000是时间戳,打开可以看见形如update hdfs://dir/1f355a78-5404-4b47-9755-8529d15e9037/rdd-87的语句,说明把当前状态存入了rdd-87中
2.2. DStream.updateByKey
val streamPath = "hdfs://dir/1" val words: DStream[(String, Int)] = ssc.textFileStream(streamPath).flatMap(_.split(" ")).map(x => (x, 1)) //为每个word转换为(word,1)的tuple val wordCounts = words.updateStateByKey[Int](updateFunc = (sumNow: Seq[Int], sumLast: Option[Int]) => { Some(sumNow.sum + sumLast.getOrElse(0)) }) wordCounts.print() ssc.start() ssc.awaitTermination()
updateByKey的官网解释
1、updateByKey是PairDStreamFunctions的算子,必须是DStream[(Key, Value)]才能使用。
2、updateByKey最长接受的参数如下:
def updateStateByKey[S](updateFunc: (Iterator[(K, Seq[V], Option[S])]) ⇒ Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean, initialRDD: RDD[(K, S)])(implicit arg0: ClassTag[S]): DStream[(K, S)]
最短的如下:
def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) ⇒ Option[S])(implicit arg0: ClassTag[S]): DStream[(K, S)]
其中最重要的是updateFunc: (Seq[V], Option[S]) ⇒ Option[S]),它来说明如何更新状态(state)
val wordCounts = words.updateStateByKey[Int](updateFunc = (sumNow: Seq[Int], sumLast: Option[Int]) => { Some(sumNow.sum + sumLast.getOrElse(0)) })
1、假设words中是形如DStream((“hello”,1),(“hello”,1),(“hello”,1),(“hello”,1),(“hello”,1),(“spark”,1),(“spark”,1))
2、对于Key = “hello”来说updateFunc的输入参数sumNow就是Seq(1,1,1,1,1),sumLast是上个状态”hello”的Value值——空值;输出就是本状态”hello”的Value值——Some(1+1+1+1+1+0) = Some(5)。
3、最终wordCounts就是形如DStream((“hello”,5),(“spark”,2))
2.2.1. 期望只统计最近2个state的数据
val input = ssc.textFileStream(streamPath) val words = input.flatMap(_.split(" ")).map(x => (x, Array(1,0))) val wordCounts = words.updateStateByKey[Array[Int]](updateFunc = (countNow: Seq[Array[Int]], stateLast: Option[Array[Int]]) => { val sumNow: Int = countNow.map(_(0)).sum val sumLast: Int = stateLast.getOrElse(Array(0,0))(0) Some(Array(sumNow,sumLast)) }) wordCounts.map(x => (x._1, x._2.sum)).print()
2.2.2. 期望定义state’有新文件才生成新的state’,而不是目前每10秒一个新的state
1、尚未发现有API支持这一点2、尚未发现有方法判定当前DStream为空的API
3、尚未发现有API可将DStream => 除了DStream、Unit和StreamingContext之外的数据结构
4、尝试采用如下语句判定:
var flag = true val input = ssc.textFileStream(streamPath) input.foreachRDD(x => { if (x.isEmpty()) flag = false else flag = true }) val words = input.flatMap(_.split(" ")).map(x => (x, Array(1,0))) val wordCounts = words.updateStateByKey[Array[Int]](updateFunc = (countNow: Seq[Array[Int]], stateLast: Option[Array[Int]]) => { if (flag) { val sumNow: Int = countNow.map(_(0)).sum val sumLast: Int = stateLast.getOrElse(Array(0,0))(0) Some(Array(sumNow,sumLast)) } else { stateLast } }) wordCounts.map(x => (x._1, x._2.sum)).print()
结果Spark-shell中报错:
org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
2016.2.17日更新:Spark-submit成功!
2.3. DStream.reduceByKeyAndWindow
业务场景:1、约每5分钟生成一个源文件放在hdfs://dir中,该文件代表1分钟内收到的所有单词
2、统计当前所有单词的出现次数
3、该单词如果40分钟内未出现,既40个文件中未出现则认为消失,不出现在统计结果中
4、要求每10秒扫描一次文件夹
由于2.2.2的问题没有解决,也就是说Spark Streaming每10秒一个state,而不是每个新文件一个state。由于累加结果有40分钟过期时间,因此无法用单纯的updateByKey算子来计算。尝试采用DStream.reduceByKeyAndWindow算子来替代。
采用2.2.2的方式实现
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- eclipse 开发 spark Streaming wordCount
- Understanding Spark Caching
- ClassNotFoundException:scala.PreDef$
- Windows 下Spark 快速搭建Spark源码阅读环境
- Spark中将对象序列化存储到hdfs
- Spark初探
- Spark Streaming初探
- Spark本地开发环境搭建
- 搭建hadoop/spark集群环境
- spark内存概述
- Spark Shuffle之Hash Shuffle
- Spark Shuffle之Sort Shuffle
- Spark Shuffle之Tungsten Sort Shuffle
- 直播|易观CTO郭炜:精益化数据分析——如何让你的企业具有BAT一样的分析能力
- 挨踢部落第一期:Spark离线分析维度 推荐