您的位置:首页 > 编程语言

hbase 源代码解析(2)HAdmin 的表创建过程

2017-07-10 22:41 1096 查看
这个源代码将写一个系列。先走流程一样。过一遍,然后重点分析里面的功能。

HConection 里的一个重要方法.getAdmin。获得HBaseAdmin。

*HBaseAdmin提供了建表、创建列簇、检查表是否存在、修改表结构和列簇结构、删除表等功能。*


这章主要将HBaseAdmin的createTable客户端的代码。

hbaseAdmin对象拿到connetion。保存conf,rpc等一些东西。

HBaseAdmin(ClusterConnection connection) {
this.conf = connection.getConfiguration();
this.connection = connection;
this.rpcCallerFactory = connection.getRpcRetryingCallerFactory();
this.rpcControllerFactory = connection.getRpcControllerFactory();
this.ng = this.connection.getNonceGenerator();
}


以crateTable。不阻塞,可以等。在future里的等就像。所以是同步方法。

/**
* Creates a new table but does not block and wait for it to come online.
* You can use Future.get(long, TimeUnit) to wait on the operation to complete.


Future future = createTableAsyncV2(desc, splitKeys);

应该尽量去配置splitKeys,可以为null 但是不能为”“.getBytes().

然后调用executeCallable

CreateTableResponse response = executeCallable(
new MasterCallable<CreateTableResponse>(getConnection()) {
@Override
public CreateTableResponse call(int callTimeout) throws ServiceException {
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setCallTimeout(callTimeout);
controller.setPriority(desc.getTableName());
CreateTableRequest request = RequestConverter.buildCreateTableRequest(
desc, splitKeys, ng.getNonceGroup(), ng.newNonce());
return master.createTable(controller, request);
}
});


这个最主要的是master.createTable(controller, request);这个后面点讲

new MasterCallable 实现RetryingCallable

在这里主要是

interface MasterKeepAliveConnection  extends MasterProtos.MasterService.BlockingInterface

this.master = this.connection.getKeepAliveMasterService();
MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();


这里master 就是一个MasterProtos。用于通信

这个会调用一个callWithRetries的方法。重复尝试,

private static <V> V executeCallable(MasterCallable<V> callable,
RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, int rpcTimeout)
throws IOException {
RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout);
try {
return caller.callWithRetries(callable, operationTimeout);
} finally {
callable.close();
}
}


上面掉了一个东西。我们知道,client首先访问的应该是zk,并且重zk里拿到HMaster的地址。下面将介绍怎么连接zk的。

在callWithRetries里有这么一段代码。

// bad cache entries are cleared in the call to RetryingCallable#throwable() in catch block
callable.prepare(tries != 0); // if called with false, check table status on ZK
interceptor.intercept(context.prepare(callable, tries));


这里的prepare就是去获取zk

@Override
public void prepare(boolean reload) throws IOException {
this.master = this.connection.getKeepAliveMasterService();
}


connetion里的方法。首先判断Master是否在运行。第一次的时候因为mss.getStub()是空的返回false

所以 newMasterServiceStubMaker

@Override
public MasterKeepAliveConnection getKeepAliveMasterService()
throws MasterNotRunningException {
synchronized (masterAndZKLock) {
if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
try {
this.masterServiceState.stub = stubMaker.makeStub();


这个是makestub的主要方法。这里都很重要,

private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
ZooKeeperKeepAliveConnection zkw;
try {
zkw = getKeepAliveZooKeeperWatcher();
} catch (IOException e) {
ExceptionUtil.rethrowIfInterrupt(e);
throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
}
try {
checkIfBaseNodeAvailable(zkw);
ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
if (sn == null) {
String msg = "ZooKeeper available but no active master location found";
LOG.info(msg);
throw new MasterNotRunningException(msg);
}
if (isDeadServer(sn)) {
throw new MasterNotRunningException(sn + " is dead.");
}
// Use the security info interface name as our stub key
String key = getStubKey(getServiceName(),
sn.getHostname(), sn.getPort(), hostnamesCanChange);
connectionLock.putIfAbsent(key, key);
Object stub = null;
synchronized (connectionLock.get(key)) {
stub = stubs.get(key);
if (stub == null) {
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
stub = makeStub(channel);
isMasterRunning();
stubs.put(key, stub);
}
}
return stub;
} finally {
zkw.close();
}
}


1)首先是getKeepAliveZooKeeperWatcher 这里就获取到zookeper的连接的。this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier);

2)然后就是从zk里的默认路径/hbase/master拿到ServerName名字 ServerName sn = MasterAddressTracker.getMasterAddress(zkw);

3)最重要的一步建立stub

BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);

stub = makeStub(channel);

这个stub保存在mater的rpc连接。然后通过protos就能访问MasterService。

回到master

这个下一张节。。

未完待续….
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  源代码 hbase