Hbase0.96源码之HMaster(一)
2017-08-19 16:33
260 查看
从main()函数開始
public static void main(String [] args) { VersionInfo.logVersion(); new HMasterCommandLine(HMaster.class).doMain(args); } public void doMain(String args[]) { try { int ret = ToolRunner.run(HBaseConfiguration.create(), this, args); if (ret != 0) { System.exit(ret); } } catch (Exception e) { LOG.error("Failed to run", e); System.exit(-1); } }
这里是使用的hadoop的ToolRunner。
public static int More ...run(Configuration conf, Tool tool, String[] args) throws Exception{ if(conf == null) { conf = new Configuration(); } GenericOptionsParser parser = new GenericOptionsParser(conf, args); //set the configuration back, so that Tool can configure itself tool.setConf(conf); //get the args w/o generic hadoop args String[] toolArgs = parser.getRemainingArgs(); return tool.run(toolArgs); }
事实上就是走了一遍GenericOptionsParser解析參数。又回到tool.run(toolArgs);而这里的tool事实上有这种继承关系HMasterCommandLine-->ServerCommandLine-->Tool,而这里的tool事实上有这种继承关系HMasterCommandLine的run()方法
public int run(String args[]) throws Exception { Options opt = new Options(); opt.addOption("localRegionServers", true, "RegionServers to start in master process when running standalone"); opt.addOption("masters", true, "Masters to start in this process"); opt.addOption("minRegionServers", true, "Minimum RegionServers needed to host user tables"); opt.addOption("backup", false, "Do not try to become HMaster until the primary fails"); CommandLine cmd; try { cmd = new GnuParser().parse(opt, args); } catch (ParseException e) { LOG.error("Could not parse: ", e); usage(null); return 1; } if (cmd.hasOption("minRegionServers")) { String val = cmd.getOptionValue("minRegionServers"); getConf().setInt("hbase.regions.server.count.min", Integer.valueOf(val)); LOG.debug("minRegionServers set to " + val); } // minRegionServers used to be minServers. Support it too. if (cmd.hasOption("minServers")) { String val = cmd.getOptionValue("minServers"); getConf().setInt("hbase.regions.server.count.min", Integer.valueOf(val)); LOG.debug("minServers set to " + val); } // check if we are the backup master - override the conf if so if (cmd.hasOption("backup")) { getConf().setBoolean(HConstants.MASTER_TYPE_BACKUP, true); } // How many regionservers to startup in this process (we run regionservers in same process as // master when we are in local/standalone mode. Useful testing) if (cmd.hasOption("localRegionServers")) { String val = cmd.getOptionValue("localRegionServers"); getConf().setInt("hbase.regionservers", Integer.valueOf(val)); LOG.debug("localRegionServers set to " + val); } // How many masters to startup inside this process; useful testing if (cmd.hasOption("masters")) { String val = cmd.getOptionValue("masters"); getConf().setInt("hbase.masters", Integer.valueOf(val)); LOG.debug("masters set to " + val); } List<String> remainingArgs = cmd.getArgList(); if (remainingArgs.size() != 1) { usage(null); return 1; } String command = remainingArgs.get(0); if ("start".equals(command)) { return startMaster(); } else if ("stop".equals(command)) { return stopMaster(); } else if ("clear".equals(command)) { return (ZNodeClearer.clear(getConf()) ? 0 : 1); } else { usage("Invalid command: " + command); return 1; } }
启动的时候command="start",走startMaster()
private int startMaster() { Configuration conf = getConf(); try { // If 'local', defer to LocalHBaseCluster instance. Starts master // and regionserver both in the one JVM. if (LocalHBaseCluster.isLocal(conf)) { final MiniZooKeeperCluster zooKeeperCluster = new MiniZooKeeperCluster(conf); File zkDataPath = new File(conf.get(HConstants.ZOOKEEPER_DATA_DIR)); int zkClientPort = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 0); if (zkClientPort == 0) { throw new IOException("No config value for " + HConstants.ZOOKEEPER_CLIENT_PORT); } zooKeeperCluster.setDefaultClientPort(zkClientPort); // login the zookeeper server principal (if using security) ZKUtil.loginServer(conf, "hbase.zookeeper.server.keytab.file", "hbase.zookeeper.server.kerberos.principal", null); int clientPort = zooKeeperCluster.startup(zkDataPath); if (clientPort != zkClientPort) { String errorMsg = "Could not start ZK at requested port of " + zkClientPort + ". ZK was started at port: " + clientPort + ". Aborting as clients (e.g. shell) will not be able to find " + "this ZK quorum."; System.err.println(errorMsg); throw new IOException(errorMsg); } conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort)); // Need to have the zk cluster shutdown when master is shutdown. // Run a subclass that does the zk cluster shutdown on its way out. LocalHBaseCluster cluster = new LocalHBaseCluster(conf, conf.getInt("hbase.masters", 1), conf.getInt("hbase.regionservers", 1), LocalHMaster.class, HRegionServer.class); ((LocalHMaster)cluster.getMaster(0)).setZKCluster(zooKeeperCluster); cluster.startup(); waitOnMasterThreads(cluster); } else { logProcessInfo(getConf()); HMaster master = HMaster.constructMaster(masterClass, conf); if (master.isStopped()) { LOG.info("Won't bring the Master up as a shutdown is requested"); return 1; } master.start(); master.join(); if(master.isAborted()) throw new RuntimeException("HMaster Aborted");} } catch (Throwable t) { LOG.error("Master exiting", t); return 1; } return 0; }
假设LocalHBaseCluster.isLocal(),启动一个本地集群LocalHBaseCluster,这里会在同一个JVM启动HMaster,HRegionServer
这里默认:hbase.cluster.distributed=false
/** Cluster is in distributed mode or not */ public static final String CLUSTER_DISTRIBUTED = "hbase.cluster.distributed"; /** Config for pluggable load balancers */ public static final String HBASE_MASTER_LOADBALANCER_CLASS = "hbase.master.loadbalancer.class"; /** Cluster is standalone or pseudo-distributed */ public static final boolean CLUSTER_IS_LOCAL = false; public static boolean isLocal(final Configuration c) { boolean mode = c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED); return(mode == HConstants.CLUSTER_IS_LOCAL); }
假设hbase.cluster.distributed=true,创建一个HMaster实例
logProcessInfo(getConf()); HMaster master = HMaster.constructMaster(masterClass, conf); if (master.isStopped()) { LOG.info("Won't bring the Master up as a shutdown is requested"); return 1; } master.start(); master.join(); if(master.isAborted()) throw new RuntimeException("HMaster Aborted");
构造HMaster
Constructor<?[p] 正是的构造函数:extends HMaster> c = masterClass.getConstructor(Configuration.class); return c.newInstance(conf);
1,获取master的地址,ip:port
public static final String MASTER_PORT = "hbase.master.port";为端口,默认60000 int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);2,处理RPC的handler的数量。由hbase.regionserver.handler.count
3,创建HMaster的RPCServer实例.
4,启动RPCServer的线程,
5, 创建zookeeper,创建hbase相关节点[/p]
public HMaster(final Configuration conf) throws IOException, KeeperException, InterruptedException { this.conf = new Configuration(conf); // Disable the block cache on the master this.conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); // Server to handle client requests. String hostname = Strings.domainNamePointerToHostName(DNS.getDefaultHost( conf.get("hbase.master.dns.interface", "default"), conf.get("hbase.master.dns.nameserver", "default"))); int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT); // Test that the hostname is reachable InetSocketAddress initialIsa = new InetSocketAddress(hostname, port); if (initialIsa.getAddress() == null) { throw new IllegalArgumentException("Failed resolve of hostname " + initialIsa); } // Verify that the bind address is reachable if set String bindAddress = conf.get("hbase.master.ipc.address"); if (bindAddress != null) { initialIsa = new InetSocketAddress(bindAddress, port); if (initialIsa.getAddress() == null) { throw new IllegalArgumentException("Failed resolve of bind address " + initialIsa); } } String name = "master/" + initialIsa.toString(); // Set how many times to retry talking to another server over HConnection. HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG); int numHandlers = conf.getInt("hbase.master.handler.count", conf.getInt("hbase.regionserver.handler.count", 25)); this.rpcServer = new RpcServer(this, name, getServices(), initialIsa, // BindAddress is IP we got for this server. numHandlers, 0, // we dont use high priority handlers in master conf, 0); // this is a DNC w/o high priority handlers // Set our address. this.isa = this.rpcServer.getListenerAddress(); // We don't want to pass isa's hostname here since it could be 0.0.0.0 this.serverName = new ServerName(hostname, this.isa.getPort(), System.currentTimeMillis()); this.rsFatals = new MemoryBoundedLogMessageBuffer( conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024)); // login the zookeeper client principal (if using security) ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file", "hbase.zookeeper.client.kerberos.principal", this.isa.getHostName()); // initialize server principal (if using secure Hadoop) User.login(conf, "hbase.master.keytab.file", "hbase.master.kerberos.principal", this.isa.getHostName()); LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) + ", hbase.cluster.distributed=" + this.conf.getBoolean("hbase.cluster.distributed", false)); // set the thread name now we have an address setName(MASTER + ":" + this.serverName.toShortString()); Replication.decorateMasterConfiguration(this.conf); // Hack! Maps DFSClient => Master for logs. HDFS made this // config param for task trackers, but we can piggyback off of it. if (this.conf.get("mapred.task.id") == null) { this.conf.set("mapred.task.id", "hb_m_" + this.serverName.toString()); } this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);this.rpcServer.startThreads(); this.pauseMonitor = new JvmPauseMonitor(conf); this.pauseMonitor.start(); // metrics interval: using the same property as region server. this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000); //should we check the compression codec type at master side, default true, HBASE-6370 this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true); this.metricsMaster = new MetricsMaster( new MetricsMasterWrapperImpl(this)); // Health checker thread. int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); if (isHealthCheckerConfigured()) { healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); } // Do we publish the status? boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT); Class<? extends ClusterStatusPublisher.Publisher> publisherClass = conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS, ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS, ClusterStatusPublisher.Publisher.class); if (shouldPublish) { if (publisherClass == null) { LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS + " is not set - not publishing status"); } else { clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass); Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread()); } } distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG); }
创建zookeeper。连接到zookeeper,创建hbase相关节点。
这里主要两个函数setNodeNames()和createBaseZNodes()
this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
public ZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable, boolean canCreateBaseZNode) throws IOException, ZooKeeperConnectionException { this.conf = conf; // Capture a stack trace now. Will print it out later if problem so we can // distingush amongst the myriad ZKWs. try { throw new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING"); } catch (Exception e) { this.constructorCaller = e; } this.quorum = ZKConfig.getZKQuorumServersString(conf); // Identifier will get the sessionid appended later below down when we // handle the syncconnect event. this.identifier = identifier; this.abortable = abortable; setNodeNames(conf); this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier); if (canCreateBaseZNode) { createBaseZNodes(); } }
HMaster的主循环的run()方法,这里主要关心
1,becomeActiveMaster(startupStatus);
2,finishInitialization(startupStatus, false);
3,loop()
@Override public void run() { MonitoredTask startupStatus = TaskMonitor.get().createStatus("Master startup"); startupStatus.setDescription("Master startup"); masterStartTime = System.currentTimeMillis(); try { this.registeredZKListenersBeforeRecovery = this.zooKeeper.getListeners(); this.masterAddressManager = new MasterAddressTracker(getZooKeeperWatcher(), this); this.masterAddressManager.start(); // Put up info server. int port = this.conf.getInt("hbase.master.info.port", 60010); if (port >= 0) { String a = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0"); this.infoServer = new InfoServer(MASTER, a, port, false, this.conf); this.infoServer.addServlet("status", "/master-status", MasterStatusServlet.class); this.infoServer.addServlet("dump", "/dump", MasterDumpServlet.class); this.infoServer.setAttribute(MASTER, this); this.infoServer.start(); } /* * Block on becoming the active master. * * We race with other masters to write our address into ZooKeeper. If we * succeed, we are the primary/active master and finish initialization. * * If we do not succeed, there is another active master and we should * now wait until it dies to try and become the next active master. If we * do not succeed on our first attempt, this is no longer a cluster startup. */ becomeActiveMaster(startupStatus); // We are either the active master or we were asked to shutdown if (!this.stopped) { finishInitialization(startupStatus, false); loop(); } } catch (Throwable t) { // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility if (t instanceof NoClassDefFoundError && t.getMessage().contains("org/apache/hadoop/hdfs/protocol/FSConstants$SafeModeAction")) { // improved error message for this special case abort("HBase is having a problem with its Hadoop jars. You may need to " + "recompile HBase against Hadoop version " + org.apache.hadoop.util.VersionInfo.getVersion() + " or change your hadoop jars to start properly", t); } else { abort("Unhandled exception. Starting shutdown.", t); } } finally { startupStatus.cleanup(); stopChores(); // Wait for all the remaining region servers to report in IFF we were // running a cluster shutdown AND we were NOT aborting. if (!this.abort && this.serverManager != null && this.serverManager.isClusterShutdown()) { this.serverManager.letRegionServersShutdown(); } stopServiceThreads(); // Stop services started for both backup and active masters if (this.activeMasterManager != null) this.activeMasterManager.stop(); if (this.catalogTracker != null) this.catalogTracker.stop(); if (this.serverManager != null) this.serverManager.stop(); if (this.assignmentManager != null) this.assignmentManager.stop(); if (this.fileSystemManager != null) this.fileSystemManager.stop(); if (this.snapshotManager != null) this.snapshotManager.stop("server shutting down."); this.zooKeeper.close(); } LOG.info("HMaster main thread exiting"); }[/code]
相关文章推荐
- Hbase0.96源码之HMaster(一)
- Hbase0.96源码之HMaster(二)Hmaster主要循环becomeActiveMaster
- HBase1.0.0版源码分析之HMaster启动代码分析(1)
- hbase源码系列(六)HMaster启动过程
- HBase-HMaster源码分析
- HBase源码解析(二) HMaster主要类成员解析
- HBase1.0.0版源码分析之HMaster启动代码分析(2)
- Hbase源码之HMaster
- HBase源码解析(一) HMaster启动流程
- hbase0.96 put流程 源码分析
- 【甘道夫】HBase(0.96以上版本号)过滤器Filter具体解释及实例代码
- 写在HBase源码分析前面的话
- HBase(0.96以上版本)过滤器Filter详解及实例代码(转)
- HBase源码系列(一)客户端入口HTable
- Hbase启动的时候出现:[RpcServer.handler=28,port=60000] ipc.RpcServer: RpcServer.handler=28,port=60000: exiting,master.HMasterCommandLine: Master exiting
- phoenix实战(hadoop2、hbase0.96)
- HBase源码-1
- 【备忘】Hadoop,Hbase,Hive源码解析与开发实战
- HBase源码系列(六)HBase存储结构与StoreFile存储格式
- HBase源码分析之org.apache.hadoop.hbase.catalog包