spark core 2.0 YarnClusterManager
2017-01-09 13:59
/**
 * Cluster Manager for creation of Yarn scheduler and backend
 */
private[spark] class YarnClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean = {
    masterURL == "yarn"
  }

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    sc.deployMode match {
      case "cluster" => new YarnClusterScheduler(sc)
      case "client" => new YarnScheduler(sc)
      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    sc.deployMode match {
      case "cluster" =>
        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case "client" =>
        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case _ =>
        throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}
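YarnClusterManager is one of the pluggable ExternalClusterManager implementations: it is registered under META-INF/services/org.apache.spark.scheduler.ExternalClusterManager, and when the master URL is "yarn", SparkContext discovers it through Java's ServiceLoader and asks it to build the TaskScheduler and SchedulerBackend for the current deploy mode. The snippet below is only a simplified sketch of that lookup, not the actual SparkContext code; the object and method names are made up, and since ExternalClusterManager is private[spark] it would only compile inside the org.apache.spark package.

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.SparkException
import org.apache.spark.scheduler.ExternalClusterManager

// Hypothetical helper mirroring SparkContext's cluster-manager lookup:
// scan ServiceLoader registrations and keep the single manager whose
// canCreate accepts the given master URL.
object ClusterManagerLookup {
  def findClusterManager(masterURL: String): Option[ExternalClusterManager] = {
    val loader = Thread.currentThread().getContextClassLoader
    val candidates = ServiceLoader.load(classOf[ExternalClusterManager], loader)
      .asScala.filter(_.canCreate(masterURL)).toList
    candidates match {
      case Nil => None                     // fall back to the built-in managers
      case cm :: Nil => Some(cm)           // "yarn" matches YarnClusterManager
      case _ => throw new SparkException(
        s"Multiple external cluster managers registered for URL $masterURL")
    }
  }
}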
/**
* This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
* ApplicationMaster, etc is done
*/
private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {

  logInfo("Created YarnClusterScheduler")

  override def postStartHook() {
    ApplicationMaster.sparkContextInitialized(sc)
    super.postStartHook()
    logInfo("YarnClusterScheduler.postStartHook done")
  }

  override def stop() {
    super.stop()
    ApplicationMaster.sparkContextStopped(sc)
  }
}
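In yarn-cluster mode the ApplicationMaster runs the user class on a separate thread and has to know when that thread has actually created the SparkContext; postStartHook is where the driver side signals this via ApplicationMaster.sparkContextInitialized(sc). The snippet below is only a rendezvous sketch assuming a promise-based handshake, with made-up names (SketchAM, awaitSparkContext); it is not the real ApplicationMaster implementation.

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import org.apache.spark.SparkContext

// Simplified sketch of the handshake: the driver thread completes a promise
// once the SparkContext exists, and the AM thread waits on it before
// continuing with its own setup.
object SketchAM {
  private val sparkContextPromise = Promise[SparkContext]()

  // Called from YarnClusterScheduler.postStartHook on the driver thread.
  def sparkContextInitialized(sc: SparkContext): Unit = {
    sparkContextPromise.trySuccess(sc)
  }

  // Called on the AM thread, which blocks until the SparkContext is available.
  def awaitSparkContext(timeout: FiniteDuration): SparkContext = {
    Await.result(sparkContextPromise.future, timeout)
  }
}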
private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
  if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
    Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
  }

  // By default, rack is unknown
  override def getRackForHost(hostPort: String): Option[String] = {
    val host = Utils.parseHostPort(hostPort)._1
    Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
  }
}
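getRackForHost delegates to Hadoop's RackResolver, which consults whatever topology mapping the Hadoop configuration provides (typically a topology script); hosts it cannot map end up on the default rack. A small standalone check of the same resolution path might look like the sketch below, where the object name and the script location are assumptions, not part of Spark.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.util.RackResolver

// Hypothetical standalone check exercising the same RackResolver path used by
// YarnScheduler.getRackForHost.
object RackCheck {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Assumed location of a topology script; without one Hadoop resolves
    // every host to the default rack ("/default-rack").
    conf.set("net.topology.script.file.name", "/etc/hadoop/topology.sh")
    val rack = RackResolver.resolve(conf, "worker-01.example.com").getNetworkLocation
    println(s"worker-01.example.com -> $rack")
  }
}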