您的位置:首页 > Web前端 > Node.js

Hadoop源码分析之NameNode的启动与停止

2014-01-07 11:09 627 查看
在前面几篇文章中已经分析完了IPC的调用的大致过程,接下来分析NameNode节点的工作过程

NameNode节点在HDFS文件系统中只有一个实例,它提供了与多个DataNode实例一起组成了HDFS系统。NameNode节点主要功能是维护着HDFS文件中两个重要的关系:

HDFS文件系统的文件目录树,文件的数据块索引(即每个文件对应的数据块列表),
数据块和DataNode节点的对应关系(即一个数据块的多个副本分别保存在哪些DataNode节点上)。
其中HDFS目录树、元信息和数据块索引等信息会持久化到磁盘上,保存在命名空间镜像和编辑日志中。数据块和数据节点的对应关系则在NameNode节点和DataNode节点启动后,由DataNode节点上报给NameNode节点。NameNode节点通过接收数据节点的注册、心跳和数据块提交等信息来管理DataNode节点。NameNode节点会通过发送数据块复制、删除、恢复等NameNode节点指令来操作数据块,此外,NameNode节点还为客户端对文件系统目录树的操作和对文件数据读写以及对HDFS系统进行管理提供了支持。

NameNode的启动

在org.apache.hadoop.hdfs.server.namenode.NameNode.java文件中有个main函数,也就是启动NameNode节点的入口函数。NameNode类的main函数中通过方法createNameNode()创建了一个NameNode对象,然后就调用NameNode.join()函数等待NameNode停止。main()函数代码如下:

public static void main(String argv[]) throws Exception {
try {
StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
NameNode namenode = createNameNode(argv, null);
if (namenode != null)
namenode.join();
} catch (Throwable e) {
LOG.error(StringUtils.stringifyException(e));
System.exit(-1);
}
}
NameNode.join()函数就是调用NameNode.server成员变量的join方法,而NameNode.server成员变量是一个Hadoop IPC中讨论的RPC.Server对象,最终就调用了RPC.Server.join()方法。RPC.Server.join()方法也很简单,就是判断Server.runnning变量是否为true,如果running为true,说明这个IPC服务还在运行,就进入等待(wait()函数)。通过上面的分析可以知道,NameNode对象创建成功之后就可以为其他对象提供服务了,直到调用了NameNode的stop()方法,这部分代码比较简单,就不贴出来了。还有一点要注意的是NameNode的成员变量中有两个RPC.Server对象,一个是server,另一个是serviceRpcServer,其中server用于处理客户端的调用,serviceRpcServer用于处理HDFS内部的通信服务,如Datanode节点的调用。

NameNode类的main()方法中,最重要的就是NameNode namenode = createNameNode(argv, null);这行代码,通过这行代码创建了一个NameNode对象。下面以createNameNode()方法为入口来分析NameNode节点的启动过程。createNameNode()函数的代码如下:

public static NameNode createNameNode(String argv[],
Configuration conf) throws IOException {
if (conf == null)
conf = new Configuration();
StartupOption startOpt = parseArguments(argv);
if (startOpt == null) {
printUsage();
System.exit(-2);
}
setStartupOption(conf, startOpt);

switch (startOpt) {
case FORMAT:
boolean aborted = format(conf, startOpt.getConfirmationNeeded(),
startOpt.getInteractive());
System.exit(aborted ? 1 : 0);
case FINALIZE:
aborted = finalize(conf, true);
System.exit(aborted ? 1 : 0);
case RECOVER:
NameNode.doRecovery(startOpt, conf);
return null;
default:
}
DefaultMetricsSystem.initialize("NameNode");
NameNode namenode = new NameNode(conf);
return namenode;
}


在createNameNode()函数中首先获取一个Configuration对象,用于加载启动NameNode节点所需的配置参数。然后就是调用parseArguments()函数来获取运行参数。如果运行参数有误,那么startOpt变量就为null,调用函数printUsage()打印出NameNode可接受的参数,再结束程序,printUsage()函数打印出的内容如下:



在第一次配置Hadoop环境的时候,都执行过hadoop namenode -fromat命令,这里就是使用到了上面的[-format]参数。
parseArguments()函数的代码如下:

