您的位置:首页 > Web前端 > Node.js

【框架解析】Hadoop系统分析(四)--namenode regular

2012-08-20 17:26 357 查看

hadoop namenode -regular

正常启动NameNode服务,regular是默认参数用于启动服务,如果不带参数直接执行hadoop namenode,也是进入regular运行流程。

执行regular,主要完成两个任务

加载fsimage,建立blockmap
启动如下的一些服务:

服务作用
serviceRpcServerorg.apache.hadoop.ipc.RPC.Servernamenode-datanode,namenode-snn之间通讯的端口,取dfs.namenode.servicerpc-address配置的地址,如果没有配置,则使用fs.default.name地址
serverorg.apache.hadoop.ipc.RPC.Servernamenode-client之间通讯的端口,使用fs.default.name配置的地址
HttpServerorg.apache.hadoop.http.HttpServer提供Http服务
emptierorg.apache.hadoop.fs.Trash.Emptier定时清除/user/用户/.Trash目录内容,定时时间根据fs.trash.interval参数指定,默认为60分钟
hbthreadorg.apache.hadoop.hdfs.server.namenode.FSNamesystem.HeartbeatMonitor用于监听datanode是否停止服务,并且发送控制命令给datanode
lmthreadorg.apache.hadoop.hdfs.server.namenode.LeaseManager.Monitor客户端对文件进行写操作(create/append)时需要申请独占租约,Monitor用于监控租约是否过期,过期就释放租约
replthreadorg.apache.hadoop.hdfs.server.namenode.FSNamesystem.ReplicationMonitor监控数据的副本,如果超出则要删除,如果不够则需要进行复制
dnthreadorg.apache.hadoop.hdfs.server.namenode.DecommissionManager.Monitordatanode下线时,对下线服务器上的数据块进行监控管理
启动时,NameNode.createNameNode将会执行如下命令
DefaultMetricsSystem.initialize("NameNode"); //初始化默认的指标系统统计NameNode信息
NameNode namenode = new NameNode(conf);  //创建一个NameNode实例


之后所有服务主要会分为三个部分:

执行初始化方法NameNode.initialize,启动rpc server与http server用于对外提供服务以及同datanode交互:

InetSocketAddress socAddr = NameNode.getAddress(conf); //读取fs.default.name的值,获取hdfs集群的地址
//设置用户权限信息,如果hadoop.security.authentication=kerberos,就开启权限限制,如果为null或是simple,就不开权限。
//setConfiguration执行时,会执行UserGroupInformation.initialize(Configuration conf)对UserGroupInformation进行初始化。
UserGroupInformation.setConfiguration(conf);
//如果dfs.namenode.keytab.file存在,并且kerberos已经开启,则调用UserGroupInformation.loginUserFromKeytab进行登陆
//登陆使用dfs.namenode.kerberos.principal作为用户名,否则使用当前linux的user作为用户。
SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
//获取namenode同时处理请求的数量配置,默认为10
int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);

// set service-level authorization security policy
//如果授权配置(hadoop.security.authorization)开启了,则刷新授权策略
if (serviceAuthEnabled = conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
//默认会重新加载hadoop-policy.xml中的acl配置,加载完成后acl配置保存在ServiceAuthorizationManager.protocolToAcl中
ServiceAuthorizationManager.refresh(conf, new HDFSPolicyProvider());
}
//创建服务指标,用于观察namenode服务状态
myMetrics = NameNodeInstrumentation.create(conf);

//启动FSNamesystem,启动FSNamesystem时,会启动各种thread执行namenode职责
this.namesystem = new FSNamesystem(this, conf);

//如果安全机制开启
if (UserGroupInformation.isSecurityEnabled()) {
//启动守护线程每5秒执行一次ExpiredTokenRemover
namesystem.activateSecretManager();
}

//创建rpc服务器,如果dfs.namenode.servicerpc-address配置项存在,则用来作为服务器地址
InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
if (dnSocketAddr != null) {
int serviceHandlerCount =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
dnSocketAddr.getPort(), serviceHandlerCount,
false, conf, namesystem.getDelegationTokenSecretManager());
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
setRpcServiceServerAddress(conf);
}
this.server = RPC.getServer(this, socAddr.getHostName(),
socAddr.getPort(), handlerCount, false, conf, namesystem
.getDelegationTokenSecretManager());

