Spark-Streaming之transform操作,实时黑名单过滤案例
2017-07-24 15:47
567 查看
Transform操作,应用在DStream上时,可以用于执行任意的RDD到RDD的转换操作。它可以用于实现,DStream API中所没有提供的操作。比如说,DStream API中,并没有提供将一个DStream中的每个batch,与一个特定的RDD进行join的操作。但是我们自己就可以使用transform操作来实现该功能。
DStream.join(),只能join其他DStream。在DStream每个batch的RDD计算出来之后,会去跟其他DStream的RDD进行join。
案例:
DStream.join(),只能join其他DStream。在DStream每个batch的RDD计算出来之后,会去跟其他DStream的RDD进行join。
案例:
object TransformDemo { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.WARN) val config = new SparkConf().setAppName("TransformDemo").setMaster("local[2]") val ssc = new StreamingContext(config, Seconds(2)) //定义黑名单数组 val blackList = Array(("tom", true), ("jim", true)) //黑名单RDD val blackListRDD = ssc.sparkContext.parallelize(blackList) //定义一个socket输入流 ssc.socketTextStream("hadoop01", 8888).map(line => { val fields = line.split(" ") val name = fields(0) val clickDate = fields(1) (name, clickDate) }).transform(rdd => { //进行黑名单过滤,拿到发过来的数据和黑名单数据进行join连接 //(tom,2017-03-02) leftOuterJoin (tom,true) ===> (tom,(2017-03-02,Some(true))) rdd.leftOuterJoin(blackListRDD).filter(tuple => { //过滤掉黑名单里面的数据,isEmpty判断是否为null,如果是null返回true //过滤出这样的数据(jom,(2017-09-09,None)) if (tuple._2._2.isEmpty) true else false }) }).print() ssc.start() ssc.awaitTermination() } }
相关文章推荐
- 12.transform以及实时黑名单过滤案例实战
- 12.transform以及实时黑名单过滤案例实战
- 12.transform以及实时黑名单过滤案例实战
- 12.transform以及实时黑名单过滤案例实战
- 第106课: Spark Streaming电商广告点击综合案例黑名单过滤实现
- 12.transform以及实时黑名单过滤案例实战
- 12.transform以及实时黑名单过滤案例实战
- 12.transform以及实时黑名单过滤案例实战
- 第109讲: Spark Streaming电商广告点击综合案例动态黑名单基于数据库MySQL的真正操作代码实战
- 12.transform以及实时黑名单过滤案例实战
- 12.transform以及实时黑名单过滤案例实战
- 第108课: Spark Streaming电商广告点击综合案例动态黑名单过滤真正的实现代码
- 12.transform以及实时黑名单过滤案例实战
- 12.transform以及实时黑名单过滤案例实战
- 12.transform以及实时黑名单过滤案例实战
- 12.transform以及实时黑名单过滤案例实战
- 12.transform以及实时黑名单过滤案例实战
- 12.transform以及实时黑名单过滤案例实战
- 12.transform以及实时黑名单过滤案例实战
- 12.transform以及实时黑名单过滤案例实战