private static StartupOption parseArguments(String args[]) {
int argsLen = (args == null) ? 0 : args.length;
StartupOption startOpt = StartupOption.REGULAR;
for(int i=0; i < argsLen; i++) {
String cmd = args[i];
if (StartupOption.FORMAT.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.FORMAT;
// check if there are other options
for (i = i + 1; i < argsLen; i++) {
if (args[i].equalsIgnoreCase(StartupOption.FORCE.getName())) {
startOpt.setConfirmationNeeded(false);
}
if (args[i].equalsIgnoreCase(StartupOption.NONINTERACTIVE.getName())) {
startOpt.setInteractive(false);
}
}
} else if (StartupOption.REGULAR.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.REGULAR;
} else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.UPGRADE;
} else if (StartupOption.RECOVER.getName().equalsIgnoreCase(cmd)) {
if (startOpt != StartupOption.REGULAR) {
throw new RuntimeException("Can't combine -recover with " +
"other startup options.");
}
startOpt = StartupOption.RECOVER;
while (++i < argsLen) {
if (args[i].equalsIgnoreCase(
StartupOption.FORCE.getName())) {
startOpt.setForce(MetaRecoveryContext.FORCE_FIRST_CHOICE);
} else {
throw new RuntimeException("Error parsing recovery options: " +
"can't understand option \"" + args[i] + "\"");
}
}
} else if (StartupOption.ROLLBACK.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.ROLLBACK;
} else if (StartupOption.FINALIZE.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.FINALIZE;
} else if (StartupOption.IMPORT.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.IMPORT;
} else
return null;
}
return startOpt;
}
首先默认是将startOpt设置位StartupOption.REGULAR,StartupOption是一个枚举类型,由上面打印的Usage可以看出NameNode启动选项主要有5类,分别是format,upgrade,rollback,finalize和importCheckpoint,它的意义分别是:
format:格式化NameNode,建立NameNode节点的文件结构。带有format参数启动NameNode节点时,首先启动NameNode节点,然后对其机型格式化,再关闭节点,如果文件目录已经存在当前文件系统,则会提示用户。它有两个参数nonInteractive和force,nonInteractive表示如果NameNode节点的文件夹在当前的底层文件系统中存在,那么用户将不会收到提示,并且当前的格式化会失败,force表示不管NameNode的目录存不存在,强制格式化NameNode节点,也不会提示用户,如果nonInteractive和force参数同时存在,那么force参数将会被忽略;
upgrade:升级系统;
rollback:从升级系统中回滚到前一个版本,这个参数必须在停止的集群分布式文件系统中使用;
finalize:提交一次升级,使用这个参数将会删除前一个版本的文件系统,并且当前版本的文件系统将会变成所使用的文件系统,并且rollback参数将不会再有效,这个参数最终会停止NameNode。
importCheckpoint:从名字节点的一个检查点恢复,这个参数将会从检查点目录导入镜像文件,作为NameNode的文件目录结构。
recover:执行元数据恢复,对于这一点还不理解,HDFS已经可以使用importCheckpoint参数从SecondaryNameNode恢复元数据,为什么还需要一个recover的过程?http://stackoverflow.com/questions/17387477/namenode-recovery-how-does-namenode-recovery-works?answertab=active#tab-top这里说的Hadoop2.0开始可以有多个NameNode节点,而这里是Hadoop1.2.1的源码,这一点留待以后再回过头来分析。

在startupOption类中,还有一个枚举值是REGULAR,它表示正常启动NameNode。StartupOption.REGULAR是parseArguments()方法的默认值,从方法中可以看出,方法会遍历程序运行时给出的所有参数,根据参数值设定startOpt的值,如果参数中第一个参数没有NameNode启动所期望的值,那么就返回null,然后就会打印出启动NameNode的方法。

通过parseArguments()方法获取到启动参数之后,就调用私有静态方法将启动方式设置到Congifuration对象中,随后就根据启动参数的值执行相应的方法,这里值针对format,finalize和recover的处理。先不管这些处理,看看NameNode对象的创建过程。DefaultMetricsSystem.initialize("NameNode");这行代码是用于NameNode的运行时的数据统计的,暂时不考虑。下面就是执行NameNode构造方法创建一个NameNode对象。

