您的位置:首页 > 其它

通过Spark Streaming的window操作实战模拟热点搜索词案例实战

2016-08-24 08:05 591 查看
本博文主要内容包括:

1、在线热点搜索词实现解析

2、SparkStreaming 利用reduceByKeyAndWindow实现在线热点搜索词实战

一:在线热点搜索词实现解析

背景描述:在社交网络(例如微博),电子商务(例如京东),热搜词(例如百度)等人们核心关注的内容之一就是我所关注的内容中,大家正在最关注什么或者说当前的热点是什么,这在市级企业级应用中是非常有价值,例如我们关心过去30分钟大家正在热搜什么,并且每5分钟更新一次,这就使得热点内容是动态更新的,当然更有价值。 Yahoo(是Hadoop的最大用户)被收购,因为没做到实时在线处理实现技术:Spark Streaming(在线批处理) 提供了滑动窗口的奇数来支撑实现上述业务背景,我外面您可以使用reduceByKeyAndWindow操作来做具体实现

我们知道在SparkStreaming中可以设置batchInterval,让SparkStreaming每隔batchInterval时间提交一次Job,假设batchInterval设置为5秒,那如果需要对1分钟内的数据做统计,该如何实现呢?SparkStreaming中提供了window的概念。我们看下图:



官网给的例子每个2秒钟更新过去3秒钟的内容,3秒钟算一下,5秒钟算一下,3秒钟是一个窗口。window可以包含多个batchInterval(例如5秒),但是必须为batchInterval的整数倍例如1分钟。另外window可以移动,称之为滑动时间间隔,它也是batchInterval的整数倍,例如10秒。一般情况滑动时间间隔小于window的时间长度,否则会丢失数据。

SparkStreaming提供了如下与window相关的方法:



二、SparkStreaming 实现在线热点搜索词实战

1、经过分析我们采用reduceByKeyAndWindow的方法,reduceByKeyAndWindow方法分析如下:

从代码上面来看, 入口为:

reduceByKeyAndWindow(_+_, _-_, Duration, Duration)


一步一步跟踪进去, 可以看到实际的业务类是在ReducedWindowedDStream 这个类里面:

代码理解就直接拿这个类来看了: 主要功能是在compute里面实现, 通过下面代码回调mergeValues 来计算最后的返回值

val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]]
.mapValues(mergeValues)


先计算oldRDD 和newRDD

//currentWindow 就是以当前时间回退一个window的时间再向前一个batch 到当前时间的窗口 代码里面有一个图很有用:

我们要计算的new rdd就是15秒-25秒期间的值, oldRDD就是0秒到10秒的值, previous window的值是1秒 - 15秒的值

然后最终结果是 重复区间(previous window的值 - oldRDD的值) =》 也就是中间重复部分, 再加上newRDD的值, 这样的话得到的结果就是10秒到25秒这个时间区间的值

// 0秒                  10秒     15秒                25秒
//  _____________________________
// |  previous window   _________|___________________
// |___________________|       current window        |  --------------> Time
//                     |_____________________________|
//
// |________ _________|          |________ _________|
//          |                             |
//          V                             V
//       old RDDs                     new RDDs
//


reduceByWindow(reduceFunc, windowDuration, slideDuration) 代码:

可以看到他做了两次reduce, 第一次对整个self做一次reduce, 然后截取时间区间, 对结果再做一次reduce。

第一点: 对整个self做reduce会比较慢, 因为self都是相对比较大的集合。

第二点:进行了两次reduce ,源码如下:

def reduceByWindow(
reduceFunc: (T, T) => T,
windowDuration: Duration,
slideDuration: Duration
): DStream[T] = ssc.withScope {
this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
}


如果我们看:

reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

实际上他是调用了效率非常高的reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions, filterFunc) 方法 ==》 详细计算过程参考之前的博文

这样的话其实他只对newRDDs和oldRDDs做reduce, 由于这两个RDDs都非常小, 可以想象效率是非常高的

def reduceByWindow(
reduceFunc: (T, T) => T,
invReduceFunc: (T, T) => T,
windowDuration: Duration,
slideDuration: Duration
): DStream[T] = ssc.withScope {
this.map(x => (1, x))
.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
.map(_._2)
}


如果看reduceByKeyAndWindow的话, 情况也是一样, 一个是执行:

self.reduceByKey(reduceFunc, partitioner)
.window(windowDuration, slideDuration)
.reduceByKey(reduceFunc, partitioner)


而另外一个确是在已有的window值基础上做了简单的加加减减

宗上, 从效率上面考虑, 我们应该尽量使用包含invReduceFunc的方法, 同样情况下摒弃只有reduceFunc的方法

2、我们案例代码如下:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* Created by zpf on 2016/8/23.
*/
/**
* 使用Scala并发集群运行的Spark来实现在线热搜词
*
* 背景描述:在社交网络(例如微博),电子商务(例如京东),热搜词(例如百度)等人们核心关注的内容之一就是我所关注的内容中
* 大家正在最关注什么或者说当前的热点是什么,这在市级企业级应用中是非常有价值,例如我们关心过去30分钟大家正在热搜什么,并且
* 每5分钟更新一次,这就使得热点内容是动态更新的,当然更有价值。
* Yahoo(是Hadoop的最大用户)被收购,因为没做到实时在线处理
* 实现技术:Spark Streaming(在线批处理) 提供了滑动窗口的奇数来支撑实现上述业务背景,我外面您可以使用reduceByKeyAndWindow操作来做具体实现
*
*/
object OnlineHottestItems {
def main(args: Array[String]){
/**
* 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
* 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置
* 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
* 只有1G的内存)的初学者       *
*/
val conf = new SparkConf() //创建SparkConf对象
conf.setAppName("OnlineHottestItems") //设置应用程序的名称,在程序运行的监控界面可以看到名称
conf.setMaster("spark://Master:7077") //此时,程序在Spark集群

/*
* 此处设置 Batch Interval 实在spark Streaming 中生成基本Job的单位,窗口和滑动时间间隔
* 一定是该batch Interval的整数倍*/
val ssc = new StreamingContext(conf, Seconds(5))

val hottestStream = ssc.socketTextStream("Master", 9999)

/*
* 用户搜索的格式简化为 name item,在这里我们由于要计算热点内容,所以只需要提取item即可
* 提取出的item通过map转化为(item,1)形式
* 每隔20秒更新过去60秒的内容窗口60秒,滑动20秒
* */

val searchPair = hottestStream.map(_.split(" ")(1)).map(item => (item , 1))
val hottestDStream = searchPair.reduceByKeyAndWindow((v1:Int,v2:Int) => v1 + v2, Seconds(60) ,Seconds(20))

hottestDStream.transform(hottestItemRDD => {
val top3 =  hottestItemRDD.map(pair => (pair._2,pair._1) ).sortByKey(false).
map(pair => (pair._2,pair._1)).take(3)

for(item <- top3){
println(item)
}
hottestItemRDD
}).print()

ssc.start()
ssc.awaitTermination()

}
}


3、将程序打包运行到集群上观察结果:

4、接下来我们使用reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks]) 这个函数,来实现增量的计算。

使用这个函数,必须进行Checkpoint。代码如下

ssc.checkpoint("/user/checkpoints/")
val ItemCount = ItemPairs.reduceByKeyAndWindow((v1:Int,v2:Int)=> v1+v2,(v1:Int,v2:Int)=> v1-v2,Seconds(60),Seconds(10))
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