Source Code Analysis: How Beeline Connects to HiveServer2
2017-08-01 15:10
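HiveServer2 supports connections over JDBC, and Beeline is a Hive client built on JDBC. This article walks through the source code Beeline uses to connect to HiveServer2, based on Hive 1.1.0. The entry point is the main method of the BeeLine class, which in turn calls begin: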
```java
public int begin(String[] args, InputStream inputStream) throws IOException {
  try {
    // load the options first, so we can override on the command line
    getOpts().load();
  } catch (Exception e) {
    // nothing
  }

  try {
    int code = initArgs(args);
    if (code != 0) {
      return code;
    }

    if (getOpts().getScriptFile() != null) {
      return executeFile(getOpts().getScriptFile());
    }
    try {
      info(getApplicationTitle());
    } catch (Exception e) {
      // ignore
    }
    ConsoleReader reader = getConsoleReader(inputStream);
    return execute(reader, false);
  } finally {
    close();
  }
}
```
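The first part of this method handles input: it loads the saved options and parses the command-line arguments. We skip that part and focus on execute. Following the calls down layer by layer leads to the execute method of the Commands class. Its core is: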
```java
try {
  Statement stmnt = null;
  boolean hasResults;
  Thread logThread = null;

  try {
    long start = System.currentTimeMillis();

    if (call) {
      stmnt = beeLine.getDatabaseConnection().getConnection().prepareCall(sql);
      hasResults = ((CallableStatement) stmnt).execute();
    } else {
      stmnt = beeLine.createStatement();
      if (beeLine.getOpts().isSilent()) {
        hasResults = stmnt.execute(sql);
      } else {
        logThread = new Thread(createLogRunnable(stmnt));
        logThread.setDaemon(true);
        logThread.start();
        hasResults = stmnt.execute(sql);
        logThread.interrupt();
        logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
      }
    }

    beeLine.showWarnings();

    if (hasResults) {
      do {
        ResultSet rs = stmnt.getResultSet();
        try {
          int count = beeLine.print(rs);
          long end = System.currentTimeMillis();

          beeLine.info(beeLine.loc("rows-selected", count) + " "
              + beeLine.locElapsedTime(end - start));
        } finally {
          if (logThread != null) {
            logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
            showRemainingLogsIfAny(stmnt);
            logThread = null;
          }
          rs.close();
        }
      } while (BeeLine.getMoreResults(stmnt));
    }
```
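In the non-silent branch, Commands.execute runs stmnt.execute(sql) on the current thread. Meanwhile, a daemon thread built by createLogRunnable shows query progress. When execute returns, the log thread is interrupted and joined with a bounded timeout. showRemainingLogsIfAny presumably prints whatever logs are still left.

Below is a minimal JDK-only sketch of that pattern. The BlockingQueue log source, the "stage N done" messages, the sleep standing in for the blocking execute call, and the 1000 ms join timeout are all hypothetical stand-ins, not Hive code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class LogThreadSketch {
    /** Drains lines from a hypothetical log source until the thread is interrupted. */
    static Runnable logRunnable(BlockingQueue<String> log, List<String> printed) {
        return () -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String line = log.poll(10, TimeUnit.MILLISECONDS);
                    if (line != null) {
                        printed.add(line); // Beeline would print this to the console
                    }
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting: the statement has finished, exit quietly.
            }
        };
    }

    /** Runs a fake blocking "execute" with a daemon log thread alongside it. */
    static List<String> executeWithLogThread(int stages) {
        BlockingQueue<String> log = new LinkedBlockingQueue<>();
        List<String> printed = Collections.synchronizedList(new ArrayList<>());
        Thread logThread = new Thread(logRunnable(log, printed));
        logThread.setDaemon(true);      // must not keep the JVM alive on its own
        logThread.start();
        try {
            for (int i = 1; i <= stages; i++) { // stand-in for stmnt.execute(sql)
                log.put("stage " + i + " done");
                Thread.sleep(20);
            }
            logThread.interrupt();      // execute returned: stop the log thread
            logThread.join(1000);       // bounded wait, like DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.drainTo(printed);           // leftovers, loosely like showRemainingLogsIfAny
        return printed;
    }
}
```

The thread is a daemon so that a stuck log fetch can never keep the client process alive. The bounded join keeps the main thread from waiting indefinitely on it.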
Tracing stmnt = beeLine.createStatement() leads to the createStatement method of HiveConnection, which returns a HiveStatement object:
```java
public Statement createStatement() throws SQLException {
  if (isClosed) {
    throw new SQLException("Can't create Statement, connection is closed");
  }
  return new HiveStatement(this, client, sessHandle);
}
```
The HiveStatement constructor takes three arguments: the current HiveConnection object, client, and sessHandle. The latter two are initialized in the HiveConnection constructor:
```java
} else {
  // extract user/password from JDBC connection properties if its not supplied in the
  // connection URL
  if (info.containsKey(JdbcConnectionParams.AUTH_USER)) {
    sessConfMap.put(JdbcConnectionParams.AUTH_USER,
        info.getProperty(JdbcConnectionParams.AUTH_USER));
    if (info.containsKey(JdbcConnectionParams.AUTH_PASSWD)) {
      sessConfMap.put(JdbcConnectionParams.AUTH_PASSWD,
          info.getProperty(JdbcConnectionParams.AUTH_PASSWD));
    }
  }
  if (info.containsKey(JdbcConnectionParams.AUTH_TYPE)) {
    sessConfMap.put(JdbcConnectionParams.AUTH_TYPE,
        info.getProperty(JdbcConnectionParams.AUTH_TYPE));
  }
  // open the client transport
  openTransport();
  // set up the client
  client = new TCLIService.Client(new TBinaryProtocol(transport));
}
// add supported protocols
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1);
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2);
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V3);
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V4);
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V5);
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6);
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7);
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8);

// open client session
openSession();
}
```
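Before calling openSession, the constructor registers HIVE_CLI_SERVICE_PROTOCOL_V1 through V8 as supported protocol versions. The excerpt does not show how the list is used. Our reading of the Hive source is that openSession compares the protocol version returned by the server against supportedProtocols and rejects anything outside it, but treat that as an assumption.

The sketch below models that check. Version is a hypothetical enum, not the Thrift-generated TProtocolVersion, and V9 is an invented value representing a version the client does not know.

```java
import java.util.EnumSet;
import java.util.Set;

class ProtocolCheckSketch {
    // Hypothetical mirror of TProtocolVersion; V9 stands for an unknown, newer version.
    enum Version { V1, V2, V3, V4, V5, V6, V7, V8, V9 }

    // Versions this client understands, analogous to supportedProtocols (V1..V8).
    static final Set<Version> SUPPORTED = EnumSet.range(Version.V1, Version.V8);

    /** Accepts the server's version or fails, like the check assumed in openSession. */
    static Version negotiate(Version serverVersion) {
        if (!SUPPORTED.contains(serverVersion)) {
            throw new IllegalStateException("Unsupported Hive2 protocol: " + serverVersion);
        }
        return serverVersion; // the session then speaks the server's version
    }
}
```

An EnumSet keeps the membership test cheap and makes the supported range explicit. Hive itself uses a plain collection of the Thrift enum values.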
The assignment to client uses transport, which is set in openTransport. Let's trace that method next:
```java
private void openTransport() throws SQLException {
  while (true) {
    try {
      assumeSubject =
          JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap
              .get(JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE));
      transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport();
      if (!transport.isOpen()) {
        LOG.info("Will try to open client transport with JDBC Uri: " + jdbcUriString);
        transport.open();
      }
      break;
    }
```
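openTransport wraps transport creation in while (true) { try { ...; break; } ... }. The loop therefore exits only when the transport is open or an exception escapes. The catch branch is cut off in the excerpt. We believe it retries with another HiveServer2 URI when ZooKeeper service discovery is in use and otherwise throws, but this is not shown here.

The sketch below shows that general retry shape. The Opener interface and the candidate endpoint list are hypothetical, and this is a generic pattern rather than Hive's implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

class OpenRetrySketch {
    /** Hypothetical stand-in for "create a transport for this endpoint and open it". */
    interface Opener {
        void open(String endpoint) throws Exception;
    }

    /**
     * Same shape as openTransport: success breaks out of while (true); a failure
     * (assumed behaviour) moves on to the next candidate; no candidates left -> throw.
     */
    static String openFirstAvailable(List<String> endpoints, Opener opener) {
        Deque<String> remaining = new ArrayDeque<>(endpoints);
        String current;
        while (true) {
            current = remaining.poll();
            if (current == null) {
                throw new IllegalStateException("Could not open transport: all endpoints failed");
            }
            try {
                opener.open(current);
                break; // transport is open, leave the loop
            } catch (Exception e) {
                System.err.println("Could not open " + current + ": " + e.getMessage());
            }
        }
        return current;
    }
}
```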
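So openTransport creates the TTransport object transport, through createHttpTransport or createBinaryTransport depending on the transport mode. If the transport is not already open, it opens it, which establishes the connection to HiveServer2.

Back in the HiveConnection constructor, once client has been assigned, openSession is called, and it assigns sessHandle. This completes the construction of HiveConnection, so createStatement now returns a fully initialized HiveStatement.

Returning to Commands.execute: after obtaining the HiveStatement object, it calls that object's execute method: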
```java
public boolean execute(String sql) throws SQLException {
  checkConnection("execute");

  closeClientOperation();
  initFlags();

  TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
  /**
   * Run asynchronously whenever possible
   * Currently only a SQLOperation can be run asynchronously,
   * in a background operation thread
   * Compilation is synchronous and execution is asynchronous
   */
  execReq.setRunAsync(true);
  execReq.setConfOverlay(sessConf);

  transportLock.lock();
  try {
    TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
    Utils.verifySuccessWithInfo(execResp.getStatus());
    stmtHandle = execResp.getOperationHandle();
    isExecuteStatementFailed = false;
  } catch (SQLException eS) {
    isExecuteStatementFailed = true;
    throw eS;
  } catch (Exception ex) {
    isExecuteStatementFailed = true;
    throw new SQLException(ex.toString(), "08S01", ex);
  } finally {
    transportLock.unlock();
  }

  TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle);
  boolean operationComplete = false;
  TGetOperationStatusResp statusResp;

  // Poll on the operation status, till the operation is complete
  while (!operationComplete) {
    try {
      /**
       * For an async SQLOperation, GetOperationStatus will use the long polling approach
       * It will essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires
       */
      transportLock.lock();
      try {
        statusResp = client.GetOperationStatus(statusReq);
      } finally {
        transportLock.unlock();
      }
      Utils.verifySuccessWithInfo(statusResp.getStatus());
      if (statusResp.isSetOperationState()) {
        switch (statusResp.getOperationState()) {
        case CLOSED_STATE:
        case FINISHED_STATE:
          operationComplete = true;
          break;
        case CANCELED_STATE:
          // 01000 -> warning
          throw new SQLException("Query was cancelled", "01000");
        case ERROR_STATE:
          // Get the error details from the underlying exception
          throw new SQLException(statusResp.getErrorMessage(),
              statusResp.getSqlState(), statusResp.getErrorCode());
        case UKNOWN_STATE:
          throw new SQLException("Unknown query", "HY000");
        case INITIALIZED_STATE:
        case PENDING_STATE:
        case RUNNING_STATE:
          break;
        }
      }
    } catch (SQLException e) {
      isLogBeingGenerated = false;
      throw e;
    } catch (Exception e) {
      isLogBeingGenerated = false;
      throw new SQLException(e.toString(), "08S01", e);
    }
  }
  isLogBeingGenerated = false;

  // The query should be completed by now
  if (!stmtHandle.isHasResultSet()) {
    return false;
  }
  resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle)
      .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
      .setScrollable(isScrollableResultset).setTransportLock(transportLock)
      .build();
  return true;
}
```
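The key line is TExecuteStatementResp execResp = client.ExecuteStatement(execReq). Here the client created earlier calls ExecuteStatement, which sends the SQL to HiveServer2 for execution. Because runAsync is set to true, the response carries an operation handle (stmtHandle) rather than the query results.

The method then polls GetOperationStatus until the operation reaches a terminal state. If the operation has a result set, the handle is wrapped in a HiveQueryResultSet, through which the rows are later fetched. How HiveServer2 actually executes the SQL will be analyzed in the next article.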
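The polling half of HiveStatement.execute can be isolated into a small, self-contained sketch. The stand-ins are:

- OpState is a hypothetical substitute for the Thrift-generated TOperationState.
- A Supplier replaces client.GetOperationStatus.
- A ReentrantLock plays the role of transportLock. The exact type of that field is not shown in the excerpt, so this is an assumption.

As in the code above, terminal states end the loop and failure states become SQLExceptions with the same SQLState codes. INITIALIZED, PENDING and RUNNING simply poll again.

```java
import java.sql.SQLException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

class StatusPollSketch {
    // Hypothetical mirror of the TOperationState values handled in HiveStatement.execute.
    enum OpState { INITIALIZED, PENDING, RUNNING, FINISHED, CLOSED, CANCELED, ERROR, UNKNOWN }

    // Serializes calls on the (assumed non-thread-safe) client, like transportLock.
    private final ReentrantLock transportLock = new ReentrantLock();

    /**
     * Polls until a terminal state is reached and returns how many polls were made.
     * In Hive each poll is a GetOperationStatus RPC that the server may hold open
     * (long polling); here the supplier just returns the next state.
     */
    int waitForCompletion(Supplier<OpState> getOperationStatus) throws SQLException {
        int polls = 0;
        boolean operationComplete = false;
        while (!operationComplete) {
            OpState state;
            transportLock.lock();           // one RPC at a time on the shared transport
            try {
                state = getOperationStatus.get();
            } finally {
                transportLock.unlock();
            }
            polls++;
            switch (state) {
                case CLOSED:
                case FINISHED:
                    operationComplete = true; // terminal: results (if any) can be fetched
                    break;
                case CANCELED:
                    throw new SQLException("Query was cancelled", "01000");
                case ERROR:
                    // Hive builds this from the status response's message, SQLState and code.
                    throw new SQLException("Operation failed on the server");
                case UNKNOWN:
                    throw new SQLException("Unknown query", "HY000");
                default:                      // INITIALIZED, PENDING, RUNNING: poll again
                    break;
            }
        }
        return polls;
    }
}
```

The loop has no client-side sleep between polls. Per the comment in the original code, the server holds each GetOperationStatus call open for up to HIVE_SERVER2_LONG_POLLING_TIMEOUT, so the client does not busy-spin.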