NameNode的构造方法很简单,直接调用了方法initialize()方法,下面来分析initialize()方法。在这个方法中执行了一系列的成员变量初始化,主要的代码如下(删除了安全相关和度量相关的代码):

private void initializea(Configuration conf) throws IOException {
InetSocketAddress socAddr = NameNode.getAddress(conf);
//Handler线程的个数
int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
this.namesystem = new FSNamesystem(this, conf);
// create rpc server,创建IPC服务器
InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
if (dnSocketAddr != null) {
int serviceHandlerCount =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
dnSocketAddr.getPort(), serviceHandlerCount,
false, conf, namesystem.getDelegationTokenSecretManager());
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
setRpcServiceServerAddress(conf);
}
this.server = RPC.getServer(this, socAddr.getHostName(),
socAddr.getPort(), handlerCount, false, conf, namesystem
.getDelegationTokenSecretManager());
// Set terse exception whose stack trace won't be logged
this.server.addTerseExceptions(SafeModeException.class);

// The rpc-server port can be ephemeral... ensure we have the correct info
this.serverAddress = this.server.getListenerAddress();
FileSystem.setDefaultUri(conf, getUri(serverAddress));
LOG.info("Namenode up at: " + this.serverAddress);
//启动http服务器
startHttpServer(conf);
this.server.start();  //start RPC server
if (serviceRpcServer != null) {
serviceRpcServer.start();
}
startTrashEmptier(conf);//启动垃圾桶功能
/*可以通过HDFS提供的Service plugin来完成。Service plugin是HDFS提供给管理员的access point,
* 我们可以通过这个access point为HDFS提供一些扩展功能。同样地,创建一个关闭EditLog的plugin。 */
plugins = conf.getInstances("dfs.namenode.plugins", ServicePlugin.class);
for (ServicePlugin p: plugins) {
try {
p.start(this);
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be started", t);
}
}
}
首先获取RPC地址,读取xml文件中的属性dfs.namenode.rpc-address的值,如果所读到的值为空(不配置默认就为空),则使用fs.default.name属性的值替代,这个过程相关的代码很清晰。再读取属性dfs.namenode.handler.count的值,将其赋值给handlerCount变量,用于配置IPC服务中Handler线程的数量。然后创建FSNamesystem对象。接下来就创建并启动IPC服务器,在NameNode中有两个IPC服务器对象,分别是serviceRpcServer和server。其中如果配置了参数dfs.namenode.servicerpc-address,则会创建serviceRpcServer对象,如果没有配置dfs.namenode.servicerpc-address参数,但是需要创建serviceRpcServer对象,那么就使用dfs.namenode.rpc-address参数创建需要的地址,否则就不创建。serviceRpcServer和server的区别在于,当serviceRpcServer存在时,HDFS内部的IPC调用就使用serviceRpcServer对象来执行,客户端的IPC调用由server对象来执行,如果serviceRpcServer不创建,则所有的IPC调用都由server执行。因为客户端的IPC调用执行时间短,往往需要等待日志的持久化;而数据节点和第二名字节点调用的远程过程,执行逻辑比较复杂,执行时间也比较长。在一个繁忙的系统中,对客户端和HDFS的其他实体的IPC请求使用不同的IPC服务器,可以保证客户端的请求得到即使的响应。

接下来就调用startHttpServer()方法启动HTTP服务器,调用startTrashEmptier()方法启动垃圾桶功能,最后加载为NameNode配置的插件,由于编译HDFS代码之后就不能修改HDFS代码了,如果要给HDFS增加一些其他功能,可以通过HDFS提供的Service plugin来完成。Service plugin是HDFS提供给管理员的access point,所以可以通过这个access point为HDFS提供一些扩展功能(参考自http://langyu.iteye.com/blog/1165292)。

Reference

《Hadoop技术内幕:深入理解Hadoop Common和HDFS架构设计与实现原理》

https://hadoop.apache.org/docs/r1.2.1/commands_manual.html

http://langyu.iteye.com/blog/1165292
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: