您的位置:首页 > 其它

第1课:通过案例对SparkStreaming 透彻理解三板斧之一

2016-05-23 11:11 393 查看
spark作为apache旗下顶级项目之一,在2015年火得一塌糊涂,在2016年更是势不可挡,下面两图可见一斑:






对于spark的学习,掌握其API的使用仅仅只是皮毛,我们要深入源码研究其本质,能够做到源码级别的修改和定制,才是真正掌握了它,也才能更好地使用它。从今天起,我们将踏上这一征程。
Spark的子框架有若干, 我们将从Spark Streaming着手切入Spark版本定制,通过对该框架的彻底研究,我们推而广之到spark的各个框架,可以精通Spark力量的源泉和所有问题的解决之道。为什么选择Spark Streaming作为切入点呢?首先是因为数据有时效性,过期的数据就像过期的食物一样,远没有新鲜的食物来的有营养,我们以往选择批处理很多时候是因为技术和资源的限制,做不到流处理,只能退而求其次,从本质上来讲,流处理才是数据处理的王道!现在的时代是流处理的时代。其次,Spark Streaming自从推出以来,收到了越来越多的关注,50%以上的用户都将它视作spark中最重要的部分,如下图可见:




Spark的流式处理可以无缝配合使用SPARK SQL,GraphX与MLlib的功能,这得益于Spark一体化、多元化的基础架构设计,所谓兄弟齐心,其力断金,这正是Spark真正可怕之处,也正是Spark Streaming必将一统天下的根源;Spark Streaming由于其数据输入的动态性,需要动态控制数据的流入,作业的切分,还有数据的处理,所以最容易出问题,需要认真学习仔细掌握; Spark Streaming与其它子框架不同之处在于,它更像是Spark Core之上的一个应用程序,在学过了Spark Core之后,进一步研究Spark Streaming也为我们后续做Spark上的复杂的程序提供了最好的参考。

我们先来看下官网对spark streaming的精要介绍:



接下来进入今天的正题。

一。Spark Streaming另类在线实验

由于Spark Streaming中的数据是动态流入的,对数据的处理也是由框架自动地周期性地产生Job来处理的,在这种动态性变化性下,我们如何清晰地看到数据的流入和被处理的过程?我们的技巧是,通过调大Batch Interval来降低动态变化性,以方便透视细节。需要说明的是,这只是学习研究的需要,实际生产环境中在集群能够处理的情况下,batch interval一般都是越小越好。我们从已写过的广告点击的在线黑名单过滤的Spark Streaming应用程序入手,首先来看下程序代码:

object OnlineBlackListFilter {
def main(args: Array[String]){
/**
* 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
* 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置
* 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
* 只有1G的内存)的初学者       *
*/
val conf = new SparkConf() //创建SparkConf对象
conf.setAppName("OnlineBlackListFilter") //设置应用程序的名称,在程序运行的监控界面可以看到该名称
conf.setMaster("spark://Master:7077") //此时,程序在Spark集群

//*val ssc = new StreamingContext(conf, Seconds(30))
val ssc = new StreamingContext(conf, Seconds(300))
/**
* 黑名单数据准备,实际上黑名单一般都是动态的,例如在Redis或者数据库中,黑名单的生成往往有复杂的业务
* 逻辑,具体情况算法不同,但是在Spark Streaming进行处理的时候每次都能够访问完整的信息
*/
val blackList = Array(("hadoop", true),("mahout", true))
val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)

val adsClickStream = ssc.socketTextStream("Master", 9999)

/**
* 此处模拟的广告点击的每条数据的格式为:time、name
* 此处map操作的结果是name、(time,name)的格式
*/
val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) }
adsClickStreamFormatted.transform(userClickRDD => {
//通过leftOuterJoin操作既保留了左侧用户广告点击内容的RDD的所有内容,又获得了相应点击内容是否在黑名单中
val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)

/**
* 进行filter过滤的时候,其输入元素是一个Tuple:(name,((time,name), boolean))
* 其中第一个元素是黑名单的名称,第二元素的第二个元素是进行leftOuterJoin的时候是否存在值
* 如果存在的话,表面当前广告点击是黑名单,需要过滤掉,否则的话则是有效点击内容;
*/
val validClicked = joinedBlackListRDD.filter(joinedItem => {
if(joinedItem._2._2.getOrElse(false))
{
false
} else {
true
}

})

validClicked.map(validClick => {validClick._2._1})
}).print

/**
* 计算后的有效数据一般都会写入Kafka中,下游的计费系统会从kafka中pull到有效数据进行计费
*/
ssc.start()
ssc.awaitTermination()

}
}


程序打包好了后,就可以着手我们的测试了。

1.首先做好准备工作,启动hdfs集群(之所以要启动hdfs,是因为我们的spark集群的spark.eventLog.dir 和 spark.history.fs.logDirectory存储在hdfs上);

Start-dfs.sh:



2.启动spark集群,start-all.sh:



3.启动Spark的History Server,方便我们在查看程序运行细节。

