您的位置:首页 > 其它

spark源码学习(一):sparkContext的初始化分析(一)

2016-02-25 12:49 585 查看
spark源码学习:sparkContext的初始化分析

spark可以运行在本地模式local下,可以运行在yarn和standalone模式下,但是本地程序是通过什么渠道和这些集群交互的呢?那就是sparkContext,他在spark生态系统中的作用不言而喻,绝对是最重要的,整体架构如图所示:



这里我们简单的来剖析一下,sparkContext在初始化最重要的流程和大致框架。spark代码第一句都是先创建sparkConf之后作为参数传递给sparkContext在进行创建sc,之后的一切操作都离不开sparkContext。

val conf = new SparkConf().setAppName(appName).setMaster(master)
sc=new SparkContext(conf)

在debug模式下进入源代码。



可以看到这里的sparkContext使用sparkConf作为参数来进行传递参数。变量allowMultipleContext是用来决定是否在spark中指运行一个任务,markPartiallyContructed来确保实例的唯一性。



上面是对sparkConf进行复制之后,然后对配置信息进行校验。上面的代码显示spark运行必须指定master和name,否则就会抛出异常,结束初始化过程,在后面我们会看到master是用来设置部署模式,name用来指定程序名称,相对简单。

下面重要的是sparkEnv的初始化,sparkEnv是spark的执行环境对象,包括很多与Executor执行相关的对象。在local模式下Driver会创建Executor,local-cluster部署模式或者standalone部署模式下worker另起CoarseGrainedExecutorBackend进程中创建executor,继而创建taskRunner方法,运行runtask运行任务,这个方法会有自己的具体实现类,shufflemaptask和resulttask有具体的实现。所以sparkEnv存在于driver或者CoarseGrainedExecutorBackend进程中。代码如下:



我们进入createDriverEnv方法,它隶属于SparkEnv的方法。主要保存spark运行时环境变量。这里阐述最重要的变量初始化,至于什么页面监控之类的就不看了。

(1) Akka的分布式消息系统actorSystem的初始化。

(2) 创建map任务输出跟踪器mapoutputTracker,主要就是跟踪map任务把数据结果写到哪里去了,reduce也可以去取数据map,reduce,shuffle都有自己所对应的ID,着重介绍一下MapOutputTrackerMaxter,它内部使用mapStatuses来维护跟中map任务的输出状态,这个数据结构是一个map,其中key对应shufleID,value存储各个map任务对应的状态信息mapStatus。由于mapStatus是由blockmanagerid(key)和bytesSize(value)组成,key是表示这些计算的中间结果存放在那个blockManager,value表示不同的reduceID要读取的数据大小这时reduce就知道从哪里fetch数据,并且判断数据的大小(和0比较来确保确实获得了数据)。

driver和executor处理mapOutputtTrackermMaster的方式不同:

Driver:创建mapOutputtTrackermMaster,然后创建mapOutputtTrackermMasterActor,并且注册到ActorSystem.

Executor:创建mapOutputtTrackerm,并从ActorSystem中找到mapOutputtTrackermMasterActor。

有那么多的executor,当然就有跟多的mapTask,那driver是怎样知道各个mapTask的执行任务信息呢?那就靠我们上面的mapOutputtTrackermMasterActor啦,哈哈。map任务的状态就是有Executor向持有mapOutputtTracker-MasterActor来发送消息,把map任务状态同步到mapOutputtTracker的mapstatuses上去。问题又来了,executor怎样找到mapOutputtTrackermMasterActor呢?那就靠registe'OrLookup函数啦,它后台使用ActorSystem提供的分布式消息机制实现的。

那就来看看具体的代码吧:



(3)实例化ShuffleManager。它主要是负责管理本地和远程的block的shuffle操作。ShuffleManager通过反射机制来生成默认的SortShuffleManager。可以通过修改spark.shuffle.manager设置为hash来显示的控制使用HashShuffle-Manager。这里的sort主要是指在shuffle中的key-value,key是默认排序好的。具体代码如下:



(4)块儿传输服务BlockTransferServervice。这里默认的是NettyBlockTransferServervice。可以通过配置属性spark.shuffle.blockTransferService使用NioBlockTransferService。NettyBlockTransferServervice使用Netty提供的异步事件驱动网络应用框架,提供web服务及客户端,获取远程节点上的Block集合。代码如下:



(5)创建BlockManagerMaster。它是负责block的管理和协调,具体的操作是依赖于BlockManagerMaster-Actor,因为它需要与Executor上的BlockManager通过Actor进行通信。(这里的Endpoint相当于Actor)



(6) 创建BlockManager。他主要运行在worker节点上。虽然这里创建了,但是只有在他的init初始化函数之后才是有效的。


(7)创建广播管理器BroadcastManager。主要是负责把序列化时候的RDD,job以及shuffleDependence等,以及配置信息存储在本地,有时候还会存储到其他的节点以保持可靠性。

(8) 创建存储管理器CacheManager。他主要用于缓存RDD某个分区计算的中间结果,缓存计算结果在迭代计算的时候发生。他很有用,可以减少磁盘IO,加快执行速度。

(9) HTTP文件服务器httpFileServer。退工对jar以及其他文件的http访问。例如jar包的上传等等。他的端口号由spark.fileserver.port来配置,默认情况下是0,表示随机生成端口号。

(10)最后创建测量系统MetricsSystem,是spark的测量系统,不用深究,这里我们不管他。

上面的10条主要说的是sparkEnv的创建,执行环境也就刚刚初始化完毕,核心的核心还没开始呢。

具体的变量创建等等下篇我们再来介绍。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: