Spark源码学习2
2015-12-31 17:57
183 查看
转自:/article/5060007.html
在源码阅读时,需要重点把握以下两大主线。
静态view 即 RDD, transformation and action
动态view 即 life of a job, 每一个job又分为多个stage,每一个stage中可以包含多个rdd及其transformation,这些stage又是如何映射成为task被distributed到cluster中
一、概要
本文以wordCount为例,详细说明spark创建和运行job的过程,重点是在进程及线程的创建。
二、实验环境搭建
在进行后续操作前,确保下列条件已满足。
下载spark binary 0.9.1
安装scala
安装sbt
安装java
三、启动spark-shell
"MASTER=local"就是表明当前运行在单机模式
启动master
启动worker
启动spark-shell
注意运行时的输出,日志默认保存在$SPARK_HOME/logs目录。
master主要是运行类 org.apache.spark.deploy.master.Master,在8080端口启动监听,日志如下图所示
将spark-env.sh.template重命名为spark-env.sh
修改spark-env.sh,添加如下内容
worker启动完成,连接到master。打开maser的web ui可以看到连接上来的worker. Master WEb UI的监听地址是http://localhost:8080
如果一切顺利,将看到下面的提示信息。
可以用浏览器打开localhost:4040来查看如下内容
stages
storage
environment
executors
上述代码统计在README.md中含有Spark的行数有多少
四、部署过程详解
Spark布置环境中组件构成如下图所示。
Driver Program 简要来说在spark-shell中输入的wordcount语句对应于上图的Driver Program.
Cluster Manager 就是对应于上面提到的master,主要起到deploy management的作用
Worker Node 与Master相比,这是slave node。上面运行各个executor,executor可以对应于线程。executor处理两种基本的业务逻辑,一种就是driver programme,另一种就是job在提交之后拆分成各个stage,每个stage可以运行一到多个task
Notes: 在集群(cluster)方式下, Cluster Manager运行在一个jvm进程之中,而worker运行在另一个jvm进程中。在local cluster中,这些jvm进程都在同一台机器中,如果是真正的standalone或Mesos及Yarn集群,worker与master或分布于不同的主机之上。
五、JOB的生成和运行
job生成的简单流程如下
首先应用程序创建SparkContext的实例,如实例为sc
利用SparkContext的实例来创建生成RDD
经过一连串的transformation操作,原始的RDD转换成为其它类型的RDD
当action作用于转换之后RDD时,会调用SparkContext的runJob方法
sc.runJob的调用是后面一连串反应的起点,关键性的跃变就发生在此处
调用路径大致如下
sc.runJob->dagScheduler.runJob->submitJob
DAGScheduler::submitJob会创建JobSummitted的event发送给内嵌类eventProcessActor
eventProcessActor在接收到JobSubmmitted之后调用processEvent处理函数
job到stage的转换,生成finalStage并提交运行,关键是调用submitStage
在submitStage中会计算stage之间的依赖关系,依赖关系分为宽依赖和窄依赖两种
如果计算中发现当前的stage没有任何依赖或者所有的依赖都已经准备完毕,则提交task
提交task是调用函数submitMissingTasks来完成
task真正运行在哪个worker上面是由TaskScheduler来管理,也就是上面的submitMissingTasks会调用TaskScheduler::submitTasks
TaskSchedulerImpl中会根据Spark的当前运行模式来创建相应的backend,如果是在单机运行则创建LocalBackend
LocalBackend收到TaskSchedulerImpl传递进来的ReceiveOffers事件
receiveOffers->executor.launchTask->TaskRunner.run
代码片段executor.lauchTask
说了这么一大通,也就是讲最终的逻辑处理切切实实是发生在TaskRunner这么一个executor之内,运算结果是包装成为MapStatus然后通过一系列的内部消息传递,反馈到DAGScheduler,这一个消息传递路径不是过于复杂,有兴趣可以自行勾勒
在源码阅读时,需要重点把握以下两大主线。
静态view 即 RDD, transformation and action
动态view 即 life of a job, 每一个job又分为多个stage,每一个stage中可以包含多个rdd及其transformation,这些stage又是如何映射成为task被distributed到cluster中
一、概要
本文以wordCount为例,详细说明spark创建和运行job的过程,重点是在进程及线程的创建。
二、实验环境搭建
在进行后续操作前,确保下列条件已满足。
下载spark binary 0.9.1
安装scala
安装sbt
安装java
三、启动spark-shell
3.1 单机模式运行,即local模式
local模式运行非常简单,只要运行以下命令即可,假设当前目录是$SPARK_HOMEMASTER=local bin/spark-shell
"MASTER=local"就是表明当前运行在单机模式
3.2 local cluster方式运行
local cluster模式是一种伪cluster模式,在单机环境下模拟standalone的集群,启动顺序分别如下启动master
启动worker
启动spark-shell
3.3 master
$SPARK_HOME/sbin/start-master.sh
注意运行时的输出,日志默认保存在$SPARK_HOME/logs目录。
master主要是运行类 org.apache.spark.deploy.master.Master,在8080端口启动监听,日志如下图所示
3.4 修改配置
进入$SPARK_HOME/conf目录将spark-env.sh.template重命名为spark-env.sh
修改spark-env.sh,添加如下内容
export SPARK_MASTER_IP=localhost export SPARK_LOCAL_IP=localhost
3.5 运行worker
bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077 -i 127.0.0.1 -c 1 -m 512M
worker启动完成,连接到master。打开maser的web ui可以看到连接上来的worker. Master WEb UI的监听地址是http://localhost:8080
3.6 启动spark-shell
MASTER=spark://localhost:7077 bin/spark-shell
如果一切顺利,将看到下面的提示信息。
Created spark context.. Spark context available as sc.
可以用浏览器打开localhost:4040来查看如下内容
stages
storage
environment
executors
wordcount
上述环境准备妥当之后,我们在sparkshell中运行一下最简单的例子,在spark-shell中输入如下代码scala>sc.textFile("README.md").filter(_.contains("Spark")).count
上述代码统计在README.md中含有Spark的行数有多少
四、部署过程详解
Spark布置环境中组件构成如下图所示。
Driver Program 简要来说在spark-shell中输入的wordcount语句对应于上图的Driver Program.
Cluster Manager 就是对应于上面提到的master,主要起到deploy management的作用
Worker Node 与Master相比,这是slave node。上面运行各个executor,executor可以对应于线程。executor处理两种基本的业务逻辑,一种就是driver programme,另一种就是job在提交之后拆分成各个stage,每个stage可以运行一到多个task
Notes: 在集群(cluster)方式下, Cluster Manager运行在一个jvm进程之中,而worker运行在另一个jvm进程中。在local cluster中,这些jvm进程都在同一台机器中,如果是真正的standalone或Mesos及Yarn集群,worker与master或分布于不同的主机之上。
五、JOB的生成和运行
job生成的简单流程如下
首先应用程序创建SparkContext的实例,如实例为sc
利用SparkContext的实例来创建生成RDD
经过一连串的transformation操作,原始的RDD转换成为其它类型的RDD
当action作用于转换之后RDD时,会调用SparkContext的runJob方法
sc.runJob的调用是后面一连串反应的起点,关键性的跃变就发生在此处
调用路径大致如下
sc.runJob->dagScheduler.runJob->submitJob
DAGScheduler::submitJob会创建JobSummitted的event发送给内嵌类eventProcessActor
eventProcessActor在接收到JobSubmmitted之后调用processEvent处理函数
job到stage的转换,生成finalStage并提交运行,关键是调用submitStage
在submitStage中会计算stage之间的依赖关系,依赖关系分为宽依赖和窄依赖两种
如果计算中发现当前的stage没有任何依赖或者所有的依赖都已经准备完毕,则提交task
提交task是调用函数submitMissingTasks来完成
task真正运行在哪个worker上面是由TaskScheduler来管理,也就是上面的submitMissingTasks会调用TaskScheduler::submitTasks
TaskSchedulerImpl中会根据Spark的当前运行模式来创建相应的backend,如果是在单机运行则创建LocalBackend
LocalBackend收到TaskSchedulerImpl传递进来的ReceiveOffers事件
receiveOffers->executor.launchTask->TaskRunner.run
代码片段executor.lauchTask
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) { val tr = new TaskRunner(context, taskId, serializedTask) runningTasks.put(taskId, tr) threadPool.execute(tr) }
说了这么一大通,也就是讲最终的逻辑处理切切实实是发生在TaskRunner这么一个executor之内,运算结果是包装成为MapStatus然后通过一系列的内部消息传递,反馈到DAGScheduler,这一个消息传递路径不是过于复杂,有兴趣可以自行勾勒
相关文章推荐
- 关于某招聘网站简历自动刷新的探索思路
- MYSQL语句大全(SQL 高级教程)【中】
- 课程设计---学生信息管理系统
- IOS_页面跳转
- 使用WCF服务的客户端出现maxReceivedMessageSize异常解决方案
- Linux 命令之mount
- SSH中Hibernate的使用总结
- 写在 12/31/2015
- 使用手机访问本机
- 程序员的十种级别
- MYSQL语句大全(SQL 基础教程)
- IOS开发工具目录
- GDB调试core文件(2)
- 使用trigger方法触发事件 change -model
- MYSQL语句大全(SQL 函数)
- git中文件的三种状态
- 字符串匹配算法比较 http://blog.csdn.net/airfer/article/details/8951802/
- easyui datagrid 注意事项
- 2015个人年度总结
- 单例模式