Start-history-server.sh:



4.打开数据发送的端口,需要先执行nc,如果直接执行打包的程序,会报错,会报端口被拒绝的错误,因为9999端口确实未启动:

nc -lk 9999

5.用spark-submit运行前面生成的jar包:

/usr/local/spark/spark-1.6.1-bin-hadoop2.6/bin/spark-submit –class com.dt.spark.sparkstreaming.OnlineBlackListFilter –master spark://master:7077 /usr/local/idea/SparkApps.jar

6.在数据发送端口输入若干数据,比如:

111111 Spark

222222 hadoop

333333 flink

444444 kafka

555555 scala

666666 flume

7.程序运行过程中,控制台可见如下打印信息:



可见,黑名单中的hadoop确实被过滤掉了,其余的信息都被正确地显示了。

8.程序运行过程中,通过webui查看运行状态:



9.Ctrl+C手动结束程序后,webui查看运行详情:







初步可见,程序运行在2个worker节点的2个executor中,总共有5个Job。

这跟我们熟知的spark core, spark sql有些不同,程序总共才运行2.7分钟,为啥有5个Job呢?

接下来去具体的Job中看一看。

10. Job 0:



可以发现该Job分为2个stage,运行在所有的2个Executor,这两个stage都是由我们的程序中的66行的代码,即ssc.start()触发的。

接下来看下stage0:



该stage 也是有我们的ssc.start()触发的,共包含50个task,运行在所有的2个executor上。

接下来看下stage1:



该stage 也是有我们的ssc.start()触发的,共包含20个task,运行在所有的2个executor上.

11.Job 1的细节:



该job的Stage2的细节:



该job也是由我们的ssc.start()触发的,只包含1个task,运行在executor0上,本定性为process_local,耗时1.9分钟。该Job实际就是启动了一个接收数据的线程Receiver。

Spark Streaming启动Receiver的时候就会通过Job来启动。而且Receiver只会在一个Executor中执行,且以一个Task去接受我们的数据。Receiver接受数据和普通的Job没有任何区别.

Receiver :长时间(可能7*24小时)运行在Excutor之上,每个Receiver负责一个inuptDStream (比如读取一个kafka消息的输入流)。每个Receiver,加上inputDStream 会占用一个core/slot

重要启示:一个Spark应用程序中可以启动很多Job,而这些不同的Job之间可以相互配合。这一认识为我们写复杂Spark程序奠定了良好的基础。

12.Job 2的详情:



该Job的各个stage对应我们的程序的第40,32和61行( val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(” “)(1), ads) },val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)和print),该Job处理的就是我们程序的主要业务逻辑。

我们看Stage3的详情:





该Stage是由worker2上的Executor1执行的,数据写在了worker2的磁盘上(shuffle write).

Stage4:





该stage对应的就是(val blackListRDD = ssc.sparkContext.parallelize(blackList, 8),可以看到在worker1和worker2的executor上都有该rdd的分区数据,且并行度是我们手动指定的8, 数据写在了worker1的磁盘上(shuffle write).

Stage5:





该stage运行在Worker2的executor1上。

可以看出虽然在一台服务器上接受数据,但是会在多台服务器上处理数据。

13.Job3:





Stage6和stage7被skip掉了,因为数据在前一个job执行的时候已经shuffle write在了磁盘上。

Stage8的详情:





该stage运行在Worker1和Worker12的executor上。

同Job2一样,Job3的各个stage对应我们的程序的第40,32和61行(val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(” “)(1), ads) }, val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)和print),该Job处理的就是我们程序的主要业务逻辑。

14.Job4:





同Job2,Job3一样,Job4的各个stage对应我们的程序的第40,32和61行( val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(” “)(1), ads) }, val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)和print),该Job处理的就是我们程序的主要业务逻辑。

2 瞬间理解Spark Streaming本质

DStream是一个没有边界的集合,没有大小的限制。DStream代表了时空的概念。随着时间的推移,里面不断产生RDD。

时间已固定,我们就锁定到空间的操作。

从空间的维度来讲,就是处理层面。

对DStream的操作,构成了DStreamGraph。如以下图例所示:



上图中每个foreach都会触发一个作业,作业会顺着依赖从后往前回溯,形成DAG,如下图所示:



空间维度确定之后,随着时间不断推进,会不断实例化RDD Graph,然后触发Job去执行处理。

最后再去读下官方的Spark Streaming的文档:



总结:在Spark Streaming的应用程序中,框架自动帮我们提交了一些Job,来完成一些事情,从而简化我们的程序逻辑,使我们只需关注在业务逻辑代码上,这正是spark streaming的精华所在,正体现了spark框架的易用性。而要想彻底掌握spark streaming,我们有必要通过这些现象,反过来回溯去寻根问源。接下来几节课我们会逐步深入,抽丝剥茧,去看这些本质。

本次分享来自于王家林老师的课程‘源码版本定制发行班’,在此向王家林老师表示感谢!

欢迎大家交流技术知识!一起学习,共同进步!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark 源码