您的位置:首页 > 其它

Spark 实现黑名单实时过滤

2017-07-02 16:37 429 查看

黑名单实时过滤

一、实验介绍

1.1 实验内容

本节课主要讲解 Spark 的 RDD 操作,让您对 Spark 算子的特性快速了解。通过演示案例实时黑名单过滤,让您切身体会到 RDD 的强大功能,然后学以致用。

1.2 先学课程

Spark 系列课程

1.3 实验知识点

nc
SparkStreaming
Spark RDD

1.4 实验环境

spark-2.1.0-bin-hadoop2.6

Xfce 终端

1.5 适合人群

本课程属于中级难度级别,适合具有 Spark 基础的用户,如果对 Scala 熟悉 ,能够更好的上手本课程。

二、弹性分布式数据集(RDD)

提到 Spark Transformation,不得不说 Spark RDD,那么 RDD 是什么?

弹性分布式数据集(RDD)是 Spark 框架中的核心概念。它是由数据组成的不可变分布式集合,其主要进行两个操作:transformation 和 action。Spark 将数据存储在不同分区上的 RDD 之中,RDD 可以帮助重新安排计算并优化数据处理过程,此外,它还具有容错性,因为 RDD 知道如何重新创建和重新计算数据集。Transformation 是类似在 RDD上 做 filter()、map()或 union() 以生成另一个 RDD 的操作,而 action 则是 count()、first()、take(n)、collect()
等触发一个计算操作。Transformations 一般都是 lazy 的,直到 action 执行后才会被执行。Spark Master/Driver 会保存 RDD 上的 Transformations。这样一来,如果某个 RDD 丢失,它可以快速和便捷地转换到集群中存活的主机上。这也就是 RDD 的弹性所在。

RDD 支持两种类型的操作:

变换(Transformation)
变换的返回值是一个新的 RDD 集合,而不是单个值。调用一个变换方法,不会有任何求值计算,它只获取一个 RDD 作为参数,然后返回一个新的 RDD。变换函数包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipe 和 coalesce。
行动(Action)
行动操作计算并返回一个新的值。当在一个 RDD 对象上调用行动函数时,会在这一时刻计算全部的数据处理查询并返回结果值。行动操作包括:reduce,collect,count,first,take,countByKey 以及 foreach。

2.1 transformation 操作

map(func):
对调用 map 的 RDD 数据集中的每个 element 都使用 func,然后返回一个新的 RDD,这个返回的数据集是分布式的数据集。

filter(func):
对调用 filter 的 RDD 数据集中的每个元素都使用 func,然后返回一个包含使 func 为 true 的元素构成的 RDD。

flatMap(func):
和 map 差不多,但是 flatMap 生成的是多个结果。
mapPartitions(func):
和 map 很像,但是 map 是每个 element,而 mapPartitions 是每个 partition。

mapPartitionsWithSplit(func):
和 mapPartitions 很像,但是 func 作用的是其中一个 split 上,所以 func 中应该有 index。

sample(withReplacement,faction,seed):
抽样。
union(otherDataset):
返回一个新的 dataset,包含源 dataset 和给定 dataset 的元素的集合。

distinct([numTasks]):
返回一个新的 dataset,这个 dataset 含有的是源 dataset 中的 distinct 的 element。

join(otherDataset,[numTasks]):
当有两个 KV 的 dataset(K,V)和(K,W),返回的是(K,(V,W))的 dataset,numTasks 为并发的任务数。

cogroup(otherDataset,[numTasks]):
当有两个 KV 的 dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的 dataset,numTasks 为并发的任务数。

cartesian(otherDataset):
笛卡尔积简单说就是 m*n。
groupByKey(numTasks):
返回(K,Seq[V]),也就是 hadoop 中 reduce 函数接受的 key-valuelist。

reduceByKey(func,[numTasks]):
就是用一个给定的 reduce func再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数。

sortByKey([ascending],[numTasks]):
按照 key 来进行排序,是升序还是降序,ascending 是 boolean 类型。

leftOuterJoin:
leftOuterJoin 类似于 SQL 中的左外关联 left outer join,返回结果以前面的 RDD 为主,关联不上的记录为空。只能用于两个 RDD 之间的关联,如果要多个 RDD 关联,多关联几次即可。

2.2 action 操作

