您的位置:首页 > Web前端 > Node.js

handoop0.20.2:名字节点namenode的启动

2017-03-26 17:30 274 查看
注:分析到的主要代码在

org.apache.hadoop.hdfs.server.namenode.NameNode和org.apache.hadoop.hdfs.server.namenode.FSNamesystem中

1.NameNode.main()是名字节点启动的入口,主要就是通过createNameNode方法创建一个namenode对象,创建成功后再等待它执行结束(namenode.join())

public static void main(String argv[]) throws Exception {
try {
StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
NameNode namenode = createNameNode(argv, null);
if (namenode != null)
namenode.join();
} catch (Throwable e) {
LOG.error(StringUtils.stringifyException(e));
System.exit(-1);
}
}
2.createNamenode()方法实际都做了什么?

 -根据argv参数找到启动选项startOpt

 -根据startOpt设置配置文件

 -setStartupOption(conf, startOpt),根据startOpt的值选择不同的启动处理方式,这里大概是除了FORMAT和FINALIZE选项,其他的选项都将执行->

 -执行namenode构造方法,根据conf,new一个namenode并返回,在NameNode.main中接收

public static NameNode createNameNode(String argv[],
Configuration conf) throws IOException {
if (conf == null)
conf = new Configuration();
StartupOption startOpt = parseArguments(argv);//根据argv启动选项
if (startOpt == null) {
printUsage();
return null;
}
setStartupOption(conf, startOpt);

switch (startOpt) {
case FORMAT:
boolean aborted = format(conf, true);
System.exit(aborted ? 1 : 0);
case FINALIZE:
aborted = finalize(conf, true);
System.exit(aborted ? 1 : 0);
default:
}
NameNode namenode = new NameNode(conf);
return namenode;
}
3.new Namenode(conf)是Namenode的构造方法,它都做了些什么?

可以看到,它就是执行了一个初始化initialize(conf)方法。

public NameNode(Configuration conf) throws IOException {
try {
initialize(conf);
} catch (IOException e) {
this.stop();
throw e;
}
}
4.再来看一下initialize(conf)

初始化方法看起来相对麻烦一点,但实际上大量的具体工作交由了FSNamesystem完成。

 -创建ipc服务器

 -FSNamesystem构造方法完成了许多工作

 -开启httpserver;开启RPC server等

private void initialize(Configuration conf) throws IOException {
InetSocketAddress socAddr = NameNode.getAddress(conf);//ipc服务器地址
int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
//安全相关初始化
 if (serviceAuthEnabled =
conf.getBoolean(
ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
PolicyProvider policyProvider =
(PolicyProvider)(ReflectionUtils.newInstance(
conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
HDFSPolicyProvider.class, PolicyProvider.class),
conf));
SecurityUtil.setPolicy(new ConfiguredPolicy(conf, policyProvider));
}

// create rpc server 创建ipc服务器
this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(),
handlerCount, false, conf);

// The rpc-server port can be ephemeral... ensure we have the correct info
this.serverAddress = this.server.getListenerAddress(); //监听地址
FileSystem.setDefaultUri(conf, getUri(serverAddress));
LOG.info("Namenode up at: " + this.serverAddress);

myMetrics = new NameNodeMetrics(conf, this);

this.namesystem = new FSNamesystem(this, conf);//FSNamesystem完成了许多工作
startHttpServer(conf);//启动http服务器
this.server.start();  //start RPC server
startTrashEmptier(conf);//启动垃圾桶功能
}
5.new FSNamesystem(this,conf)时,和new Namenode()一样,其实主要就是FSNamesystem.initialize,它主要完成了:

 -通过FSDirectory.loadFSImage,加载命名空间镜像,应用编辑日志

 -创建安全模式管理对象SafeModeInfo,调用setBlockTotal()对SafeModeInfo.blockTotal进行更新。

 -创建和启动心跳检查/租约检查/数据块复制删除线程;

 -读入数据节点的include和exclude文件,创建并启动数据节点撤销检查线程等等。。

这是我看到的主要工作,有的深入下去我也不太懂。。。

FSNamesystem(NameNode nn, Configuration conf) throws IOException {
try {
initialize(nn, conf);
} catch(IOException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close();
throw e;
}
}


private void initialize(NameNode nn, Configuration conf) throws IOException {
this.systemStart = now();
setConfigurationParameters(conf);

this.nameNodeAddress = nn.getNameNodeAddress();
this.registerMBean(conf); // register the MBean for the FSNamesystemStutus
this.dir = new FSDirectory(this, conf);//创建 FSDirectory对象 名字节点第一关系的门面
StartupOption startOpt = NameNode.getStartupOption(conf);
//通过FSDirectory.loadFSImage,加载命名空间镜像,应用编辑日志
this.dir.loadFSImage(getNamespaceDirs(conf),
getNamespaceEditsDirs(conf), startOpt);
long timeTakenToLoadFSImage = now() - systemStart;
LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
NameNode.getNameNodeMetrics().fsImageLoadTime.set(
(int) timeTakenToLoadFSImage);
//创建安全模式管理对象SafeModeInfo
this.safeMode = new SafeModeInfo(conf);
//调用setBlockTotal()对SafeModeInfo.blockTotal进行更新
setBlockTotal();
pendingReplications = new PendingReplicationBlocks(
conf.getInt("dfs.replication.pending.timeout.sec",
-1) * 1000L);
//创建和启动心跳检查/租约检查/数据块复制删除线程
    this.hbthread = new Daemon(new HeartbeatMonitor());
this.lmthread = new Daemon(leaseManager.new Monitor());
this.replthread = new Daemon(new ReplicationMonitor());
hbthread.start();
lmthread.start();
replthread.start();
//读入数据节点的include和exclude文件,创建并启动数据节点撤销检查线程
this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
conf.get("dfs.hosts.exclude",""));
this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
conf.getInt("dfs.namenode.decommission.interval", 30),
conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));
dnthread.start();

this.dnsToSwitchMapping = ReflectionUtils.newInstance(
conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
DNSToSwitchMapping.class), conf);

/* If the dns to swith mapping supports cache, resolve network
* locations of those hosts in the include list,
* and store the mapping in the cache; so future calls to resolve
* will be fast.
*/
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
}
}
6.这一系列工作做完后,回到main函数,这时已经成功创建和初始化了namenode对象并返回到main中,

成功启动namenode之后,执行namenode.join(),意思是等待IPC服务器结束。

这就是源码中的namenode启动流程,直接run一下namenode.java,可以成功启动namenode,如图:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop hdfs 源码 namenode