
Source Code Analysis: How Beeline Connects to HiveServer2

2017-08-01 15:10
HiveServer2 accepts JDBC connections, and Beeline is a Hive client built on JDBC. This article walks through the source code that Beeline runs when it connects to HiveServer2, using Hive 1.1.0.
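For orientation, the client side of such a JDBC connection can be sketched with the standard java.sql API alone. This is a minimal sketch rather than Beeline's own code: the class HiveJdbcSketch, its method names, and the host, port, database, user, and query are all placeholders, and it assumes the hive-jdbc driver is on the classpath and a HiveServer2 instance is listening on the given address.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/** Hypothetical helper, not part of Hive; all connection values are placeholders. */
final class HiveJdbcSketch {

    /** Builds a HiveServer2 JDBC URL of the form jdbc:hive2://host:port/database. */
    static String buildUrl(String host, int port, String database) {
        return "jdbc:hive2://" + host + ":" + port + "/" + database;
    }

    /** Runs one statement and returns the number of rows in its result set (0 if it has none). */
    static int runAndCount(String url, String user, String password, String sql)
            throws SQLException {
        try (Connection conn = DriverManager.getConnection(url, user, password);
             Statement stmt = conn.createStatement()) {
            int rows = 0;
            // execute() returns true when the statement produced a ResultSet
            if (stmt.execute(sql)) {
                try (ResultSet rs = stmt.getResultSet()) {
                    while (rs.next()) {
                        rows++;
                    }
                }
            }
            return rows;
        }
    }

    public static void main(String[] args) {
        // Assumption: HiveServer2 on localhost:10000 and a user named "hive".
        String url = buildUrl("localhost", 10000, "default");
        try {
            System.out.println(runAndCount(url, "hive", "", "SHOW TABLES") + " rows");
        } catch (SQLException e) {
            System.out.println("Could not run the query: " + e.getMessage());
        }
    }
}
```

The createStatement and execute calls used here are the same two JDBC calls whose Hive implementations are traced in the rest of this article.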
Start in the BeeLine class, taking the main function as the entry point; main then calls the begin method.


public int begin(String[] args, InputStream inputStream) throws IOException {
  try {
    // load the options first, so we can override on the command line
    getOpts().load();
  } catch (Exception e) {
    // nothing
  }

  try {
    int code = initArgs(args);
    if (code != 0) {
      return code;
    }

    if (getOpts().getScriptFile() != null) {
      return executeFile(getOpts().getScriptFile());
    }
    try {
      info(getApplicationTitle());
    } catch (Exception e) {
      // ignore
    }
    ConsoleReader reader = getConsoleReader(inputStream);
    return execute(reader, false);
  } finally {
    close();
  }
}


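The first part of this method only handles options and input, so we skip over it here and focus on the execute call. Following the calls down layer by layer eventually leads to the execute method of the Commands class.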

try {
  Statement stmnt = null;
  boolean hasResults;
  Thread logThread = null;

  try {
    long start = System.currentTimeMillis();

    if (call) {
      stmnt = beeLine.getDatabaseConnection().getConnection().prepareCall(sql);
      hasResults = ((CallableStatement) stmnt).execute();
    } else {
      stmnt = beeLine.createStatement();
      if (beeLine.getOpts().isSilent()) {
        hasResults = stmnt.execute(sql);
      } else {
        logThread = new Thread(createLogRunnable(stmnt));
        logThread.setDaemon(true);
        logThread.start();
        hasResults = stmnt.execute(sql);
        logThread.interrupt();
        logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
      }
    }

    beeLine.showWarnings();

    if (hasResults) {
      do {
        ResultSet rs = stmnt.getResultSet();
        try {
          int count = beeLine.print(rs);
          long end = System.currentTimeMillis();

          beeLine.info(beeLine.loc("rows-selected", count) + " "
              + beeLine.locElapsedTime(end - start));
        } finally {
          if (logThread != null) {
            logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
            showRemainingLogsIfAny(stmnt);
            logThread = null;
          }
          rs.close();
        }
      } while (BeeLine.getMoreResults(stmnt));
    }
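
In the non-silent branch above, a daemon thread prints query logs while the calling thread blocks in stmnt.execute(sql); afterwards the thread is interrupted, joined with a timeout, and any remaining logs are shown. The following is a minimal sketch of that pattern only, not Beeline's code: the class LogThreadSketch, the queue that stands in for the server-side query log, and both timing constants are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for the log-thread handling in Commands.execute. */
final class LogThreadSketch {
    static final long JOIN_TIMEOUT_MS = 10_000L;  // assumed value
    static final long POLL_INTERVAL_MS = 20L;     // assumed value

    /** Moves every line currently in the queue into the sink, preserving order. */
    static void drain(Queue<String> logs, List<String> sink) {
        String line;
        while ((line = logs.poll()) != null) {
            sink.add(line);
        }
    }

    /** Runs work on the calling thread while a daemon thread drains logs in the background. */
    static List<String> runWithLogThread(Runnable work, Queue<String> logs) {
        List<String> shown = new ArrayList<>();
        Thread logThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                drain(logs, shown);
                try {
                    Thread.sleep(POLL_INTERVAL_MS);
                } catch (InterruptedException e) {
                    return; // interrupted by the caller once the work is done
                }
            }
        });
        logThread.setDaemon(true);
        logThread.start();

        work.run();            // plays the role of stmnt.execute(sql)
        logThread.interrupt(); // stop background polling
        try {
            logThread.join(JOIN_TIMEOUT_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the log thread", e);
        }
        drain(logs, shown);    // plays the role of showRemainingLogsIfAny
        return shown;
    }

    public static void main(String[] args) {
        Queue<String> logs = new ConcurrentLinkedQueue<>();
        List<String> shown = runWithLogThread(() -> {
            logs.add("compiling");
            logs.add("running");
            logs.add("done");
        }, logs);
        shown.forEach(System.out::println);
    }
}
```

The final drain after join matters: log lines produced between the last background poll and the interrupt would otherwise be lost.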


Tracing the line stmnt = beeLine.createStatement() leads to the createStatement method of the HiveConnection class, which returns a HiveStatement object.

public Statement createStatement() throws SQLException {
  if (isClosed) {
    throw new SQLException("Can't create Statement, connection is closed");
  }
  return new HiveStatement(this, client, sessHandle);
}


The HiveStatement constructor takes three arguments: the current HiveConnection object, client, and sessHandle. The latter two are initialized in the HiveConnection constructor.

  } else {
    // extract user/password from JDBC connection properties if its not supplied in the
    // connection URL
    if (info.containsKey(JdbcConnectionParams.AUTH_USER)) {
      sessConfMap.put(JdbcConnectionParams.AUTH_USER, info.getProperty(JdbcConnectionParams.AUTH_USER));
      if (info.containsKey(JdbcConnectionParams.AUTH_PASSWD)) {
        sessConfMap.put(JdbcConnectionParams.AUTH_PASSWD, info.getProperty(JdbcConnectionParams.AUTH_PASSWD));
      }
    }
    if (info.containsKey(JdbcConnectionParams.AUTH_TYPE)) {
      sessConfMap.put(JdbcConnectionParams.AUTH_TYPE, info.getProperty(JdbcConnectionParams.AUTH_TYPE));
    }
    // open the client transport
    openTransport();
    // set up the client
    client = new TCLIService.Client(new TBinaryProtocol(transport));
  }

  // add supported protocols
  supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1);
  supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2);
  supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V3);
  supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V4);
  supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V5);
  supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6);
  supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7);
  supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8);

  // open client session
  openSession();
}


The assignment to client uses transport, which is itself assigned inside the openTransport method, so we trace openTransport next.

private void openTransport() throws SQLException {
  while (true) {
    try {
      assumeSubject =
          JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap
              .get(JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE));
      transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport();
      if (!transport.isOpen()) {
        LOG.info("Will try to open client transport with JDBC Uri: " + jdbcUriString);
        transport.open();
      }
      break;
    }


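As the code shows, openTransport creates a TTransport instance named transport (HTTP or binary, depending on the transport mode) and opens a connection to HiveServer2. Back in the HiveConnection constructor, once client has been assigned the code calls openSession, which assigns sessHandle. That completes the construction of HiveConnection. Looking again at createStatement in HiveConnection, the method returns a HiveStatement initialized with these objects. Returning to the execute method of Commands: once it has the HiveStatement object, it calls that object's execute method.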
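
The order of construction described above (open the transport, wrap it in a client, open a session, then hand the client and session handle to each statement) can be sketched with plain stand-in classes. Everything here is hypothetical: ConnectionSketch, Transport, Client, StatementSketch, and the hard-coded session id only mirror the shape of HiveConnection and do not use Thrift or any Hive class.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the HiveConnection construction order; no real networking. */
final class ConnectionSketch {

    static final class Transport {
        private boolean open;
        void open() { open = true; }
        boolean isOpen() { return open; }
    }

    static final class Client {
        final Transport transport;
        Client(Transport transport) { this.transport = transport; }
    }

    static final class StatementSketch {
        final Client client;
        final String sessionHandle;
        StatementSketch(Client client, String sessionHandle) {
            this.client = client;
            this.sessionHandle = sessionHandle;
        }
    }

    /** Records the order of the construction steps so it can be inspected. */
    final List<String> steps = new ArrayList<>();
    private Transport transport;
    private final Client client;
    private String sessionHandle;
    private boolean closed;

    ConnectionSketch() {
        openTransport();                 // 1. open the transport
        client = new Client(transport);  // 2. wrap it in a client
        steps.add("client");
        openSession();                   // 3. obtain the session handle
    }

    private void openTransport() {
        transport = new Transport();
        if (!transport.isOpen()) {
            transport.open();
        }
        steps.add("openTransport");
    }

    private void openSession() {
        // Assumption: a fixed id; the real handle comes back from the server.
        sessionHandle = "session-1";
        steps.add("openSession");
    }

    /** Every statement shares the connection's client and session handle. */
    StatementSketch createStatement() throws SQLException {
        if (closed) {
            throw new SQLException("Can't create Statement, connection is closed");
        }
        return new StatementSketch(client, sessionHandle);
    }

    void close() { closed = true; }

    public static void main(String[] args) throws SQLException {
        ConnectionSketch conn = new ConnectionSketch();
        System.out.println(conn.steps);
        System.out.println(conn.createStatement().sessionHandle);
    }
}
```

Because each statement receives the connection's client and session handle rather than its own, all statements created from one connection talk to the server over the same transport and within the same session.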

public boolean execute(String sql) throws SQLException {
  checkConnection("execute");

  closeClientOperation();
  initFlags();

  TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
  /**
   * Run asynchronously whenever possible
   * Currently only a SQLOperation can be run asynchronously,
   * in a background operation thread
   * Compilation is synchronous and execution is asynchronous
   */
  execReq.setRunAsync(true);
  execReq.setConfOverlay(sessConf);

  transportLock.lock();
  try {
    TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
    Utils.verifySuccessWithInfo(execResp.getStatus());
    stmtHandle = execResp.getOperationHandle();
    isExecuteStatementFailed = false;
  } catch (SQLException eS) {
    isExecuteStatementFailed = true;
    throw eS;
  } catch (Exception ex) {
    isExecuteStatementFailed = true;
    throw new SQLException(ex.toString(), "08S01", ex);
  } finally {
    transportLock.unlock();
  }

  TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle);
  boolean operationComplete = false;
  TGetOperationStatusResp statusResp;

  // Poll on the operation status, till the operation is complete
  while (!operationComplete) {
    try {
      /**
       * For an async SQLOperation, GetOperationStatus will use the long polling approach
       * It will essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires
       */
      transportLock.lock();
      try {
        statusResp = client.GetOperationStatus(statusReq);
      } finally {
        transportLock.unlock();
      }
      Utils.verifySuccessWithInfo(statusResp.getStatus());
      if (statusResp.isSetOperationState()) {
        switch (statusResp.getOperationState()) {
        case CLOSED_STATE:
        case FINISHED_STATE:
          operationComplete = true;
          break;
        case CANCELED_STATE:
          // 01000 -> warning
          throw new SQLException("Query was cancelled", "01000");
        case ERROR_STATE:
          // Get the error details from the underlying exception
          throw new SQLException(statusResp.getErrorMessage(),
              statusResp.getSqlState(), statusResp.getErrorCode());
        case UKNOWN_STATE:
          throw new SQLException("Unknown query", "HY000");
        case INITIALIZED_STATE:
        case PENDING_STATE:
        case RUNNING_STATE:
          break;
        }
      }
    } catch (SQLException e) {
      isLogBeingGenerated = false;
      throw e;
    } catch (Exception e) {
      isLogBeingGenerated = false;
      throw new SQLException(e.toString(), "08S01", e);
    }
  }
  isLogBeingGenerated = false;

  // The query should be completed by now
  if (!stmtHandle.isHasResultSet()) {
    return false;
  }
  resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle)
      .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
      .setScrollable(isScrollableResultset).setTransportLock(transportLock)
      .build();
  return true;
}


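The key line is TExecuteStatementResp execResp = client.ExecuteStatement(execReq): the client object created earlier calls ExecuteStatement to submit the SQL command to HiveServer2, and the response carries the operation handle. Because the request is sent with setRunAsync(true), the method then polls GetOperationStatus until the operation reaches a terminal state, and only then builds the result set. What happens after submission is the execution of the SQL command on HiveServer2 itself, which the next article will analyze.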
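
The submit-then-poll structure of HiveStatement.execute can be isolated in a small, self-contained sketch. It is an illustration under stated assumptions, not Hive's implementation: PollSketch, OpState, and StatusSource are hypothetical names, the status source replaces the Thrift call client.GetOperationStatus, and the error message for the ERROR case is made up (the real code forwards the message, SQL state, and error code from the server response).

```java
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;

/** Hypothetical model of the status-polling loop in HiveStatement.execute. */
final class PollSketch {

    /** Simplified mirror of the operation states handled by the switch statement. */
    enum OpState { INITIALIZED, PENDING, RUNNING, FINISHED, CLOSED, CANCELED, ERROR, UNKNOWN }

    /** Stand-in for the Thrift client: each call returns the current operation state. */
    interface StatusSource {
        OpState getOperationStatus();
    }

    /** Polls until a terminal state is seen and returns how many polls were needed. */
    static int waitForCompletion(StatusSource client) throws SQLException {
        int polls = 0;
        while (true) {
            polls++;
            switch (client.getOperationStatus()) {
                case FINISHED:
                case CLOSED:
                    return polls;
                case CANCELED:
                    throw new SQLException("Query was cancelled", "01000");
                case ERROR:
                    // Assumption: placeholder text and SQL state for the sketch.
                    throw new SQLException("Query failed", "HY000");
                case UNKNOWN:
                    throw new SQLException("Unknown query", "HY000");
                default:
                    break; // INITIALIZED, PENDING, RUNNING: keep polling
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        Iterator<OpState> states =
                List.of(OpState.PENDING, OpState.RUNNING, OpState.FINISHED).iterator();
        System.out.println("polls needed: " + waitForCompletion(states::next));
    }
}
```

The loop contains no sleep. In the real client that is reasonable because, as the comment in the Hive source notes, the server answers GetOperationStatus by long polling, so each call already blocks for up to the configured timeout; a client talking to a server without long polling would need its own back-off.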