Spark Internals Revealed 07: A First Look at the DAGScheduler Source Code

2015-01-19 20:55
Once the TaskScheduler has been built, the next step is to build the core DAGScheduler object:



Stepping into its constructor:









As we can see, constructing a DAGScheduler instance requires passing the TaskScheduler instance in as a parameter.
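The constructor dependency described above can be sketched with simplified stand-ins. This is a minimal illustration, not Spark's real code: `SimpleTaskScheduler`, `InMemoryTaskScheduler`, and `SimpleDAGScheduler` are hypothetical names, and the real constructor takes further collaborators. Only `setDAGScheduler` mirrors a call that appears in the source quoted later in this post, where it is made from the actor's `preStart` hook; the sketch makes it directly in the constructor for brevity.

```scala
// Hypothetical, simplified stand-ins; these are not Spark's real classes.
trait SimpleTaskScheduler {
  def setDAGScheduler(dag: SimpleDAGScheduler): Unit
  def dagScheduler: Option[SimpleDAGScheduler]
}

class InMemoryTaskScheduler extends SimpleTaskScheduler {
  private var dag: Option[SimpleDAGScheduler] = None

  // Stores the back-reference so the task scheduler can report to the DAG scheduler.
  override def setDAGScheduler(d: SimpleDAGScheduler): Unit = { dag = Some(d) }

  override def dagScheduler: Option[SimpleDAGScheduler] = dag
}

// The task scheduler is injected through the constructor.
// Assumption: the back-reference is registered right here; the source does it
// later, inside the event-processing actor's preStart().
class SimpleDAGScheduler(val taskScheduler: SimpleTaskScheduler) {
  taskScheduler.setDAGScheduler(this)
}
```

After construction, each side holds a reference to the other, which is what lets the task scheduler report task events back to the DAG scheduler.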

LiveListenerBus:



MapOutputTrackerMaster:



BlockManagerMaster:



Reading the code, we can see that when the DAGScheduler is instantiated it calls the initializeEventProcessActor() method:

private def initializeEventProcessActor() {
  // blocking the thread until supervisor is started, which ensures eventProcessActor is
  // not null before any job is submitted
  implicit val timeout = Timeout(30 seconds)
  val initEventActorReply =
    dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
  eventProcessActor = Await.result(initEventActorReply, timeout.duration).
    asInstanceOf[ActorRef]
}

initializeEventProcessActor()
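The blocking initialization above follows a common pattern: ask another component for a handle, receive a future, and wait on it with a timeout before continuing. The sketch below shows that pattern with only the Scala standard library rather than Akka. `BlockingInitSketch`, `Handle`, and `requestHandle` are hypothetical names standing in for the supervisor and the ActorRef; they are assumptions for illustration, not part of Spark.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object BlockingInitSketch {
  // Hypothetical handle standing in for an ActorRef.
  final case class Handle(name: String)

  // Stand-in for the supervisor: creates the handle on another thread
  // and replies through a Future.
  def requestHandle(name: String): Future[Handle] = {
    val reply = Promise[Handle]()
    val worker = new Thread(new Runnable {
      override def run(): Unit = reply.success(Handle(name))
    })
    worker.setDaemon(true)
    worker.start()
    reply.future
  }

  // Blocks the calling thread until the reply arrives. If the timeout elapses
  // first, Await.result throws a java.util.concurrent.TimeoutException.
  def initialize(name: String, timeout: FiniteDuration = 30.seconds): Handle =
    Await.result(requestHandle(name), timeout)
}
```

Blocking here trades a short startup delay for a simple guarantee: by the time `initialize` returns, the handle is non-null and usable, so later callers need no readiness checks.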

DAGSchedulerEventProcessActor:

private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override def preStart() {
    // set DAGScheduler for taskScheduler to ensure eventProcessActor is always
    // valid when the messages arrive
    // (Annotation: this gives taskScheduler its reference to the DAGScheduler; doing it
    // here guarantees eventProcessActor is ready by the time a job is submitted.)
    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }

  /**
   * The main event loop of the DAG scheduler.
   */
  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)

    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def postStop() {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}


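As we can see, the core of this step is instantiating the eventProcessActor object. eventProcessActor is responsible for receiving and sending the DAGScheduler's messages, and it serves as the DAGScheduler's communication channel.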
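The receive method quoted above is essentially a dispatcher: each incoming event is pattern-matched and forwarded to one handler method. The sketch below reproduces that shape without Akka, using a blocking queue as the mailbox. It is a simplified illustration under stated assumptions: `EventLoopSketch`, `Handler`, `EventLoop`, and the `Stop` event are hypothetical, and the event classes carry far fewer fields than Spark's real ones.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

object EventLoopSketch {
  // Hypothetical, reduced event set; Spark's real events carry more fields.
  sealed trait Event
  final case class JobSubmitted(jobId: Int) extends Event
  final case class JobCancelled(jobId: Int) extends Event
  case object AllJobsCancelled extends Event
  case object Stop extends Event // assumption: added only to end the loop

  // Hypothetical handler that only records what it was asked to do.
  class Handler {
    val log = ArrayBuffer.empty[String]
    def handleJobSubmitted(jobId: Int): Unit = { log += s"submitted $jobId" }
    def handleJobCancellation(jobId: Int): Unit = { log += s"cancelled $jobId" }
    def doCancelAllJobs(): Unit = { log += "cancelled all" }
  }

  class EventLoop(handler: Handler) {
    private val mailbox = new LinkedBlockingQueue[Event]()

    // Any thread may enqueue an event.
    def post(event: Event): Unit = mailbox.put(event)

    // Takes events one at a time, in arrival order, until Stop is seen.
    def run(): Unit = {
      var running = true
      while (running) {
        mailbox.take() match {
          case JobSubmitted(id) => handler.handleJobSubmitted(id)
          case JobCancelled(id) => handler.handleJobCancellation(id)
          case AllJobsCancelled => handler.doCancelAllJobs()
          case Stop             => running = false
        }
      }
    }
  }
}
```

Because a single loop handles events one at a time, the handler's state is only touched by one thread, which is the same property the actor-based design provides for the DAGScheduler's internal state.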