count():
返回的是 dataset 中的 element 的个数。
first():
返回的是 dataset 中的第一个元素。
take(n):
返回前 n 个 elements,这个是driver program 返回的。
takeSample(withReplacement,num,seed):
抽样返回一个 dataset 中的 num 个元素,随机种子 seed。

reduce(func):
说白了就是聚集,但是传入的函数是两个参数输入返回一个值,这个函数必须是满足交换律和结合律的。
collect():
一般在 filter 或者足够小的结果的时候,再用 collect 封装返回一个数组。
saveAsTextFile(path):
把 dataset 写到一个 text file 中,或者 hdfs,或者 hdfs 支持的文件系统中,spark 把每条记录都转换为一行记录,然后写到 file 中。

saveAsSequenceFile(path):
只能用在 key-value 对上,然后生成 SequenceFile 写到本地或者 hadoop 文件系统。

countByKey():
返回的是 key 对应的个数的一个 map,作用于一个 RDD。
foreach(func):
对 dataset 中的每个元素都使用 func。
最新的 RDD Operations,请参考:
RDD Operations

三、实验案例之实时黑名单过滤

3.1 案例描述:

在一些企业中,比如小额贷款公司,需要做好风控,对那些信誉不好的用户,我们需要设置黑名单,只要是黑名单中的用户,我们就给过滤掉,禁止提供贷款服务。既然是
实时
,必然用到 SparkStreaming,这里采用
socketTextStream
,结合
nc
命令使用。由于是实验,不方面提供真实数据,这里一再简化,可以简单模拟一份数据,进行测试,原理是相同的。

注意:模拟的数据字段格式为(id user),例如: 00001 张三。


3.2 实验步骤:

1). 双击打开桌面 Xfce 终端,首先使用
nc
启动一个监听端口8888。

由于我们要使用
socketTextStream
传入一个端口及对应的主机名,不先启动端口会报错。

nc -lk 8888




2). 再次双击打开桌面 Xfce 终端,启动 spark-shell。

spark-shell




使用
import
导入依赖。

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds




注意:如果是 spark-shell,那么在进入 spark-shell 的时候,spark-shell 自动创建了一个 SparkContext 为sc,那么创建 StreamingContext 只需要用 sc 来 new 就可以了,这样就不会出现多个sc 的冲突问题,否则会报错。


创建StreamingContext,设置每一秒刷新一次。

val ssc = new StreamingContext(sc, Seconds(2))

设置需要过滤的黑名单,这里设置两个名字,您也可设置多个。

val bl = Array(("Jim", true),("hack", true))

设置并行度,这里指定为3。

val blRdd = ssc.sparkContext.parallelize(bl, 3)




设置主机名,端口号。

val st = ssc.socketTextStream("localhost", 8888)

对输入数据进行转换,(id, user) => (user, id user) ,以便对每个批次RDD,与之前定义好的黑名单进行leftOuterJoin操作。

val users = st.map { l => (l.split(" ")(1),l) }




调用左外连接操作leftOuterJoin,进行黑名单匹配,过滤掉。

val validRddDS = users.transform(ld => {
val ljoinRdd = ld.leftOuterJoin(blRdd)
val fRdd = ljoinRdd.filter(tuple => {
if(tuple._2._2.getOrElse(false)) {
false
} else {
true
}
})
val validRdd = fRdd.map(tuple => tuple._2._1)
validRdd
})




#打印白名单
validRddDS.print()
#执行
ssc.start()
#等待完成
ssc.awaitTermination()

此刻,你会看到每隔一秒不断刷新。



接下来,回到第一次打开的 Xfce 终端,即监听 8888 端口。

输入以下内容:

0001 marry
0003 hack
0002 tom




同理继续输入一下内容:

0004 Jim
0005 John
0006 shiro




经过两次的输入,发现对黑名单里的数据都过滤掉,实验完毕。由于设置的刷新间隔太短您可能,看得不是很清楚,可以将间隔设置大一点,方便观察。

四、总结

由于篇幅的原因,本节课主要介绍了Spark 的算子操作,包括
lazy
action
两种,然后基于一个小案例进行加深学习,涉及到 SparkStreaming 的一些操作。更多的 SparkRDD 操作请参考:Spark 官网

五、参考阅读

Spark Streaming实例分析
Spark 开发指南
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark
相关文章推荐