Spark源码分析
2013-11-15 17:13
405 查看
Driver,Master,Worker,Executor
每个App通过Driver使用Cluster:首先向Master注册App,然后每个Worker为其创建一个Executor。
RDD,Partition,Task
每个RDD包含多个Partition,为了计算RDD,在Executor上为RDD的每个Partition创建一个Task负责计算相应的Partition,在一个Executor上可以创建多个Task。
BlockManager
每个Executor使用一个BlockManager。BlockManager通过MemoryStore维护加载到内存的Partition,MemoryStore的存储容量有上限。BlockManager通过DiskStore维护从内存交换到磁盘的Partition。RDD的StorageLevel在Driver端设置,在得到RDD的计算结果后,BlockManager根据RDD的StorageLevel将RDD的Partition加入MemoryStore或DiskStore。
Master的责任
* 接受Worker结点的注册请求。Worker结点在注册请求中指定自身ID、Actor套接字绑定的主机名、Actor套接字监听的端口、可用的CPU核数量、可用的内存容量、WebUI服务监听的端口、外部可用的地址(域名或主机名)。注册Worker后,如果有正在等待资源的App,将触发App调度。
* 接受App的注册请求。App在注册请求中指定App的名称、声明使用的CPU核数量、声明每个Slave(在Worker结点运行的Executor进程)使用的内存容量、用于启动Slave的命令、Spark在Worker结点的安装目径(该路径在所有Worker结点相同)、App的URL、运行App使用的用户名。注册App后,将触发App调度。
* 跟踪Worker结点的活动状态。长时间不向Master发送心跳消息的Worker结点将被Master从其内部数据结构中删除。
* 跟踪App的状态。App结束后,Master释放为App分配的资源(CPU、内存),并尝试调度正在等待的App。
* 登记Executor的状态变更。如果Executor异常终止,将触发对该Executor所属App的调度,使得App任务可以在新的Executor上执行。
Worker的责任
* 根据Master的指示,为App启动Executor进程,通过设置JVM参数限制Executor可以使用的内存量,另外向Executor进程传递一个表示可用CPU核数量的参数,Executor进程并不直接根据该参数限制自身使用的CPU核数量,而是将该参数报告给App,App将根据该参数限制向Executor请求的并发任务的数量。
Executor的责任
* 接受并执行由App请求执行的任务——将指定函数应用到指定数据集(RDD的一个Partition)的各个元素。数据集可能已经被计算,由本地BlockManager或远程Executor的BlockManager缓存在内存或磁盘;数据集也可能尚未被计算,这时就需要调用RDD的compute方法计算数据集,并将结果缓存在本地BlockManager的内存或磁盘。
App的责任
* 记录RDD的生成链和存储级别,并在触发对RDD的Action操作时向Executor请求执行并发任务以完成Action操作。
Task提交过程
所有的Action操作由Driver端的SparkContext.runJob方法触发,对该方法的调用导致向DAGScheduler的事件列队中添加一个JobSubmitted事件。DAGScheduler处理JobSubmitted事件,根据RDD创建TaskSet,将TaskSet传递到ClusterScheduler.submitTasks方法处理。ClusterScheduler.submitTasks接受传入的TaskSet,并为其创建一个TaskSetManager,然后将创建的
TaskSetManager添加进内部调度列表等待调度。对TaskSetManager或者TaskSet的调度可以被动地由StandaloneSchedulerBackend的定时任务触发,也可以在ClusterScheduler.submitTasks 将TaskSetManager添加进内部调度列表后通过调用StandaloneSchedulerBackend.reviveOffers方法主动触发。不管是哪种触发方式,都会导致StandaloneSchedulerBackend.makeOffers 方法被调用,该方法首先将App可用的Executor的资源信息告诉ClusterScheduler,请求将正在等待调度的TaskSet中的Task分配给Executor执行,ClusterScheduler
根据某种调度策略以及各个Executor的资源情况将正在等待调度的TaskSet中的Task分配给Executor,最后在各个Executor上启动为其分配的Task。
每个App通过Driver使用Cluster:首先向Master注册App,然后每个Worker为其创建一个Executor。
RDD,Partition,Task
每个RDD包含多个Partition,为了计算RDD,在Executor上为RDD的每个Partition创建一个Task负责计算相应的Partition,在一个Executor上可以创建多个Task。
BlockManager
每个Executor使用一个BlockManager。BlockManager通过MemoryStore维护加载到内存的Partition,MemoryStore的存储容量有上限。BlockManager通过DiskStore维护从内存交换到磁盘的Partition。RDD的StorageLevel在Driver端设置,在得到RDD的计算结果后,BlockManager根据RDD的StorageLevel将RDD的Partition加入MemoryStore或DiskStore。
Master的责任
* 接受Worker结点的注册请求。Worker结点在注册请求中指定自身ID、Actor套接字绑定的主机名、Actor套接字监听的端口、可用的CPU核数量、可用的内存容量、WebUI服务监听的端口、外部可用的地址(域名或主机名)。注册Worker后,如果有正在等待资源的App,将触发App调度。
* 接受App的注册请求。App在注册请求中指定App的名称、声明使用的CPU核数量、声明每个Slave(在Worker结点运行的Executor进程)使用的内存容量、用于启动Slave的命令、Spark在Worker结点的安装目径(该路径在所有Worker结点相同)、App的URL、运行App使用的用户名。注册App后,将触发App调度。
* 跟踪Worker结点的活动状态。长时间不向Master发送心跳消息的Worker结点将被Master从其内部数据结构中删除。
* 跟踪App的状态。App结束后,Master释放为App分配的资源(CPU、内存),并尝试调度正在等待的App。
* 登记Executor的状态变更。如果Executor异常终止,将触发对该Executor所属App的调度,使得App任务可以在新的Executor上执行。
Worker的责任
* 根据Master的指示,为App启动Executor进程,通过设置JVM参数限制Executor可以使用的内存量,另外向Executor进程传递一个表示可用CPU核数量的参数,Executor进程并不直接根据该参数限制自身使用的CPU核数量,而是将该参数报告给App,App将根据该参数限制向Executor请求的并发任务的数量。
Executor的责任
* 接受并执行由App请求执行的任务——将指定函数应用到指定数据集(RDD的一个Partition)的各个元素。数据集可能已经被计算,由本地BlockManager或远程Executor的BlockManager缓存在内存或磁盘;数据集也可能尚未被计算,这时就需要调用RDD的compute方法计算数据集,并将结果缓存在本地BlockManager的内存或磁盘。
App的责任
* 记录RDD的生成链和存储级别,并在触发对RDD的Action操作时向Executor请求执行并发任务以完成Action操作。
Task提交过程
所有的Action操作由Driver端的SparkContext.runJob方法触发,对该方法的调用导致向DAGScheduler的事件列队中添加一个JobSubmitted事件。DAGScheduler处理JobSubmitted事件,根据RDD创建TaskSet,将TaskSet传递到ClusterScheduler.submitTasks方法处理。ClusterScheduler.submitTasks接受传入的TaskSet,并为其创建一个TaskSetManager,然后将创建的
TaskSetManager添加进内部调度列表等待调度。对TaskSetManager或者TaskSet的调度可以被动地由StandaloneSchedulerBackend的定时任务触发,也可以在ClusterScheduler.submitTasks 将TaskSetManager添加进内部调度列表后通过调用StandaloneSchedulerBackend.reviveOffers方法主动触发。不管是哪种触发方式,都会导致StandaloneSchedulerBackend.makeOffers 方法被调用,该方法首先将App可用的Executor的资源信息告诉ClusterScheduler,请求将正在等待调度的TaskSet中的Task分配给Executor执行,ClusterScheduler
根据某种调度策略以及各个Executor的资源情况将正在等待调度的TaskSet中的Task分配给Executor,最后在各个Executor上启动为其分配的Task。
相关文章推荐
- Spark技术内幕:Stage划分及提交源码分析
- Spark源码分析之Executor分析
- Spark 源码分析 -- task实际执行过程
- spark 1.6.0 core源码分析6 Spark job的提交
- Spark源码分析之-scheduler模块
- 第十篇:Spark SQL 源码分析之 In-Memory Columnar Storage源码分析之 query
- spark源码分析只: job 全过程
- 深入理解Spark 2.1 Core (一):RDD的原理与源码分析
- Spark2.2 DAGScheduler源码分析[stage划分算法源码剖析]
- Spark 源码分析 -- RDD
- spark1.2.0源码分析之ShuffleMapTask
- spark core源码分析13 异常情况下的容错保证
- spark源码分析--Master和worker建立连接
- 深入理解Spark 2.1 Core (五):Standalone模式运行的原理与源码分析
- 【转】Spark源码分析之-scheduler模块
- Spark源码分析之三:Stage划分
- Spark SQL源码分析之核心流程
- Spark源码分析之worker节点启动driver和executor
- Spark SQL 源码分析之 In-Memory Columnar Storage 之 cache table
- Spark MLlib LDA 基于GraphX实现原理及源码分析