spark1.3版本源码解读
2017-12-11 19:47
148 查看
脚本启动流程
1、在主节点启动start-all.sh 调用 start-master.sh 调用 start-daemon.sh org.apache.spark.deploy.master.Master 调用 spark-class (启动后面类的main方法) 2、start-slaves.sh 调用 start-slave.sh spark://hadoop01:7077 3、spark-deamon.sh 调用 spark-class 类名 调用org.apache.spark.launcher.Main
sparksubmit 关键源码
def main(args: Array[String]): Unit = { //将传入的参数封装为sparksubmitArguments对象 val appArgs = new SparkSubmitArguments(args) if (appArgs.verbose) { printStream.println(appArgs) } //传入的事件类型 一般是submit appArgs.action match { case SparkSubmitAction.SUBMIT => submit(appArgs) case SparkSubmitAction.KILL => kill(appArgs) case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) } } submit方法中, 先解析参数 val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args) 再执行dorunmain方法里面主要是执行 runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose) 在runmain方法中 先通过反射拿到目标类 mainClass = Class.forName(childMainClass, true, loader) 拿到main方法 val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass) 调用main方法 mainMethod.invoke(null, childArgs.toArray) 然后main方法中我们生成了 sparkconf 和sparkcontext
sparkcontext 任务的人口类
主要做件事 1、创建SparkEnv(内部启动Actor) 2、 3、 4、创建任务调度器TaskScheduler 5、 6、 7、创建DAGScheduler 切分Stage的 8、启动TaskScheduler
实现调用了creatSparkEnv 创建SparkEnv //TODO 该方法创建SparkEnv private[spark] def createSparkEnv( conf: SparkConf, isLocal: Boolean, listenerBus: LiveListenerBus): SparkEnv = { //通过调用SparkEnv里的静态方法来创建SparkEnv SparkEnv.createDriverEnv(conf, isLocal, listenerBus) } private[spark] val env = createSparkEnv(conf, isLocal, listenerBus) 通过SparkEnv.createDriverEnv(conf, isLocal, listenerBus) 调用的是里面的creat方法 在creat方法中有 AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager) 也就是在creat方法中通过AkkaUtils来创建ActorSystem //TODO 创建一个TaskScheduler private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master) 在任务调度器中有多种启动模式,跟据传入的sparkurl来匹配模式 //TODO spark的StandAlone模式 case SPARK_REGEX(sparkUrl) => //TODO 创建了一个TaskSchedulerImpl val scheduler = new TaskSchedulerImpl(sc) val masterUrls = sparkUrl.split(",").map("spark://" + _) //TODO 创建了一个SparkDeploySchedulerBackend 后端调度器 val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls) //TODO 调用initialize创建调度器 scheduler.initialize(backend) (backend, scheduler)
降龙18掌
打包上传任务 /spark-submit --class com.sam...xxx --master spark://node01:7077 /root/x.jar .... Spark任务分配有两种方式-尽量打散(默认),尽量集中 1、调用SparkSubmit类,内部执行submit-->doRunMain-->通过反射拿到程序的主类对象-->执行主类的main方法 2、SparkContext 构建Sparkconf与SparkContext对象,在Sparkcontext类中主要做了3件事, 1、创建SparkEnv对象(创建了ActorSystem对象(创建Actor)) 2、Taskscheduler(用来生成并发送task给Executor) 3、DAGScheduler(用来化分stage) 3、clientActor(driver端创建)将任务信息封装到ApplicationDescription对象,并且提交给Master 4、Master收到clientActor提交的任务信息后,把任务信息存到内存中,然后又将任务信息放到队列中(waitingApps) 5、当开始执行这个任务信息时,调用scheduler方法,进行资源调度 6、将调度好的资源封装到LaunchExecutor,并发送给对应的worker 7、worker接收到Master发送的调度信息(LaunchExecutor)后,将信息封装成一个ExecutorRunner对象 8、封装成ExecutorRunner后,调用它的start方法,开始启动CoarseGrainedExecutorBackend对象, 9、Executor启动后开始向DriverActor进行反向注册 10、与DriverActor注册成功后,创建一个线程池(ThreadPool),用来执行任务 11、当所有的Executor注册完成后,作业环境也就准备好了,Driver端会结束与SparkContext对象的初始化(也就是初始化已完成) 12、当driver初始化完成后 4000 (创建了一个sc实例),会继续执行我们提交的App代码,当触发一个Action的RDD算子时,就会触发一个job,这时就会调用DAGScheduler对象进行Stage划分 13、DAGScheduler 开始进行Stage划分,拿到最后的RDD,从后往前找,调用newstage方法, 14、将划分好的stage按照分区生成一个个的Task,并且封装到TaskSet对象,然后TaskSet提交到TaskScheduler。 15、TaskScheduler 接收到提交的TaskSet,拿到一个序列化器,将Taskset反序列化,将反序列化好的Taskset封装到LaunchExecutor并提交到DriverActor 16、把LaunchExecutor发送到Executor上 通过广播变量 17、Executor接收到DriverActor发送过来的任务(LaunchExecutor),会将其封装成TaskRunner,然后从线程池中获取线程来执行TaskRunner 18、TaskRunner拿到反序列化器,反序列化Taskset,然后执行App代码,也就是对RDD分区上执行的算子和自定义函数
相关文章推荐
- (版本定制)第7课:Spark Streaming源码解读之JobScheduler内幕实现和深度思考
- (版本定制)第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密
- (版本定制)第9课:Spark Streaming源码解读之Receiver在Driver的精妙实现全生命周期彻底研究和思考
- Spark源码的编译过程详细解读(各版本)(博主推荐)
- (版本定制)第8课:Spark Streaming源码解读之RDD生成生命周期彻底研究和思考
- Spark源码的编译过程详细解读(各版本)(博主推荐)
- (版本定制)第15课:Spark Streaming源码解读之No Receivers彻底思考
- (版本定制)第10课:Spark Streaming源码解读之流数据不断接收全生命周期彻底研究和思考
- (版本定制)第16课:Spark Streaming源码解读之数据清理内幕彻底解密
- (版本定制)第12课:Spark Streaming源码解读之Executor容错安全性
- (版本定制)第6课:Spark Streaming源码解读之Job动态生成和深度思考
- (版本定制)第13课:Spark Streaming源码解读之Driver容错安全性
- (版本定制)第11课:Spark Streaming源码解读之Driver中的ReceiverTracker彻底研究和思考
- 第10课:Spark Streaming源码解读之流数据不断接收全生命周期彻底研究和思考
- 第8课:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考
- 第10课:Spark Streaming源码解读之流数据不断接收全生命周期彻底研究和思考
- Spark Streaming源码解读之流数据不断接收详解
- 第8课:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考
- Spark Streaming源码解读之Driver中的ReceiverTracker架构设计以及具体实现彻底研究
- 第10课:Spark Streaming源码解读之流数据不断接收全生命周期彻底研究和思考