NodeManager Code Analysis: The NodeManager Startup Process
2017-09-25 01:11
http://blog.csdn.net/wuwenxiang91322/article/details/40384471
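1. NodeManager Overview
NodeManager (NM) is the per-node agent in YARN and manages a single compute node in a Hadoop cluster. Its duties include staying in contact with the ResourceManager, overseeing the container lifecycle, monitoring each container's resource usage (memory, CPU, and so on), tracking node health, and managing logs and the auxiliary services used by different applications.
[Figure: NodeManager overall architecture]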
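2. NodeManager Analysis
The analysis below follows the order in which the code executes when NodeManager starts.
2.1 The main function
Print the NodeManager startup and shutdown log messages;
Create the NodeManager object;
Load the configuration files and initialize the configuration items;
Call the initAndStartNodeManager function.
Main code: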
    public static void main(String[] args) {
        Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
        // Log the NodeManager startup and shutdown messages
        StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
        // Create the NodeManager object
        NodeManager nodeManager = new NodeManager();
        // Load the configuration
        Configuration conf = new YarnConfiguration();
        // Initialize and start NodeManager
        nodeManager.initAndStartNodeManager(conf, false);
    }
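2.2 The initAndStartNodeManager function
Register a shutdown hook so that the composite service is stopped when NodeManager shuts down or restarts (on a reboot, the old hook is removed first);
Call init() to initialize (init calls the overridden serviceInit method);
Start the services (start calls the overridden serviceStart method).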
    private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
        try {
            // Remove the old hook if we are rebooting.
            if (hasToReboot && null != nodeManagerShutdownHook) {
                ShutdownHookManager.get().removeShutdownHook(nodeManagerShutdownHook);
            }
            // Register nodeManagerShutdownHook so that the composite service
            // is stopped when NodeManager shuts down or restarts
            nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
            ShutdownHookManager.get().addShutdownHook(nodeManagerShutdownHook,
                SHUTDOWN_HOOK_PRIORITY);
            // Initialize: init() calls the overridden serviceInit()
            this.init(conf);
            // Start the services: start() calls the overridden serviceStart()
            this.start();
        } catch (Throwable t) {
            LOG.fatal("Error starting NodeManager", t);
            System.exit(-1);
        }
    }
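The hook handling above can be illustrated without Hadoop on the classpath. The following is a minimal sketch, assuming a hypothetical `HookRegistry` class that stands in for `ShutdownHookManager`; the class name, the priority values, and the log strings are all invented for illustration. It shows the reboot path (remove the stale hook, then register a fresh one) and assumes that higher-priority hooks run first.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ShutdownHookSketch {

    /** Hypothetical registry; a stand-in, not Hadoop's ShutdownHookManager. */
    static final class HookRegistry {
        private static final class Entry {
            final Runnable hook;
            final int priority;

            Entry(Runnable hook, int priority) {
                this.hook = hook;
                this.priority = priority;
            }
        }

        private final List<Entry> entries = new ArrayList<>();

        void add(Runnable hook, int priority) {
            entries.add(new Entry(hook, priority));
        }

        boolean remove(Runnable hook) {
            return entries.removeIf(e -> e.hook == hook);
        }

        /** Runs the registered hooks from highest to lowest priority. */
        void runAll() {
            List<Entry> sorted = new ArrayList<>(entries);
            sorted.sort(Comparator.comparingInt((Entry e) -> e.priority).reversed());
            for (Entry e : sorted) {
                e.hook.run();
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        HookRegistry registry = new HookRegistry();

        // First start: register a hook that stops the service (priority is arbitrary).
        Runnable oldHook = () -> log.add("stop old NodeManager");
        registry.add(oldHook, 30);

        // Reboot path: drop the stale hook before registering the new one.
        registry.remove(oldHook);
        registry.add(() -> log.add("stop new NodeManager"), 30);
        registry.add(() -> log.add("lower-priority cleanup"), 10);

        // Simulate JVM shutdown.
        registry.runAll();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Removing the old hook first matters because otherwise a rebooted process would try to stop a service instance that is no longer the live one.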
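2.3 The init function
(1) The init method is declared in the Service interface and implemented in the abstract class AbstractService. AbstractService.init calls the protected method serviceInit, which the subclass NodeManager overrides. The init implementation in AbstractService: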
    @Override
    public void init(Configuration conf) {
        if (conf == null) {
            throw new ServiceStateException("Cannot initialize service "
                + getName() + ": null configuration");
        }
        // Already initialized: nothing to do
        if (isInState(STATE.INITED)) {
            return;
        }
        synchronized (stateChangeLock) {
            if (enterState(STATE.INITED) != STATE.INITED) {
                setConfig(conf);
                try {
                    // Calls the serviceInit() overridden in the subclass
                    serviceInit(config);
                    if (isInState(STATE.INITED)) {
                        //if the service ended up here during init,
                        //notify the listeners
                        notifyListeners();
                    }
                } catch (Exception e) {
                    noteFailure(e);
                    ServiceOperations.stopQuietly(LOG, this);
                    throw ServiceStateException.convert(e);
                }
            }
        }
    }
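The pattern in this listing is a template method guarded by a state check: the public `init` owns the state transition and delegates the real work to a protected hook. Below is a minimal sketch of that idea under simplifying assumptions. `MiniService`, `MiniNodeManager`, and the `String` configuration are hypothetical stand-ins, and the state handling is much simpler than Hadoop's `ServiceStateModel`.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {

    enum State { NOTINITED, INITED, STARTED, STOPPED }

    /** Hypothetical, simplified analogue of an abstract service base class. */
    abstract static class MiniService {
        private final Object stateChangeLock = new Object();
        private State state = State.NOTINITED;
        final List<String> log = new ArrayList<>();

        /** Template method: owns the state transition, delegates the work. */
        public final void init(String conf) {
            if (conf == null) {
                throw new IllegalStateException("null configuration");
            }
            synchronized (stateChangeLock) {
                if (state == State.INITED) {
                    return; // a repeated init() is a no-op
                }
                if (state != State.NOTINITED) {
                    throw new IllegalStateException("cannot init from " + state);
                }
                state = State.INITED;
                try {
                    serviceInit(conf); // hook overridden by subclasses
                } catch (Exception e) {
                    state = State.STOPPED; // a failed init leaves the service stopped
                    throw new IllegalStateException("init failed", e);
                }
            }
        }

        State getState() {
            synchronized (stateChangeLock) {
                return state;
            }
        }

        protected abstract void serviceInit(String conf) throws Exception;
    }

    static final class MiniNodeManager extends MiniService {
        @Override
        protected void serviceInit(String conf) {
            log.add("serviceInit(" + conf + ")");
        }
    }

    static List<String> demo() {
        MiniNodeManager nm = new MiniNodeManager();
        nm.init("conf");
        nm.init("conf"); // second call does not run serviceInit again
        return nm.log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping `init` final and the hook protected means a subclass cannot accidentally skip the state check or the failure handling.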
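(2) The serviceInit method of the NodeManager class mainly creates and adds services. In detail:
Perform basic configuration, such as reading parameters from the configuration;
Create DeletionService and NodeHealthCheckerService and add them to serviceList, an ArrayList<Service> held by the parent class;
Call createNodeStatusUpdater() to create the NodeStatusUpdater object. Note that the listing below contains no register call for this object; it is only created here and is added to serviceList in the final step;
Call createNodeResourceMonitor(), createContainerManager(), and createWebServer() to create the NodeResourceMonitor, ContainerManagerImpl, and WebServer objects, and add each of them to serviceList;
Register ContainerManagerImpl (for ContainerManagerEventType) and the NodeManager itself (for NodeManagerEventType) with the previously created asynchronous event dispatcher AsyncDispatcher, then add the AsyncDispatcher to serviceList;
Finally, add the NodeStatusUpdater object to serviceList. It is added last because heartbeating must start only after all other services are up;
Call super.serviceInit(conf), which in CompositeService initializes each added service in turn.
Main code: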
    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
        NMContainerTokenSecretManager containerTokenSecretManager =
            new NMContainerTokenSecretManager(conf);
        NMTokenSecretManagerInNM nmTokenSecretManager =
            new NMTokenSecretManagerInNM();
        this.aclsManager = new ApplicationACLsManager(conf);
        // Initialize the ContainerExecutor, which wraps NodeManager's operations on
        // containers, such as launching a container or checking whether the container
        // with a given id is alive. The implementation is chosen by
        // yarn.nodemanager.container-executor.class; the default is DefaultContainerExecutor.
        ContainerExecutor exec = ReflectionUtils.newInstance(
            conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
                DefaultContainerExecutor.class, ContainerExecutor.class), conf);
        try {
            exec.init();
        } catch (IOException e) {
            throw new YarnRuntimeException("Failed to initialize container executor", e);
        }
        DeletionService del = createDeletionService(exec);
        addService(del);
        // NodeManager level dispatcher (asynchronous event dispatcher)
        this.dispatcher = new AsyncDispatcher();
        // This service reports whether the node is healthy. Node health combines
        // nodeHealthScriptRunner.isHealthy and dirsHandler.areDisksHealthy.
        nodeHealthChecker = new NodeHealthCheckerService();
        addService(nodeHealthChecker);
        dirsHandler = nodeHealthChecker.getDiskHandler();
        this.context = createNMContext(containerTokenSecretManager,
            nmTokenSecretManager);
        // Create the NodeStatusUpdater, which registers with the RM and sends
        // heartbeats (status updates). It talks to the RM over the ResourceTracker
        // protocol, built on YarnRPC. The ResourceTracker interface provides two
        // methods: registration and heartbeat.
        nodeStatusUpdater =
            createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
        // Monitors the node's resources (whether they are available). As a service
        // it has four states: notinited, inited, started, stopped.
        NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
        addService(nodeResourceMonitor);
        // Create the ContainerManagerImpl service, which manages containers over the
        // ContainerManager protocol, the protocol applications use to talk to NodeManager.
        containerManager =
            createContainerManager(context, exec, del, nodeStatusUpdater,
                this.aclsManager, dirsHandler);
        addService(containerManager);
        ((NMContext) context).setContainerManager(containerManager);
        // Create the WebServer that runs NodeManager's web UI. The address is set
        // through yarn.nodemanager.webapp.address; the default port is 8042.
        WebServer webServer = createWebServer(context, containerManager
            .getContainersMonitor(), this.aclsManager, dirsHandler);
        addService(webServer);
        ((NMContext) context).setWebServer(webServer);
        dispatcher.register(ContainerManagerEventType.class, containerManager);
        dispatcher.register(NodeManagerEventType.class, this);
        addService(dispatcher);
        // Initialize metrics
        DefaultMetricsSystem.initialize("NodeManager");
        // StatusUpdater should be added last so that it get started last
        // so that we make sure everything is up before registering with RM.
        addService(nodeStatusUpdater);
        super.serviceInit(conf);
        // TODO add local dirs to del
    }
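The ordering argument in this listing (the status updater is added last so that it starts last) depends on the composite starting its children in insertion order. The sketch below illustrates that with a hypothetical `MiniComposite` that only tracks service names; it also assumes children are stopped in reverse order, which is how a composite would normally unwind. The names mirror the `addService` calls in the listing, but nothing here is Hadoop code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CompositeOrderSketch {

    /** Hypothetical composite that records only the names of its children. */
    static final class MiniComposite {
        private final List<String> serviceList = new ArrayList<>();

        void addService(String name) {
            serviceList.add(name);
        }

        /** Children start in the order they were added. */
        List<String> startOrder() {
            return new ArrayList<>(serviceList);
        }

        /** Children stop in the reverse of the start order (assumption of this sketch). */
        List<String> stopOrder() {
            List<String> reversed = new ArrayList<>(serviceList);
            Collections.reverse(reversed);
            return reversed;
        }
    }

    /** Mirrors the order of the addService calls in the listing above. */
    static MiniComposite buildNodeManager() {
        MiniComposite nm = new MiniComposite();
        nm.addService("DeletionService");
        nm.addService("NodeHealthCheckerService");
        nm.addService("NodeResourceMonitor");
        nm.addService("ContainerManagerImpl");
        nm.addService("WebServer");
        nm.addService("AsyncDispatcher");
        nm.addService("NodeStatusUpdater"); // added last, so started last
        return nm;
    }

    public static void main(String[] args) {
        MiniComposite nm = buildNodeManager();
        System.out.println("start: " + nm.startOrder());
        System.out.println("stop:  " + nm.stopOrder());
    }
}
```

With this ordering the node only announces itself to the ResourceManager once everything it needs to serve containers is already running.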
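2.4 The start function
Perform the security login;
Call the parent class's serviceStart(), which starts, in order, all the services added in NodeManager's serviceInit. Among them, AsyncDispatcher delivers events, NodeStatusUpdater generates heartbeats, ContainerManagerImpl provides the functions exposed over Hadoop RPC, and so on.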
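To make the role of the event dispatcher more concrete, here is a minimal sketch of an asynchronous dispatcher: handlers are registered per event type, events are queued, and a single worker thread delivers them in order. `MiniDispatcher`, the `Handler` interface, and the two local enums are hypothetical stand-ins that only borrow names from the article; the real AsyncDispatcher is considerably more involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DispatcherSketch {

    // Local stand-ins for event types; the constants are illustrative only.
    enum ContainerManagerEventType { FINISH_APPS }
    enum NodeManagerEventType { SHUTDOWN }

    interface Handler {
        void handle(Enum<?> event);
    }

    /** Hypothetical single-threaded dispatcher keyed by the event's enum class. */
    static final class MiniDispatcher {
        private final Map<Class<?>, Handler> handlers = new ConcurrentHashMap<>();
        private final BlockingQueue<Enum<?>> queue = new LinkedBlockingQueue<>();
        private final Thread worker = new Thread(this::loop, "mini-dispatcher");

        void register(Class<? extends Enum<?>> eventType, Handler handler) {
            handlers.put(eventType, handler);
        }

        void start() {
            worker.setDaemon(true);
            worker.start();
        }

        void post(Enum<?> event) {
            queue.add(event); // returns immediately; delivery happens on the worker
        }

        void stop() {
            worker.interrupt();
        }

        private void loop() {
            try {
                while (true) {
                    Enum<?> event = queue.take();
                    Handler handler = handlers.get(event.getDeclaringClass());
                    if (handler != null) {
                        handler.handle(event);
                    }
                }
            } catch (InterruptedException ex) {
                // stop() interrupts the worker; leave the loop.
            }
        }
    }

    static List<String> demo() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch handled = new CountDownLatch(2);
        MiniDispatcher dispatcher = new MiniDispatcher();
        dispatcher.register(ContainerManagerEventType.class, event -> {
            log.add("ContainerManager handled " + event);
            handled.countDown();
        });
        dispatcher.register(NodeManagerEventType.class, event -> {
            log.add("NodeManager handled " + event);
            handled.countDown();
        });
        dispatcher.start();
        dispatcher.post(ContainerManagerEventType.FINISH_APPS);
        dispatcher.post(NodeManagerEventType.SHUTDOWN);
        try {
            if (!handled.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("events were not handled in time");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            dispatcher.stop();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because a single worker drains one queue, events are delivered in the order they were posted, while the posting thread never blocks on a handler.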
The start implementation in AbstractService:
    @Override
    public void start() {
        if (isInState(STATE.STARTED)) {
            return;
        }
        //enter the started state
        synchronized (stateChangeLock) {
            if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
                try {
                    startTime = System.currentTimeMillis();
                    // Calls the serviceStart() overridden in the subclass (NodeManager)
                    serviceStart();
                    if (isInState(STATE.STARTED)) {
                        //if the service started (and isn't now in a later state), notify
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Service " + getName() + " is started");
                        }
                        notifyListeners();
                    }
                } catch (Exception e) {
                    noteFailure(e);
                    ServiceOperations.stopQuietly(LOG, this);
                    throw ServiceStateException.convert(e);
                }
            }
        }
    }
Main code of the serviceStart method overridden in NodeManager:
    @Override
    protected void serviceStart() throws Exception {
        try {
            // Security login must succeed before any child service starts
            doSecureLogin();
        } catch (IOException e) {
            throw new YarnRuntimeException("Failed NodeManager login", e);
        }
        // Start all services that were added in serviceInit()
        super.serviceStart();
    }
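Taken together, start and serviceStart mean that a failed login prevents every child service from starting, and that the failure is recorded and the service stopped before the exception propagates. The sketch below illustrates that control flow under simplifying assumptions: `BaseService` and `MiniNodeManager` are hypothetical classes, the log strings are invented, and the state handling is reduced to a boolean, unlike the real state model.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class StartFailureSketch {

    /** Hypothetical base class with a start() template method. */
    static class BaseService {
        final List<String> log = new ArrayList<>();
        private boolean started;

        public final void start() {
            if (started) {
                return;
            }
            try {
                serviceStart();
                started = true;
                log.add("started");
            } catch (Exception e) {
                // Record the failure, stop the service, then rethrow wrapped.
                log.add("failure noted: " + e.getMessage());
                log.add("stopped quietly");
                throw new IllegalStateException(e);
            }
        }

        protected void serviceStart() throws Exception {
            log.add("child services started");
        }
    }

    static final class MiniNodeManager extends BaseService {
        private final boolean loginSucceeds;

        MiniNodeManager(boolean loginSucceeds) {
            this.loginSucceeds = loginSucceeds;
        }

        private void doSecureLogin() throws IOException {
            if (!loginSucceeds) {
                throw new IOException("login rejected");
            }
            log.add("login ok");
        }

        @Override
        protected void serviceStart() throws Exception {
            try {
                doSecureLogin();
            } catch (IOException e) {
                throw new RuntimeException("Failed NodeManager login", e);
            }
            super.serviceStart(); // reached only after a successful login
        }
    }

    static List<String> demo(boolean loginSucceeds) {
        MiniNodeManager nm = new MiniNodeManager(loginSucceeds);
        try {
            nm.start();
        } catch (IllegalStateException e) {
            // start() has already recorded the failure in nm.log
        }
        return nm.log;
    }

    public static void main(String[] args) {
        System.out.println(demo(true));
        System.out.println(demo(false));
    }
}
```

In the failing case the log never contains "child services started", which is the property the login-first ordering is meant to guarantee.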
3. References:
http://www.technology-mania.com/2014/05/an-insight-into-hadoop-yarn-nodemanager.html
http://www.cnblogs.com/biyeymyhjob/archive/2012/08/18/2645576.html