
Spark Internals: An Analysis of the SparkContext Startup Process

2014-07-05 23:43
SparkContext is the entry point to the whole of Spark: whether you are using Spark Core, Spark Streaming, or Spark SQL, you first create a SparkContext object and then run all subsequent RDD operations against it. It is therefore worth understanding what SparkContext actually does while it is being initialized.
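
For example, a typical driver program starts out like this (the app name, master URL, and file path are just placeholder values):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("MyApp").setMaster("spark://master:7077")
val sc = new SparkContext(conf)       // this constructor performs the steps analyzed below
val lines = sc.textFile("input.txt")  // every subsequent RDD operation goes through sc
println(lines.count())
sc.stop()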

SparkContext initialization mainly does the following:

1. Create a SparkEnv from the SparkConf passed into the SparkContext constructor;

2. Initialize the SparkUI;

3. Create the TaskScheduler;

4. Create the DAGScheduler;

5. Start the taskScheduler.

Let's walk through the SparkContext initialization process with the source code.

1. Create the SparkEnv

private[spark] val env = SparkEnv.create(
  conf, "<driver>", conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt,
  isDriver = true, isLocal = isLocal, listenerBus = listenerBus)
SparkEnv.set(env)


2. Initialize the SparkUI

private[spark] val ui = new SparkUI(this)
ui.bind()
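
By default the web UI binds to port 4040 (falling back to the next free port if that one is already taken); the port can be overridden through configuration, for example:

conf.set("spark.ui.port", "4050") // 4050 is just an example value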


3. Create the TaskScheduler: a different SchedulerBackend is created depending on Spark's run mode

private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)

// Only the standalone (spark://) branch is shown here; other master formats
// (local, local[N], yarn, mesos, ...) are matched in the same way.
private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
  val SPARK_REGEX = """spark://(.*)""".r

  master match {
    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend) // initializes the backend field of TaskSchedulerImpl
      scheduler
  }
}
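
For comparison, when the master string is "local" or "local[N]" the same match creates a LocalBackend instead of a SparkDeploySchedulerBackend. A rough sketch of that branch (simplified from memory of the 1.x source; the real code also passes a maxTaskFailures value and an isLocal flag):

val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r

master match {
  case "local" =>
    val scheduler = new TaskSchedulerImpl(sc)
    scheduler.initialize(new LocalBackend(scheduler, 1))             // one worker thread
    scheduler
  case LOCAL_N_REGEX(threads) =>
    val scheduler = new TaskSchedulerImpl(sc)
    scheduler.initialize(new LocalBackend(scheduler, threads.toInt)) // N worker threads
    scheduler
  // ...
}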

class TaskSchedulerImpl extends TaskScheduler {
  var backend: SchedulerBackend = null

  def initialize(backend: SchedulerBackend) {
    this.backend = backend // the SparkDeploySchedulerBackend is stored in the backend field
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO => // first-in, first-out scheduling
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR => // fair scheduling
          new FairSchedulableBuilder(rootPool, conf)
      }
    }
    schedulableBuilder.buildPools()
  }
}
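
Which builder gets used is controlled by the spark.scheduler.mode configuration (FIFO is the default). For example, fair scheduling can be turned on through SparkConf:

conf.set("spark.scheduler.mode", "FAIR")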

private[spark] class SparkDeploySchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext, masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with AppClientListener with Logging {

}


4. Create the DAGScheduler: the DAGScheduler is built on top of the TaskScheduler and receives the jobs that are submitted

// The DAGScheduler is created from the TaskScheduler and spawns an eventProcessActor,
// which acts as the DAGScheduler's communication channel and can send and receive many kinds of messages
@volatile private[spark] var dagScheduler: DAGScheduler = new DAGScheduler(this)

class DAGScheduler {

  def this(sc: SparkContext) = this(sc, sc.taskScheduler)

  private def initializeEventProcessActor() {
    // ask the supervisor actor to create the event-processing actor and wait for its ActorRef
    implicit val timeout = Timeout(30 seconds)
    val initEventActorReply =
      dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
    eventProcessActor = Await.result(initEventActorReply, timeout.duration)
      .asInstanceOf[ActorRef]
  }

  initializeEventProcessActor()
}

// A detailed walkthrough is left to the DAGScheduler article
private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override def preStart() {
    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }

  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties)
    // ... other DAGSchedulerEvent cases
  }
}
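
The supervisor/ask pattern used in initializeEventProcessActor is plain Akka: the DAGScheduler asks a supervisor actor to create a child and blocks until the child's ActorRef comes back. A minimal, self-contained sketch of that pattern (the actor names here are illustrative, not Spark code):

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// supervisor that creates child actors on request and replies with their ActorRef
class ChildCreator extends Actor {
  def receive = {
    case p: Props => sender() ! context.actorOf(p)
  }
}

class Worker extends Actor {
  def receive = {
    case msg => println("worker got: " + msg)
  }
}

object AskPatternDemo extends App {
  val system = ActorSystem("demo")
  val supervisor = system.actorOf(Props[ChildCreator], "supervisor")

  implicit val timeout = Timeout(30.seconds)
  // same shape as initializeEventProcessActor: ask the supervisor for a child,
  // then block until the child's ActorRef comes back
  val reply = supervisor ? Props[Worker]
  val child = Await.result(reply, timeout.duration).asInstanceOf[ActorRef]

  child ! "hello"
  system.shutdown()
}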


5. Start the taskScheduler

Starting the taskScheduler mainly starts the corresponding SchedulerBackend and decides whether tasks should be executed speculatively;

While the TaskScheduler is starting, the Application description is created and a registration request is sent to the Master.

taskScheduler.start()

class TaskSchedulerImpl extends TaskScheduler {
  var backend: SchedulerBackend = null

  override def start() {
    backend.start()
    // if spark.speculation is enabled, periodically check for tasks to re-launch speculatively
  }
}
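
Speculative execution is off by default and is driven by configuration; for example it can be enabled through SparkConf:

conf.set("spark.speculation", "true")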

private[spark] class SparkDeploySchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext, masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with AppClientListener with Logging {
  var client: AppClient = null
  val maxCores = conf.getOption("spark.cores.max").map(_.toInt)

  override def start() {
    super.start() // call CoarseGrainedSchedulerBackend's start() method
    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
      conf.get("spark.driver.host"), conf.get("spark.driver.port"),
      CoarseGrainedSchedulerBackend.ACTOR_NAME)
    // command used by the Workers to launch CoarseGrainedExecutorBackend processes
    val command = Command(
      "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs,
      classPathEntries, libraryPathEntries, extraJavaOpts)
    val sparkHome = sc.getSparkHome()
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))

    // the AppClient registers the application with the Master
    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    client.start()
  }
}

class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
  extends SchedulerBackend with Logging {
  var driverActor: ActorRef = null

  override def start() {
    // create the DriverActor, which executors will register with
    driverActor = actorSystem.actorOf(
      Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
  }
}

class ClientActor extends Actor with Logging {
  override def preStart() {
    registerWithMaster() // register the Application with the Master
  }
}


Communication between CoarseGrainedSchedulerBackend and CoarseGrainedExecutorBackend

private[spark] class CoarseGrainedExecutorBackend(driverUrl: String, executorId: String, hostPort: String, cores: Int)
  extends Actor with ExecutorBackend with Logging {
  var executor: Executor = null
  var driver: ActorSelection = null

  override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    driver = context.actorSelection(driverUrl)
    // register the Executor; the receiver is CoarseGrainedSchedulerBackend (its DriverActor)
    driver ! RegisterExecutor(executorId, hostPort, cores)
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }

  override def receive = {
    case RegisteredExecutor(sparkProperties)  => // create the Executor once registration succeeds
    case LaunchTask(taskDesc)                 => // run a task on the Executor
    case KillTask(taskId, _, interruptThread) => // kill a running task
    case StopExecutor                         => // shut the executor down
  }
}
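
On the driver side these messages are answered by the DriverActor created inside CoarseGrainedSchedulerBackend. A simplified sketch of how executor registration is handled there (abridged; field and method names are recalled from the 1.x source and may not match exactly):

def receive = {
  case RegisterExecutor(executorId, hostPort, cores) =>
    logInfo("Registered executor " + executorId + " with " + cores + " cores")
    executorActor(executorId) = sender    // remember the executor's ActorRef
    freeCores(executorId) = cores         // track its available cores
    sender ! RegisteredExecutor(sparkProperties)
    makeOffers()                          // try to launch pending tasks on the new executor
  // ... task status updates, KillTask, StopDriver, etc.
}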