
Spark Learning 66 - Source code: the creation of schedulerBackend and taskScheduler (4) - YARN

2017-12-26 21:43

1. In the command below, YARN is specified as the master, but how does scheduling actually happen?

$SPARK_HOME/bin/spark-submit --name "lcc_sparkSql_check" --master yarn --class HbaseDataCheck.HbaseDataCheck /home/lcc/hbaseCount/SparkOnHbaseScala.jar


2. Reading SparkContext.createTaskScheduler shows that the catch-all masterUrl case is where an external scheduler comes in: whichever cluster manager the master URL names is the type of external scheduler that gets created.

// This case matches any remaining master URL, e.g. YARN, Mesos, or some other
// resource manager; we walk through the YARN case here.
case masterUrl =>
  val cm = getClusterManager(masterUrl) match {
    case Some(clusterMgr) => clusterMgr
    case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
  }
  try {
    // ExternalClusterManager is a trait: a cluster manager interface for plugging in
    // an external scheduler. To follow the YARN case, look at its implementation,
    // YarnClusterManager, which we examine below.
    val scheduler = cm.createTaskScheduler(sc, masterUrl)
    val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
    cm.initialize(scheduler, backend)
    (backend, scheduler)
  } catch {
    case se: SparkException => throw se
    case NonFatal(e) =>
      throw new SparkException("External scheduler cannot be instantiated", e)
  }
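
Where does getClusterManager come from? It uses Java's ServiceLoader to discover every ExternalClusterManager implementation on the classpath and keeps the one whose canCreate accepts the URL. Condensed from the Spark 2.x SparkContext source:

import java.util.ServiceLoader
import scala.collection.JavaConverters._

private def getClusterManager(url: String): Option[ExternalClusterManager] = {
  val loader = Utils.getContextOrSparkClassLoader
  // Load all registered managers and keep those that claim this master URL.
  val serviceLoaders =
    ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
  if (serviceLoaders.size > 1) {
    throw new SparkException(
      s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
  }
  serviceLoaders.headOption
}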


The three lines that matter are:

// ExternalClusterManager is a trait, so we need its implementations; in YARN
// mode that is YarnClusterManager, examined next.
val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)


Note that createTaskScheduler and createSchedulerBackend are both called through an interface: ExternalClusterManager, a cluster manager interface for plugging in an external scheduler.

/**
 * A cluster manager interface to plugin external scheduler.
 */
private[spark] trait ExternalClusterManager {

  /**
   * Check if this cluster manager instance can create scheduler components
   * for a certain master URL.
   * @param masterURL the master URL
   * @return True if the cluster manager can create scheduler backend/task scheduler
   *         for this master URL
   */
  def canCreate(masterURL: String): Boolean

  /**
   * Create a task scheduler instance for the given SparkContext
   * @param sc SparkContext
   * @param masterURL the master URL
   * @return TaskScheduler that will be responsible for task handling
   */
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler

  /**
   * Create a scheduler backend for the given SparkContext and scheduler. This is
   * called after task scheduler is created using `ExternalClusterManager.createTaskScheduler()`.
   * @param sc SparkContext
   * @param masterURL the master URL
   * @param scheduler TaskScheduler that will be used with the scheduler backend.
   * @return SchedulerBackend that works with a TaskScheduler
   */
  def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend

  /**
   * Initialize task scheduler and backend scheduler. This is called after the
   * scheduler components are created
   * @param scheduler TaskScheduler that will be responsible for task handling
   * @param backend SchedulerBackend that works with a TaskScheduler
   */
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
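
To make the plugin contract concrete, here is a hypothetical minimal implementation; the class name ToyClusterManager and the master URL "toy" are invented for illustration (Spark's own ExternalClusterManagerSuite exercises the trait in a very similar way). Spark would only discover such a class if its fully-qualified name were also listed in a META-INF/services resource file, as described further below:

package org.apache.spark.scheduler // needed to access the private[spark] types below

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.local.LocalSchedulerBackend

// Hypothetical cluster manager that claims the master URL "toy".
private[spark] class ToyClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean = masterURL == "toy"

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    new TaskSchedulerImpl(sc)

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend =
    // Reuse the local backend with one core so the sketch stays self-contained.
    new LocalSchedulerBackend(sc.getConf, scheduler.asInstanceOf[TaskSchedulerImpl], 1)

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}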


Since YARN mode was specified, look at the YARN implementation of this trait, YarnClusterManager:

/**
 * Cluster Manager for creation of Yarn scheduler and backend
 */
private[spark] class YarnClusterManager extends ExternalClusterManager {

  // SparkContext calls canCreate on every cluster manager found by the
  // ServiceLoader (YARN, Mesos, ...), since it cannot know up front which one
  // owns the URL; this method answers "should this manager handle the given master URL?"
  override def canCreate(masterURL: String): Boolean = {
    masterURL == "yarn"
  }

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    sc.deployMode match {
      // yarn-cluster mode
      case "cluster" => new YarnClusterScheduler(sc)
      // yarn-client mode
      case "client" => new YarnScheduler(sc)
      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    sc.deployMode match {
      case "cluster" =>
        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case "client" =>
        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case _ =>
        throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}
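
How is YarnClusterManager found when the master URL is "yarn"? The spark-yarn module registers it for ServiceLoader discovery: its resources contain a file named META-INF/services/org.apache.spark.scheduler.ExternalClusterManager whose single line is the implementing class:

org.apache.spark.scheduler.cluster.YarnClusterManager

Note also that sc.deployMode is just the value of spark.submit.deployMode (set by spark-submit's --deploy-mode flag), defaulting to "client", which is why the command at the top of this article takes the "client" branch unless --deploy-mode cluster is given.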


YARN mode further splits into yarn-cluster and yarn-client; start with yarn-cluster:

// yarn-cluster mode
case "cluster" => new YarnClusterScheduler(sc)


YarnClusterScheduler is only a thin extension of YarnScheduler: all it adds is a post-start hook that tells the ApplicationMaster the SparkContext is ready.

/**
 * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
 * ApplicationMaster, etc is done
 */
private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {

  logInfo("Created YarnClusterScheduler")

  override def postStartHook() {
    ApplicationMaster.sparkContextInitialized(sc)
    super.postStartHook()
    logInfo("YarnClusterScheduler.postStartHook done")
  }

}
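
Why does postStartHook call ApplicationMaster.sparkContextInitialized? In cluster mode the ApplicationMaster launches the user class on a separate thread and must block until that class has created its SparkContext before it can register with the ResourceManager and request containers. Conceptually the handshake is a promise that the hook completes; here is a minimal sketch with simplified, made-up names (the real ApplicationMaster passes the SparkContext itself and enforces a configurable timeout):

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object MiniApplicationMaster {
  // Completed once the user class has created its SparkContext.
  private val contextReady = Promise[Unit]()

  // Called from YarnClusterScheduler.postStartHook (simplified: no context passed).
  def sparkContextInitialized(): Unit = contextReady.trySuccess(())

  // The AM's main thread parks here before registering with the ResourceManager.
  def waitForSparkContext(): Unit = Await.result(contextReady.future, 100.seconds)
}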


The backend created for cluster mode is YarnClusterSchedulerBackend:

case "cluster" =>
new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)


On start, YarnClusterSchedulerBackend binds itself to the current YARN application attempt and sets the expected executor count; it also assembles driver log URLs from the NodeManager environment:

private[spark] class YarnClusterSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {

  override def start() {
    val attemptId = ApplicationMaster.getAttemptId
    bindToYarn(attemptId.getApplicationId(), Some(attemptId))
    super.start()
    totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
  }

  override def getDriverLogUrls: Option[Map[String, String]] = {
    var driverLogs: Option[Map[String, String]] = None
    try {
      val yarnConf = new YarnConfiguration(sc.hadoopConfiguration)
      val containerId = YarnSparkHadoopUtil.get.getContainerId

      val httpAddress = System.getenv(Environment.NM_HOST.name()) +
        ":" + System.getenv(Environment.NM_HTTP_PORT.name())
      // lookup appropriate http scheme for container log urls
      val yarnHttpPolicy = yarnConf.get(
        YarnConfiguration.YARN_HTTP_POLICY_KEY,
        YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
      )
      val user = Utils.getCurrentUserName()
      val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
      val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user"
      logDebug(s"Base URL for logs: $baseUrl")
      driverLogs = Some(Map(
        "stdout" -> s"$baseUrl/stdout?start=-4096",
        "stderr" -> s"$baseUrl/stderr?start=-4096"))
    } catch {
      case e: Exception =>
        logInfo("Error while building AM log links, so AM" +
          " logs link will not appear in application UI", e)
    }
    driverLogs
  }
}
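
These links point at the NodeManager's web server, and the negative start offset asks YARN for just the tail of each log. With made-up host and container values, the result looks like:

// Hypothetical values, for illustration only.
val baseUrl = "http://nm-host.example.com:8042/node/containerlogs/" +
  "container_1514270000000_0001_01_000001/lcc"
val stdout = s"$baseUrl/stdout?start=-4096" // a negative start reads the last 4096 bytes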


If instead the deploy mode is yarn-client:

// yarn-client mode
case "client" => new YarnScheduler(sc)


YarnScheduler itself is short: on top of TaskSchedulerImpl it only adds YARN rack resolution (and quiets RackResolver's overly chatty logging):

private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
  if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
    Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
  }

  // By default, rack is unknown
  override def getRackForHost(hostPort: String): Option[String] = {
    val host = Utils.parseHostPort(hostPort)._1
    Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
  }
}
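
RackResolver is Hadoop's standard topology lookup. A small standalone illustration (the host name is made up; with no topology script configured, every host resolves to /default-rack):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.util.RackResolver

// Resolves the host via net.topology.script.file.name (or the static
// mapping) from the Hadoop configuration.
val conf = new Configuration()
val rack = RackResolver.resolve(conf, "worker-01.example.com").getNetworkLocation
println(rack) // "/default-rack" when no topology is configured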


The backend created in client mode, YarnClientSchedulerBackend, is where the real work happens:

case "client" =>
new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)


The method to study is YarnClientSchedulerBackend.start:

/**
 * Create a Yarn client to submit an application to the ResourceManager.
 * This waits until the application is running.
 */
override def start() {
  val driverHost = conf.get("spark.driver.host") // the driver's host
  val driverPort = conf.get("spark.driver.port") // the driver's port
  val hostport = driverHost + ":" + driverPort
  sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) } // publish the web UI address

  val argsArrayBuf = new ArrayBuffer[String]() // assemble the launch arguments
  argsArrayBuf += ("--arg", hostport)

  logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
  val args = new ClientArguments(argsArrayBuf.toArray)
  totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf) // executor count requested at launch
  client = new Client(args, conf) // the driver-side YARN client
  bindToYarn(client.submitApplication(), None) // submit the application through the client

  // SPARK-8687: Ensure all necessary properties have already been set before
  // we initialize our driver scheduler backend, which serves these properties
  // to the executors
  super.start() // ultimately runs CoarseGrainedSchedulerBackend.start()
  waitForApplication() // block until the application is running

  // SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
  // reads the credentials from HDFS, just like the executors and updates its own credentials
  // cache.
  if (conf.contains("spark.yarn.credentials.file")) {
    YarnSparkHadoopUtil.get.startCredentialUpdater(conf)
  }
  monitorThread = asyncMonitorApplication()
  monitorThread.start()
}
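
waitForApplication delegates to Client.monitorApplication, which polls the ResourceManager until the application report leaves the submission states (the real code also fails fast if the application dies before reaching RUNNING). Here is a self-contained sketch of that polling loop written against Hadoop's YarnClient directly, not Spark's actual implementation:

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient

// Hypothetical helper: poll the RM once a second until the app is past submission.
def waitUntilRunning(yarn: YarnClient, appId: ApplicationId): YarnApplicationState = {
  val pending = Set(
    YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
    YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED)
  var state = yarn.getApplicationReport(appId).getYarnApplicationState
  while (pending(state)) {
    Thread.sleep(1000)
    state = yarn.getApplicationReport(appId).getYarnApplicationState
  }
  state // RUNNING on success, otherwise a terminal state (FINISHED / FAILED / KILLED)
}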