Lesson 29: Master HA Fully Demystified
2016-06-26 17:01
1. Master HA analysis
2. The four Master HA modes
3. The internal working mechanism of Master HA
4. Master HA source code analysis
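When a program is actually submitted, it is submitted to the Master that is currently the Leader.
Because resources are allocated coarse-grained, before a program starts running, the failure of a single Master generally does not affect the running cluster.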
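Since a submission has to reach the Leader, a client can be given the addresses of all Masters and try them in turn; Spark Standalone accepts a comma-separated list of Masters in one URL (for example `spark://host1:7077,host2:7077`). The sketch below only illustrates the idea: `MasterUrls`, `parse` and `submitToLeader` are hypothetical names, and the `submit` callback stands in for the real submission RPC, assumed to return `true` only when the contacted Master is the Leader.

```scala
// Hypothetical helper for a multi-master URL such as "spark://h1:7077,h2:7077".
object MasterUrls {
  private val Prefix = "spark://"

  // Splits the URL into one "spark://host:port" entry per Master.
  def parse(url: String): Seq[String] = {
    require(url.startsWith(Prefix), s"not a spark:// URL: $url")
    url.stripPrefix(Prefix).split(",").toSeq
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(Prefix + _)
  }

  // Tries each Master in order and returns the first one that accepts.
  // `submit` stands in for the real submission RPC: assumed to return true
  // only when the contacted Master is the current Leader.
  def submitToLeader(url: String)(submit: String => Boolean): Option[String] =
    parse(url).find(submit)
}
```

Standby Masters simply refuse, so the client does not need to know in advance which host holds leadership.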
I. Master HA analysis
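1. In production, ZooKeeper is generally used for HA, and three Masters are recommended; ZooKeeper manages failover between the Masters automatically;
2. When ZooKeeper is used for HA, it stores the metadata of the running Spark cluster: Workers, Drivers, and Applications;
3. When the current Active Master fails, ZooKeeper elects one of the Standby Masters as the new Active Master. Note, however, that between being elected and actually becoming the Active Master, it must read the metadata describing the cluster's current state from ZooKeeper and recover it;
4. During Master failover, all programs that are already running keep running normally! Because a Spark application obtains its compute resources through the Cluster Manager before it starts running, the scheduling and processing of its Jobs at runtime have nothing to do with the Master;
5. The only impact of Master failover is that new Jobs cannot be submitted: on the one hand, no new application can be submitted to the cluster, because only the Active Master accepts submission requests; on the other hand, programs that are already running cannot trigger new Job submissions through Action operations either;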
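The rules in items 3 and 5 can be modelled as a small state machine. A minimal sketch under simplifying assumptions: the phase names mirror Spark's `RecoveryState`, while `RecoveryPhase`, `SketchMaster` and its methods are hypothetical. An elected Master with persisted metadata first enters RECOVERING, and only an ALIVE Master accepts new submissions.

```scala
// Phase names mirror Spark's RecoveryState; the rest is a simplified model.
sealed trait RecoveryPhase
object RecoveryPhase {
  case object STANDBY extends RecoveryPhase
  case object RECOVERING extends RecoveryPhase
  case object COMPLETING_RECOVERY extends RecoveryPhase
  case object ALIVE extends RecoveryPhase
}

// Hypothetical Master that accepts submissions only while it is ALIVE.
final class SketchMaster {
  import RecoveryPhase._

  private var state: RecoveryPhase = STANDBY
  def currentState: RecoveryPhase = state

  // Called when the election service (e.g. ZooKeeper) picks this Master.
  // If metadata was persisted, it must be recovered before going ALIVE.
  def electedLeader(hasPersistedData: Boolean): Unit =
    state = if (hasPersistedData) RECOVERING else ALIVE

  // Called once the recovered Workers, Drivers and Applications are reconciled.
  def completeRecovery(): Unit =
    if (state == RECOVERING) {
      state = COMPLETING_RECOVERY
      // A real Master would drop unresponsive Workers and Applications here.
      state = ALIVE
    }

  // New applications are only accepted by an ALIVE (active) Master.
  def submitApplication(name: String): Either[String, String] =
    if (state == ALIVE) Right(s"accepted $name")
    else Left(s"rejected $name: master is $state")
}
```

Keeping RECOVERING distinct from ALIVE is what enforces item 3: an elected Master may not serve requests until its metadata has been restored.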
Rule of thumb: YARN mode performs roughly 30% worse than Standalone mode.
II. The four Master HA modes
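1. The four Master HA modes are ZOOKEEPER, FILESYSTEM (for cases where real-time and latency requirements are less strict), CUSTOM, and NONE;
2. Notes:
a) ZOOKEEPER manages the Masters automatically;
b) With FILESYSTEM, a failed Master must be restarted manually; once restarted, it immediately becomes the Active Master and serves requests (accepting application submissions and requests to run new Jobs);
c) CUSTOM lets users plug in their own Master HA implementation (a factory class named by spark.deploy.recoveryMode.factory), which is especially useful for advanced users;
d) NONE is the default, and it is what a freshly downloaded and installed Spark cluster uses. The Master selects the mode as follows: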
// Choose the persistence engine and the leader election agent
// according to spark.deploy.recoveryMode (RECOVERY_MODE, default "NONE").
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    // Load the user-supplied factory class by reflection.
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    // NONE: persist nothing; this Master becomes the leader immediately.
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
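The dispatch above is a plain factory pattern: each recovery mode supplies a (persistence engine, leader election agent) pair, and anything unrecognized falls back to the NONE pair. A self-contained sketch of the same shape, with hypothetical names (`HaPersistence`, `HaElection`, `ModeFactory`, `HaModes`) standing in for Spark's classes; the `loadCustom` function replaces the reflective loading of the class named by `spark.deploy.recoveryMode.factory`.

```scala
// Hypothetical stand-ins for a persistence engine and a leader election agent;
// each one only reports which implementation it is.
trait HaPersistence { def name: String }
trait HaElection { def name: String }

// One factory per recovery mode, mirroring the shape of the code above.
trait ModeFactory {
  def createPersistenceEngine(): HaPersistence
  def createLeaderElectionAgent(): HaElection
}

object HaModes {
  // Builds a factory whose products simply carry the given names.
  def named(engine: String, agent: String): ModeFactory = new ModeFactory {
    def createPersistenceEngine(): HaPersistence = new HaPersistence { val name: String = engine }
    def createLeaderElectionAgent(): HaElection = new HaElection { val name: String = agent }
  }

  // `loadCustom` replaces the reflective loading of the user's factory class.
  def select(mode: String, loadCustom: () => ModeFactory): (HaPersistence, HaElection) = {
    val factory = mode match {
      case "ZOOKEEPER"  => named("zookeeper", "zookeeper")
      case "FILESYSTEM" => named("filesystem", "monarchy")
      case "CUSTOM"     => loadCustom()
      case _            => named("black-hole", "monarchy") // NONE and anything unknown
    }
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent())
  }
}
```

Because every branch yields the same pair of interfaces, the rest of the Master never needs to know which mode is active.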
4. PersistenceEngine has a crucial method, persist, which persists data, and readPersistedData, which reads it back during recovery;
/**
 * Returns the persisted data sorted by their respective ids (which implies that they're
 * sorted by time of creation).
 */
final def readPersistedData(
    rpcEnv: RpcEnv): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
  rpcEnv.deserialize { () =>
    (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
  }
}

/**
 * Defines how the object is serialized and persisted. Implementation will
 * depend on the store used.
 */
def persist(name: String, obj: Object): Unit

/**
 * Defines how the object referred by its name is removed from the store.
 */
def unpersist(name: String): Unit
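To see this contract in isolation, here is a minimal in-memory engine, assuming the key convention shown above (`app_`, `driver_`, `worker_` prefixes). It is a sketch, not Spark's implementation: real engines serialize objects to ZooKeeper or to files, whereas this one keeps them in a map, and the `Stored*` case classes are hypothetical stand-ins for ApplicationInfo, DriverInfo and WorkerInfo. `read` returns matches sorted by name, echoing the "sorted by ids" comment on readPersistedData.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for ApplicationInfo, DriverInfo and WorkerInfo.
final case class StoredApp(id: String)
final case class StoredDriver(id: String)
final case class StoredWorker(id: String)

// Minimal in-memory persistence engine following the same contract:
// persist/unpersist by name, read back every object under a name prefix.
final class InMemoryPersistenceEngine {
  private val store = mutable.Map.empty[String, Any]

  def persist(name: String, obj: Any): Unit = store(name) = obj
  def unpersist(name: String): Unit = store -= name

  // All objects whose name starts with `prefix`, sorted by name (i.e. by id).
  def read[T](prefix: String): Seq[T] =
    store.toSeq
      .filter { case (name, _) => name.startsWith(prefix) }
      .sortBy(_._1)
      .map(_._2.asInstanceOf[T])

  def readPersistedData(): (Seq[StoredApp], Seq[StoredDriver], Seq[StoredWorker]) =
    (read[StoredApp]("app_"), read[StoredDriver]("driver_"), read[StoredWorker]("worker_"))
}
```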
5. Both FILESYSTEM and NONE use MonarchyLeaderAgent to perform Leader election;
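With only one Master there is nobody to compete with, so "monarchy" election can declare the Master leader at once; in the Spark source, MonarchyLeaderAgent does this by calling electedLeader() on its Master when it is constructed. A minimal sketch of that idea, where `Electable`, `MonarchyAgent` and `RecordingMaster` are hypothetical names:

```scala
// Hypothetical callback interface: a Master can be told it has won or lost
// leadership.
trait Electable {
  def electedLeader(): Unit
  def revokedLeadership(): Unit
}

// "Monarchy" election: the single candidate becomes leader at construction
// time, because there is no other Master and no coordination service.
final class MonarchyAgent(master: Electable) {
  master.electedLeader()

  // Nothing to release: no ZooKeeper session or lock was ever acquired.
  def stop(): Unit = ()
}

// A trivial Master that just records its leadership status.
final class RecordingMaster extends Electable {
  var isLeader = false
  def electedLeader(): Unit = isLeader = true
  def revokedLeadership(): Unit = isLeader = false
}
```

This is also why FILESYSTEM failover needs a manual restart: nothing watches the Master, so a new one only becomes leader when someone starts it.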
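6. NONE needs no persistence at all, so why is there a BlackHolePersistenceEngine that implements nothing? Because it keeps the code structure uniform and easy to extend;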
private[master] class BlackHolePersistenceEngine extends PersistenceEngine {
  override def persist(name: String, obj: Object): Unit = {}
  override def unpersist(name: String): Unit = {}
  override def read[T: ClassTag](name: String): Seq[T] = Nil
}
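This is the Null Object pattern: the Master's code calls persist and unpersist unconditionally, and NONE mode swaps in an engine that drops everything, so no `if (mode != NONE)` checks are scattered through the Master. A sketch with hypothetical names (`Store`, `BlackHoleStore`, `MapStore`, `WorkerRegistry`):

```scala
// Hypothetical persistence interface used by the Master-side code below.
trait Store {
  def persist(name: String, obj: AnyRef): Unit
  def unpersist(name: String): Unit
  def read(prefix: String): Seq[AnyRef]
}

// Null Object used for NONE mode: accepts every call and keeps nothing.
object BlackHoleStore extends Store {
  def persist(name: String, obj: AnyRef): Unit = ()
  def unpersist(name: String): Unit = ()
  def read(prefix: String): Seq[AnyRef] = Nil
}

// A real (in-memory) store, standing in for ZooKeeper or FILESYSTEM.
final class MapStore extends Store {
  private var data = Map.empty[String, AnyRef]
  def persist(name: String, obj: AnyRef): Unit = data += name -> obj
  def unpersist(name: String): Unit = data -= name
  def read(prefix: String): Seq[AnyRef] =
    data.toSeq.filter(_._1.startsWith(prefix)).sortBy(_._1).map(_._2)
}

// Caller code is identical for every mode: no "if (mode != NONE)" branches.
final class WorkerRegistry(store: Store) {
  def register(workerId: String): Unit = store.persist("worker_" + workerId, workerId)
  def recovered: Seq[AnyRef] = store.read("worker_")
}
```

Adding a new mode then means adding one more Store implementation, without touching WorkerRegistry.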
==========The internal working mechanism of Master HA (mainly ZooKeeper)============
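1. ZooKeeper automatically elects the Leader Master from among the Standby Masters;
2. The new Leader uses ZooKeeperPersistenceEngine to read the cluster state data: Applications, Drivers, Workers, and so on;
3. It checks whether the metadata is empty (if there is nothing to recover, the Master becomes ALIVE immediately);
4. It re-registers the Applications, Drivers, and Workers obtained from the ZooKeeper persistence engine into the Master's in-memory cache;
5. It verifies that the recovered information is consistent with the current state of the running cluster;
6. It marks the Applications and Workers as UNKNOWN, then sends the address of the new Leader (the former Standby Master) to each Application's Driver and to each Worker;
7. When the Drivers and Workers receive the new Master's address, they respond to it;
8. After receiving the responses from the Drivers and Workers, the Master calls the key method completeRecovery to deal with the Applications (Drivers) and Workers (Executors) that did not respond; the Master's state then becomes RecoveryState.ALIVE and it can start serving requests: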
private def completeRecovery(): Unit = {
  // Ensure "only-once" recovery semantics using a short synchronization period.
  if (state != RecoveryState.RECOVERING) { return }
  state = RecoveryState.COMPLETING_RECOVERY

  // Kill off any workers and apps that didn't respond to us.
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

  // Reschedule drivers which were not claimed by any workers
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }

  state = RecoveryState.ALIVE
  schedule()
  logInfo("Recovery complete - resuming operations!")
}
d.desc.supervise: with this option, a Driver that fails is restarted.
9. (Key step) The Master then calls its own schedule() method to allocate resources to the waiting Applications and Drivers!
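Steps 3 to 9 can be condensed into a single-threaded simulation. A sketch under simplifying assumptions: RPC messages are replaced by method calls, `schedule()` only records that it ran, and all names (`MasterPhase`, `RecoveringMaster`, `DriverRecord`, `workerResponded`, `appResponded`) are hypothetical. As in completeRecovery above, unresponsive Workers and Applications are dropped, and unclaimed Drivers are relaunched only when `supervise` is set.

```scala
import scala.collection.mutable

// Phases named after Spark's RecoveryState values.
sealed trait MasterPhase
object MasterPhase {
  case object STANDBY extends MasterPhase
  case object RECOVERING extends MasterPhase
  case object COMPLETING_RECOVERY extends MasterPhase
  case object ALIVE extends MasterPhase
}

// Whether a recovered Worker or Application has answered the new Master.
sealed trait Reply
case object NoReply extends Reply
case object Replied extends Reply

final case class DriverRecord(id: String, supervise: Boolean)

// Hypothetical, single-threaded model of Master recovery.
final class RecoveringMaster(appIds: Seq[String], workerIds: Seq[String], drivers: Seq[DriverRecord]) {
  val apps = mutable.LinkedHashMap.empty[String, Reply]
  val workers = mutable.LinkedHashMap.empty[String, Reply]
  private val driverOwner = mutable.LinkedHashMap.empty[DriverRecord, Option[String]]
  val relaunched = mutable.ListBuffer.empty[String]
  val removedDrivers = mutable.ListBuffer.empty[String]
  var state: MasterPhase = MasterPhase.STANDBY
  var scheduled = false

  // Steps 3-6: nothing persisted means nothing to recover; otherwise re-register
  // everything and mark Apps and Workers as not yet answered (UNKNOWN).
  def beginRecovery(): Unit =
    if (appIds.isEmpty && workerIds.isEmpty && drivers.isEmpty) state = MasterPhase.ALIVE
    else {
      appIds.foreach(id => apps(id) = NoReply)
      workerIds.foreach(id => workers(id) = NoReply)
      drivers.foreach(d => driverOwner(d) = None)
      state = MasterPhase.RECOVERING
    }

  // Step 7: a Worker answers and reports which Drivers it is running.
  def workerResponded(workerId: String, driverIds: Set[String]): Unit = {
    if (workers.contains(workerId)) workers(workerId) = Replied
    for (d <- drivers if driverIds.contains(d.id)) driverOwner(d) = Some(workerId)
  }

  // Step 7: an Application's Driver answers the new Master.
  def appResponded(appId: String): Unit =
    if (apps.contains(appId)) apps(appId) = Replied

  // Steps 8-9: runs at most once, like completeRecovery in the text.
  def completeRecovery(): Unit =
    if (state == MasterPhase.RECOVERING) {
      state = MasterPhase.COMPLETING_RECOVERY
      dropUnanswered(workers)
      dropUnanswered(apps)
      for ((d, owner) <- driverOwner if owner.isEmpty) {
        if (d.supervise) relaunched += d.id else removedDrivers += d.id
      }
      state = MasterPhase.ALIVE
      schedule()
    }

  private def dropUnanswered(m: mutable.Map[String, Reply]): Unit =
    m --= m.filter(_._2 == NoReply).keys.toList

  private def schedule(): Unit = scheduled = true
}
```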
DT大数据梦工厂 (DT Big Data Dream Factory)
Sina Weibo: www.weibo.com/ilovepains/