您的位置:首页 > 其它

transform实现广告计费日志实时黑名单过滤(Scala版本)

2016-11-08 13:39 513 查看
package SparkStreaming

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**

  * Created by tg on 11/6/16.

  */

object transformDemo {

  def main(args: Array[String]): Unit = {

    val conf=new SparkConf().setAppName("transformDemo")

//                .setMaster("local[2]")

    val ssc=new StreamingContext(conf,Seconds(5))

    //模拟数据,创建黑名单的RDD

    val blackRDD=ssc.sparkContext.parallelize(Array(("tom",true)))

    /**

      * 从nc服务中读取输入的数据

      * 数据格式:time name,比如:1107 leo

      */

    val linesDtream=ssc.socketTextStream("tgmaster",9999)

    /**

      * 通过map算子对输入的数据lines进行操作,形成以下格式的数据:

      * (name,time name)键值对的形式

      */

    val mapDtream=linesDtream.map(line=>{

      val log=line.split(" ")

      (log(1),line)

    })

    /**

      * 通过transform算子进行转换操作

      */

    mapDtream.transform(adsRDD=>{

      //通过leftOuterJoin左外连接操作,操作之后的数据包含所有的用户

      val joinRDD=adsRDD.leftOuterJoin(blackRDD)

      //通过filter算子将黑名单的用户过滤出来

      val filterRDD=joinRDD.filter(item=>{

        if(item._2._2.getOrElse(false)) false else true

      })

      //通过map算子,将不是黑名单的用户日志显示出来

      filterRDD.map(m=>{

        m._2._1

      })

    }).print()

    ssc.start()

    ssc.awaitTermination()

  }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: