您的位置:首页 > 其它

Spark-Streaming之transform操作,实时黑名单过滤案例

2017-07-24 15:47 567 查看
Transform操作,应用在DStream上时,可以用于执行任意的RDD到RDD的转换操作。它可以用于实现,DStream API中所没有提供的操作。比如说,DStream API中,并没有提供将一个DStream中的每个batch,与一个特定的RDD进行join的操作。但是我们自己就可以使用transform操作来实现该功能。
DStream.join(),只能join其他DStream。在DStream每个batch的RDD计算出来之后,会去跟其他DStream的RDD进行join。

案例:
object TransformDemo {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
val config = new SparkConf().setAppName("TransformDemo").setMaster("local[2]")
val ssc = new StreamingContext(config, Seconds(2))
//定义黑名单数组
val blackList = Array(("tom", true), ("jim", true))
//黑名单RDD
val blackListRDD = ssc.sparkContext.parallelize(blackList)
//定义一个socket输入流
ssc.socketTextStream("hadoop01", 8888).map(line => {
val fields = line.split(" ")
val name = fields(0)
val clickDate = fields(1)
(name, clickDate)
}).transform(rdd => {
//进行黑名单过滤,拿到发过来的数据和黑名单数据进行join连接
//(tom,2017-03-02) leftOuterJoin (tom,true)  ===> (tom,(2017-03-02,Some(true)))
rdd.leftOuterJoin(blackListRDD).filter(tuple => {
//过滤掉黑名单里面的数据,isEmpty判断是否为null,如果是null返回true
//过滤出这样的数据(jom,(2017-09-09,None))
if (tuple._2._2.isEmpty) true else false
})
}).print()
ssc.start()
ssc.awaitTermination()
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: