通过Spark Streaming的window操作实战模拟热点搜索词案例实战
2016-08-24 08:05
591 查看
本博文主要内容包括:
1、在线热点搜索词实现解析
2、SparkStreaming 利用reduceByKeyAndWindow实现在线热点搜索词实战
我们知道在SparkStreaming中可以设置batchInterval,让SparkStreaming每隔batchInterval时间提交一次Job,假设batchInterval设置为5秒,那如果需要对1分钟内的数据做统计,该如何实现呢?SparkStreaming中提供了window的概念。我们看下图:
官网给的例子每个2秒钟更新过去3秒钟的内容,3秒钟算一下,5秒钟算一下,3秒钟是一个窗口。window可以包含多个batchInterval(例如5秒),但是必须为batchInterval的整数倍例如1分钟。另外window可以移动,称之为滑动时间间隔,它也是batchInterval的整数倍,例如10秒。一般情况滑动时间间隔小于window的时间长度,否则会丢失数据。
SparkStreaming提供了如下与window相关的方法:
从代码上面来看, 入口为:
一步一步跟踪进去, 可以看到实际的业务类是在ReducedWindowedDStream 这个类里面:
代码理解就直接拿这个类来看了: 主要功能是在compute里面实现, 通过下面代码回调mergeValues 来计算最后的返回值
先计算oldRDD 和newRDD
//currentWindow 就是以当前时间回退一个window的时间再向前一个batch 到当前时间的窗口 代码里面有一个图很有用:
我们要计算的new rdd就是15秒-25秒期间的值, oldRDD就是0秒到10秒的值, previous window的值是1秒 - 15秒的值
然后最终结果是 重复区间(previous window的值 - oldRDD的值) =》 也就是中间重复部分, 再加上newRDD的值, 这样的话得到的结果就是10秒到25秒这个时间区间的值
reduceByWindow(reduceFunc, windowDuration, slideDuration) 代码:
可以看到他做了两次reduce, 第一次对整个self做一次reduce, 然后截取时间区间, 对结果再做一次reduce。
第一点: 对整个self做reduce会比较慢, 因为self都是相对比较大的集合。
第二点:进行了两次reduce ,源码如下:
如果我们看:
reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
实际上他是调用了效率非常高的reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions, filterFunc) 方法 ==》 详细计算过程参考之前的博文
这样的话其实他只对newRDDs和oldRDDs做reduce, 由于这两个RDDs都非常小, 可以想象效率是非常高的
如果看reduceByKeyAndWindow的话, 情况也是一样, 一个是执行:
而另外一个确是在已有的window值基础上做了简单的加加减减
宗上, 从效率上面考虑, 我们应该尽量使用包含invReduceFunc的方法, 同样情况下摒弃只有reduceFunc的方法
2、我们案例代码如下:
3、将程序打包运行到集群上观察结果:
4、接下来我们使用reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks]) 这个函数,来实现增量的计算。
使用这个函数,必须进行Checkpoint。代码如下
1、在线热点搜索词实现解析
2、SparkStreaming 利用reduceByKeyAndWindow实现在线热点搜索词实战
一:在线热点搜索词实现解析
背景描述:在社交网络(例如微博),电子商务(例如京东),热搜词(例如百度)等人们核心关注的内容之一就是我所关注的内容中,大家正在最关注什么或者说当前的热点是什么,这在市级企业级应用中是非常有价值,例如我们关心过去30分钟大家正在热搜什么,并且每5分钟更新一次,这就使得热点内容是动态更新的,当然更有价值。 Yahoo(是Hadoop的最大用户)被收购,因为没做到实时在线处理实现技术:Spark Streaming(在线批处理) 提供了滑动窗口的奇数来支撑实现上述业务背景,我外面您可以使用reduceByKeyAndWindow操作来做具体实现我们知道在SparkStreaming中可以设置batchInterval,让SparkStreaming每隔batchInterval时间提交一次Job,假设batchInterval设置为5秒,那如果需要对1分钟内的数据做统计,该如何实现呢?SparkStreaming中提供了window的概念。我们看下图:
官网给的例子每个2秒钟更新过去3秒钟的内容,3秒钟算一下,5秒钟算一下,3秒钟是一个窗口。window可以包含多个batchInterval(例如5秒),但是必须为batchInterval的整数倍例如1分钟。另外window可以移动,称之为滑动时间间隔,它也是batchInterval的整数倍,例如10秒。一般情况滑动时间间隔小于window的时间长度,否则会丢失数据。
SparkStreaming提供了如下与window相关的方法:
二、SparkStreaming 实现在线热点搜索词实战
1、经过分析我们采用reduceByKeyAndWindow的方法,reduceByKeyAndWindow方法分析如下:从代码上面来看, 入口为:
reduceByKeyAndWindow(_+_, _-_, Duration, Duration)
一步一步跟踪进去, 可以看到实际的业务类是在ReducedWindowedDStream 这个类里面:
代码理解就直接拿这个类来看了: 主要功能是在compute里面实现, 通过下面代码回调mergeValues 来计算最后的返回值
val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]] .mapValues(mergeValues)
先计算oldRDD 和newRDD
//currentWindow 就是以当前时间回退一个window的时间再向前一个batch 到当前时间的窗口 代码里面有一个图很有用:
我们要计算的new rdd就是15秒-25秒期间的值, oldRDD就是0秒到10秒的值, previous window的值是1秒 - 15秒的值
然后最终结果是 重复区间(previous window的值 - oldRDD的值) =》 也就是中间重复部分, 再加上newRDD的值, 这样的话得到的结果就是10秒到25秒这个时间区间的值
// 0秒 10秒 15秒 25秒 // _____________________________ // | previous window _________|___________________ // |___________________| current window | --------------> Time // |_____________________________| // // |________ _________| |________ _________| // | | // V V // old RDDs new RDDs //
reduceByWindow(reduceFunc, windowDuration, slideDuration) 代码:
可以看到他做了两次reduce, 第一次对整个self做一次reduce, 然后截取时间区间, 对结果再做一次reduce。
第一点: 对整个self做reduce会比较慢, 因为self都是相对比较大的集合。
第二点:进行了两次reduce ,源码如下:
def reduceByWindow( reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration ): DStream[T] = ssc.withScope { this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc) }
如果我们看:
reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
实际上他是调用了效率非常高的reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions, filterFunc) 方法 ==》 详细计算过程参考之前的博文
这样的话其实他只对newRDDs和oldRDDs做reduce, 由于这两个RDDs都非常小, 可以想象效率是非常高的
def reduceByWindow( reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration ): DStream[T] = ssc.withScope { this.map(x => (1, x)) .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1) .map(_._2) }
如果看reduceByKeyAndWindow的话, 情况也是一样, 一个是执行:
self.reduceByKey(reduceFunc, partitioner) .window(windowDuration, slideDuration) .reduceByKey(reduceFunc, partitioner)
而另外一个确是在已有的window值基础上做了简单的加加减减
宗上, 从效率上面考虑, 我们应该尽量使用包含invReduceFunc的方法, 同样情况下摒弃只有reduceFunc的方法
2、我们案例代码如下:
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Created by zpf on 2016/8/23. */ /** * 使用Scala并发集群运行的Spark来实现在线热搜词 * * 背景描述:在社交网络(例如微博),电子商务(例如京东),热搜词(例如百度)等人们核心关注的内容之一就是我所关注的内容中 * 大家正在最关注什么或者说当前的热点是什么,这在市级企业级应用中是非常有价值,例如我们关心过去30分钟大家正在热搜什么,并且 * 每5分钟更新一次,这就使得热点内容是动态更新的,当然更有价值。 * Yahoo(是Hadoop的最大用户)被收购,因为没做到实时在线处理 * 实现技术:Spark Streaming(在线批处理) 提供了滑动窗口的奇数来支撑实现上述业务背景,我外面您可以使用reduceByKeyAndWindow操作来做具体实现 * */ object OnlineHottestItems { def main(args: Array[String]){ /** * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息, * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置 * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如 * 只有1G的内存)的初学者 * */ val conf = new SparkConf() //创建SparkConf对象 conf.setAppName("OnlineHottestItems") //设置应用程序的名称,在程序运行的监控界面可以看到名称 conf.setMaster("spark://Master:7077") //此时,程序在Spark集群 /* * 此处设置 Batch Interval 实在spark Streaming 中生成基本Job的单位,窗口和滑动时间间隔 * 一定是该batch Interval的整数倍*/ val ssc = new StreamingContext(conf, Seconds(5)) val hottestStream = ssc.socketTextStream("Master", 9999) /* * 用户搜索的格式简化为 name item,在这里我们由于要计算热点内容,所以只需要提取item即可 * 提取出的item通过map转化为(item,1)形式 * 每隔20秒更新过去60秒的内容窗口60秒,滑动20秒 * */ val searchPair = hottestStream.map(_.split(" ")(1)).map(item => (item , 1)) val hottestDStream = searchPair.reduceByKeyAndWindow((v1:Int,v2:Int) => v1 + v2, Seconds(60) ,Seconds(20)) hottestDStream.transform(hottestItemRDD => { val top3 = hottestItemRDD.map(pair => (pair._2,pair._1) ).sortByKey(false). map(pair => (pair._2,pair._1)).take(3) for(item <- top3){ println(item) } hottestItemRDD }).print() ssc.start() ssc.awaitTermination() } }
3、将程序打包运行到集群上观察结果:
4、接下来我们使用reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks]) 这个函数,来实现增量的计算。
使用这个函数,必须进行Checkpoint。代码如下
ssc.checkpoint("/user/checkpoints/") val ItemCount = ItemPairs.reduceByKeyAndWindow((v1:Int,v2:Int)=> v1+v2,(v1:Int,v2:Int)=> v1-v2,Seconds(60),Seconds(10))
相关文章推荐
- 大数据IMF传奇行动绝密课程第95课:通过SparkStreaming的window操作实战模拟新浪微博、百度、京东等热点搜索词案例实战
- 第95课:通过Spark Streaming的window操作实战模拟新浪微博、百度、京东等热点搜索词案例实战
- Spark Streaming的window操作实战模拟新浪微博、百度、京东等热点搜索词案例实战
- Spark-Streaming的window滑动窗口及热点搜索词统计案例
- 第93课:SparkStreaming updateStateByKey 基本操作综合案例实战和内幕源码解密
- 第113课:Spark Streaming电商广告点击综合案例实战模拟点击数据的生成和数据表SQL建立
- 第109讲: Spark Streaming电商广告点击综合案例动态黑名单基于数据库MySQL的真正操作代码实战
- 第85讲:基于HDFS的SparkStreaming案例实战和内幕源码解密
- Spark定制班第1课:通过案例对Spark Streaming透彻理解三板斧之一:解密Spark Streaming另类实验及Spark Streaming本质解析
- 第82课:Spark Streaming第一课:案例动手实战并在电光石火间理解其工作原理
- 第2课 通过案例对SparkStreaming透彻理解三板斧之二
- 第93课:Spark Streaming updateStateByKey案例实战和内幕源码解密
- 通过案例对SparkStreaming 透彻理解三板斧之二:解密SparkStreaming运行机制
- SPARK 第4期:通过案例实战掌握spark sql(dataframe)
- 第76课:Spark SQL基于网站Log的综合案例实战之Hive数据导入、Spark SQL对数据操作每天晚上20:00YY频道现场授课频道68917580
- (版本定制)第2课:通过案例对SparkStreaming透彻理解之二
- IMF传奇行动第85课:Spark Streaming第四课:基于HDFS的Spark Streaming案例实战和内幕源码解密
- 基于HDFS的SparkStreaming案例实战和内幕源码解析
- 第82课:Spark Streaming第一课:案例动手实战并在电光石火间理解其工作原理
- 第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密