// The rpc-server port can be ephemeral... ensure we have the correct info
this.serverAddress = this.server.getListenerAddress();
FileSystem.setDefaultUri(conf, getUri(serverAddress));
LOG.info("Namenode up at: " + this.serverAddress);

//http服务器
startHttpServer(conf);
//启动rpc服务器
this.server.start();  //start RPC server
if (serviceRpcServer != null) {
serviceRpcServer.start();
}
//启动垃圾清理守护线程,读取fs.trash.interval的值作为两次清理的时间间隔。默认每60分钟清理一次
startTrashEmptier(conf);

在调用new FSNamesystem时,会执行FSNamesystem.initialize方法,其中会加载FsImage与editlog内容到内存,以及启动四个线程来完成NameNode的各项职责

this.systemStart = now();
//读取配置文件
setConfigurationParameters(conf);
//读取dfs.namenode.delegation.token的相关配置
dtSecretManager = createDelegationTokenSecretManager(conf);

this.nameNodeAddress = nn.getNameNodeAddress();
this.registerMBean(conf); // register the MBean for the FSNamesystemStutus
this.dir = new FSDirectory(this, conf);
StartupOption startOpt = NameNode.getStartupOption(conf);
//加载namenode持久化在硬盘的信息
this.dir.loadFSImage(getNamespaceDirs(conf),
getNamespaceEditsDirs(conf), startOpt);
long timeTakenToLoadFSImage = now() - systemStart;
LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
NameNode.getNameNodeMetrics().setFsImageLoadTime(timeTakenToLoadFSImage);
//加载安全模式信息,进入安全模式
this.safeMode = new SafeModeInfo(conf);
//将block的总数设置给safemode
setBlockTotal();
pendingReplications = new PendingReplicationBlocks(
conf.getInt("dfs.replication.pending.timeout.sec",
-1) * 1000L);
if (isAccessTokenEnabled) {
accessTokenHandler = new BlockTokenSecretManager(true,
accessKeyUpdateInterval, accessTokenLifetime);
}
//启动心跳监控的线程
this.hbthread = new Daemon(new HeartbeatMonitor());
//启动文件租约管理监控的线程
this.lmthread = new Daemon(leaseManager.new Monitor());
//启动副本监控的线程
this.replmon = new ReplicationMonitor();
this.replthread = new Daemon(replmon);
hbthread.start();
lmthread.start();
replthread.start();
//读取能够连接以及不能连接的主机信息
this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
conf.get("dfs.hosts.exclude",""));
//启动退役节点监控的线程
this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
conf.getInt("dfs.namenode.decommission.interval", 30),
conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));
dnthread.start();

this.dnsToSwitchMapping = ReflectionUtils.newInstance(
conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
DNSToSwitchMapping.class), conf);

/* If the dns to swith mapping supports cache, resolve network
* locations of those hosts in the include list,
* and store the mapping in the cache; so future calls to resolve
* will be fast.
*/
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
}

InetSocketAddress socAddr = NameNode.getAddress(conf);
this.nameNodeHostName = socAddr.getHostName();
//将这个类注册到监控系统
registerWith(DefaultMetricsSystem.INSTANCE);

调用FSDirectory.loadFSImage加载FsImage信息到内存

// format before starting up if requested
if (startOpt == StartupOption.FORMAT) {
fsImage.setStorageDirectories(dataDirs, editsDirs);
fsImage.format();
startOpt = StartupOption.REGULAR;
}
try {
//1.校验并读取元数据目录和日志目录
//2.针对upgrade\import\rollback操作执行对应命令
//3.合并后加载进内存
if (fsImage.recoverTransitionRead(dataDirs, editsDirs, startOpt)) {
//保存元数据后创建一个新的日志文件
fsImage.saveNamespace(true);
}
//打开日志文件
FSEditLog editLog = fsImage.getEditLog();
assert editLog != null : "editLog must be initialized";
if (!editLog.isOpen())
editLog.open();
fsImage.setCheckpointDirectories(null, null);
} catch(IOException e) {
fsImage.close();
throw e;
}
synchronized (this) {
this.ready = true;
this.nameCache.initialized();
this.notifyAll();
}

最后让namenode保持在启动状态

namenode.join();


大致的运行时序图如下:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: