您的位置:首页 > 其它

spark 各组件研究

2016-03-02 19:13 441 查看
TaskScheduler

createTaskScheduler

    case SPARK_REGEX(sparkUrl)

        SparkDeploySchedulerBackend        

            CoarseGrainedExecutorBackend

                case LaunchTask(data) (1)

                       scheduler.initialize(backend)

dagScheduler = new DAGScheduler(this)

taskScheduler.start()
    backend.start()

提交taskset到集群运行

处理shuffle输出的lost

straggle处理机制

对处理每个TaskSet 的TaskSetManager 维护 (精读!)

RDD

    runjob

        dagScheduler.runJob

            submitJob

                eventProcessActor ! JobSubmitted

DAGScheduler

    handleJobSubmitted

        finalStage

        submitStage(finalStage)/runLocally(job)

             def submitStage(stage: Stage) {

                if missing   submitMissingTasks

                        taskSchedulerImpl.submitTasks

                            backend.reviveOffers()

                                makeOffers

                                    launchTasks

                                        executorActor   (1)

                                    executorHost(id, host)

                else submitStage(parent)

              }

    1. taskset 传给底层 的TaskScheduler

    2. 记录那个RDD和Stage被persist

    3.  构建DAG , 决定每个任务的最佳位置

    4.  重新提交shuffle输出丢失的stage 

Executor

    threadPool

    def launchTask

        runningTasks.put(taskId, tr)

            threadPool.execute(tr)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: