
Spark Core 2.0: YarnClusterManager

2017-01-09 13:59
/**
 * Cluster Manager for creation of Yarn scheduler and backend
 */
private[spark] class YarnClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean = {
    masterURL == "yarn"
  }

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    sc.deployMode match {
      case "cluster" => new YarnClusterScheduler(sc)
      case "client" => new YarnScheduler(sc)
      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    sc.deployMode match {
      case "cluster" =>
        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case "client" =>
        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case _ =>
        throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}
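
canCreate is what ties this class to the "yarn" master URL: SparkContext discovers ExternalClusterManager implementations through Java's ServiceLoader (the YARN module registers this class under META-INF/services) and keeps the one whose canCreate accepts the URL. Below is a minimal sketch of that lookup, assuming Spark-internal access to ExternalClusterManager; the helper name findClusterManager is illustrative, not Spark's own.

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.SparkException
import org.apache.spark.scheduler.ExternalClusterManager

object ClusterManagerLookup {
  // Sketch of the ServiceLoader-based lookup SparkContext performs when the
  // master URL is "yarn" (names and error text simplified for illustration).
  def findClusterManager(url: String): Option[ExternalClusterManager] = {
    val loader = Thread.currentThread().getContextClassLoader
    val candidates = ServiceLoader.load(classOf[ExternalClusterManager], loader)
      .asScala.filter(_.canCreate(url))
    if (candidates.size > 1) {
      throw new SparkException(s"Multiple cluster managers registered for the url $url")
    }
    candidates.headOption
  }
}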


/**
 * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
 * ApplicationMaster, etc is done
 */
private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {

  logInfo("Created YarnClusterScheduler")

  override def postStartHook() {
    ApplicationMaster.sparkContextInitialized(sc)
    super.postStartHook()
    logInfo("YarnClusterScheduler.postStartHook done")
  }

  override def stop() {
    super.stop()
    ApplicationMaster.sparkContextStopped(sc)
  }

}
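
In cluster mode the user's main class runs inside the YARN ApplicationMaster, which has to wait for that code to create its SparkContext before it can register with the ResourceManager; sparkContextInitialized and sparkContextStopped are the driver-side half of that handshake. The sketch below only illustrates the promise-based pattern and is not the actual ApplicationMaster code; the object and method names are made up for the example.

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import org.apache.spark.SparkContext

object ContextHandshake {
  private val contextPromise = Promise[SparkContext]()

  // Driver side: called once the SparkContext exists
  // (analogous to what postStartHook triggers above).
  def sparkContextInitialized(sc: SparkContext): Unit =
    contextPromise.trySuccess(sc)

  // ApplicationMaster side: block until the context is available,
  // then proceed with registration.
  def awaitSparkContext(timeout: Duration = 100.seconds): SparkContext =
    Await.result(contextPromise.future, timeout)
}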

private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
  if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
    Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
  }

  // By default, rack is unknown
  override def getRackForHost(hostPort: String): Option[String] = {
    val host = Utils.parseHostPort(hostPort)._1
    Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
  }
}
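
getRackForHost hands the bare hostname to Hadoop's RackResolver, which consults whatever topology plugin the cluster configures (commonly a script named by net.topology.script.file.name). The standalone snippet below shows what a host resolves to; the script path and hostname are hypothetical, and with no topology plugin configured every host falls into "/default-rack".

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.util.RackResolver

object RackLookupDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Hypothetical topology script path for illustration only.
    conf.set("net.topology.script.file.name", "/etc/hadoop/conf/topology.sh")
    val rack = RackResolver.resolve(conf, "worker-01.example.com").getNetworkLocation
    println(rack) // e.g. "/rack-07" if the script maps the host to that rack
  }
}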