数据库中间件 MyCAT 源码分析 —— 【单库单表】查询
2018-02-14 00:00
507 查看
本文主要基于 MyCAT 1.6.5 正式版1. 概述
2. 接收请求,解析 SQL
3. 获得路由结果
4. 获得 MySQL 连接,执行 SQL
5. 响应执行 SQL 结果
6. 其他 :更新 / 删除
友情提示:欢迎关注公众号【芋道源码】。?关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。友情提示:欢迎关注公众号【芋道源码】。?关注后,拉你进【源码圈】微信群和【芋艿】】搞基嗨皮。友情提示:欢迎关注公众号【芋道源码】。?关注后,拉你进【源码圈】微信群和【芋艿】】搞基嗨皮。
如果有地方表述不错误或者不清晰,欢迎留言。
对于内容形态,非常纠结,如果有建议,特别特别特别欢迎您提出。
微信号:wangwenbin-server。本文讲解 【单库单表】查询 所涉及到的代码。?内容和 《MyCAT 源码分析 —— 【单库单表】插入》 超级相似,一方面本身流程基本相同,另外一方面文章结构没拆分好。我们使用 ? 标记差异的逻辑。交互如下图:
单库单表查询简图整个过程,MyCAT Server 流程如下:接收 MySQL Client 请求,解析 SQL。
获得路由结果,进行路由。
获得 MySQL 连接,执行 SQL。
响应执行结果,发送结果给 MySQL Client。
我们逐个步骤分析,一起来看看源码。
【单库单表】查询(01主流程)
【单库单表】插入(02获取路由)
?【6】第 29 至 32 行 :记录 Select SQL 路由结果到缓存。路由 详细解析,我们另开文章,避免内容过多,影响大家对【插入】流程和逻辑的理解。
【单库单表】查询(03执行 SQL)
PhysicalDatasource :物理数据库数据源。
【单库单表】查询(04执行响应)核心代码如下:
2. 接收请求,解析 SQL
3. 获得路由结果
4. 获得 MySQL 连接,执行 SQL
5. 响应执行 SQL 结果
6. 其他 :更新 / 删除
友情提示:欢迎关注公众号【芋道源码】。?关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。友情提示:欢迎关注公众号【芋道源码】。?关注后,拉你进【源码圈】微信群和【芋艿】】搞基嗨皮。友情提示:欢迎关注公众号【芋道源码】。?关注后,拉你进【源码圈】微信群和【芋艿】】搞基嗨皮。
1. 概述
内容形态以 顺序图 + 核心代码 为主。如果有地方表述不错误或者不清晰,欢迎留言。
对于内容形态,非常纠结,如果有建议,特别特别特别欢迎您提出。
微信号:wangwenbin-server。本文讲解 【单库单表】查询 所涉及到的代码。?内容和 《MyCAT 源码分析 —— 【单库单表】插入》 超级相似,一方面本身流程基本相同,另外一方面文章结构没拆分好。我们使用 ? 标记差异的逻辑。交互如下图:
单库单表查询简图整个过程,MyCAT Server 流程如下:接收 MySQL Client 请求,解析 SQL。
获得路由结果,进行路由。
获得 MySQL 连接,执行 SQL。
响应执行结果,发送结果给 MySQL Client。
我们逐个步骤分析,一起来看看源码。
2. 接收请求,解析 SQL
【单库单表】查询(01主流程)
【1 - 2】
接收一条 MySQL 命令。在【1】之前,还有请求数据读取、拆成单条 MySQL SQL。【3】
1: // ⬇️⬇️⬇️【FrontendCommandHandler.java】 2: public class FrontendCommandHandler implements NIOHandler { 3: 4: @Override 5: public void handle(byte[] data) { 6: 7: // .... 省略部分代码 8: switch (data[4]) // 9: { 10: case MySQLPacket.COM_INIT_DB: 11: commands.doInitDB(); 12: source.initDB(data); 13: break; 14: case MySQLPacket.COM_QUERY: // 查询命令 15: // 计数查询命令 16: commands.doQuery(); 17: // 执行查询命令 18: source.query(data); 19: break; 20: case MySQLPacket.COM_PING: 21: commands.doPing(); 22: source.ping(); 23: break; 24: // .... 省略部分case 25: } 26: } 27: 28: }
INSERT/
SELECT/
UPDATE/
DELETE等 SQL 归属于
MySQLPacket.COM_QUERY,详细可见:《MySQL协议分析#4.2 客户端命令请求报文(客户端 -> 服务器)》。
【4】
将 二进制数组 解析成 SQL。核心代码如下:1: // ⬇️⬇️⬇️【FrontendConnection.java】 2: public void query(byte[] data) { 3: // 取得语句 4: String sql = null; 5: try { 6: MySQLMessage mm = new MySQLMessage(data); 7: mm.position(5); 8: sql = mm.readString(charset); 9: } catch (UnsupportedEncodingException e) { 10: writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'"); 11: return; 12: } 13: // 执行语句 14: this.query( sql ); 15: }
【5】
解析 SQL 类型。核心代码如下:1: // ⬇️⬇️⬇️【ServerQueryHandler.java】 2: @Override 3: public void query(String sql) { 4: // 解析 SQL 类型 5: int rs = ServerParse.parse(sql); 6: int sqlType = rs & 0xff; 7: 8: switch (sqlType) { 9: //explain sql 10: case ServerParse.EXPLAIN: 11: ExplainHandler.handle(sql, c, rs >>> 8); 12: break; 13: // .... 省略部分case 14: break; 15: case ServerParse.SELECT: 16: SelectHandler.handle(sql, c, rs >>> 8); 17: break; 18: // .... 省略部分case 19: default: 20: if(readOnly){ 21: LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString()); 22: c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly"); 23: break; 24: } 25: c.execute(sql, rs & 0xff); 26: } 27: } 28: 29: 30: // ⬇️⬇️⬇️【ServerParse.java】 31: public static int parse(String stmt) { 32: int length = stmt.length(); 33: //FIX BUG FOR SQL SUCH AS /XXXX/SQL 34: int rt = -1; 35: for (int i = 0; i < length; ++i) { 36: switch (stmt.charAt(i)) { 37: // .... 省略部分case case 'I': 38: case 'i': 39: rt = insertCheck(stmt, i); 40: if (rt != OTHER) { 41: return rt; 42: } 43: continue; 44: // .... 省略部分case 45: case 'S': 46: case 's': 47: rt = sCheck(stmt, i); 48: if (rt != OTHER) { 49: return rt; 50: } 51: continue; 52: // .... 省略部分case 53: default: 54: continue; 55: } 56: } 57: return OTHER; 58: }
?【6】【7】
解析 Select SQL 类型,分发到对应的逻辑。核心代码如下:1: // ⬇️⬇️⬇️【SelectHandler.java】 2: public static void handle(String stmt, ServerConnection c, int offs) { 3: int offset = offs; 4: switch (ServerParseSelect.parse(stmt, offs)) { // 解析 Select SQL 类型 5: case ServerParseSelect.VERSION_COMMENT: // select @@VERSION_COMMENT; 6: SelectVersionComment.response(c); 7: break; 8: case ServerParseSelect.DATABASE: // select DATABASE(); 9: SelectDatabase.response(c); 10: break; 11: case ServerParseSelect.USER: // select CURRENT_USER(); 12: SelectUser.response(c); 13: break; 14: case ServerParseSelect.VERSION: // select VERSION(); 15: SelectVersion.response(c); 16: break; 17: case ServerParseSelect.SESSION_INCREMENT: // select @@session.auto_increment_increment; 18: SessionIncrement.response(c); 19: break; 20: case ServerParseSelect.SESSION_ISOLATION: // select @@session.tx_isolation; 21: SessionIsolation.response(c); 22: break; 23: case ServerParseSelect.LAST_INSERT_ID: // select LAST_INSERT_ID(); 24: // ....省略代码 25: break; 26: case ServerParseSelect.IDENTITY: // select @@identity 27: // ....省略代码 28: break; 29: case ServerParseSelect.SELECT_VAR_ALL: // 30: SelectVariables.execute(c,stmt); 31: break; 32: case ServerParseSelect.SESSION_TX_READ_ONLY: // 33: SelectTxReadOnly.response(c); 34: break; 35: default: // 其他,例如 select * from table 36: c.execute(stmt, ServerParse.SELECT); 37: } 38: } 39: // ⬇️⬇️⬇️【ServerParseSelect.java】 40: public static int parse(String stmt, int offset) { 41: int i = offset; 42: for (; i < stmt.length(); ++i) { 43: switch (stmt.charAt(i)) { 44: case ' ': 45: continue; 46: case '/': 47: case '#': 48: i = ParseUtil.comment(stmt, i); 49: continue; 50: case '@': 51: return select2Check(stmt, i); 52: case 'D': 53: case 'd': 54: return databaseCheck(stmt, i); 55: case 'L': 56: case 'l': 57: return lastInsertCheck(stmt, i); 58: case 'U': 59: case 'u': 60: return userCheck(stmt, i); 61: case 'C': 62: case 'c': 63: return currentUserCheck(stmt, i); 64: case 'V': 65: case 'v': 66: return versionCheck(stmt, i); 67: default: 68: return OTHER; 69: } 70: } 71: return OTHER; 72: }
【8】
执行 SQL,详细解析见下文,核心代码如下:1: // ⬇️⬇️⬇️【ServerConnection.java】 2: public class ServerConnection extends FrontendConnection { 3: public void execute(String sql, int type) { 4: // .... 省略代码 5: SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db); 6: if (schema == null) { 7: writeErrMessage(ErrorCode.ERR_BAD_LOGICDB, 8: "Unknown MyCAT Database '" + db + "'"); 9: return; 10: } 11: 12: // .... 省略代码 13: 14: // 路由到后端数据库,执行 SQL 15: routeEndExecuteSQL(sql, type, schema); 16: } 17: 18: public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) { 19: // 路由计算 20: RouteResultset rrs = null; 21: try { 22: rrs = MycatServer 23: .getInstance() 24: .getRouterservice() 25: .route(MycatServer.getInstance().getConfig().getSystem(), 26: schema, type, sql, this.charset, this); 27: 28: } catch (Exception e) { 29: StringBuilder s = new StringBuilder(); 30: LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e); 31: String msg = e.getMessage(); 32: writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg); 33: return; 34: } 35: 36: // 执行 SQL 37: if (rrs != null) { 38: // session执行 39: session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type); 40: } 41: 42: } 43: 44: }
3. 获得路由结果
【单库单表】插入(02获取路由)
【 1 - 5 】
获得路由主流程。核心代码如下:1: // ⬇️⬇️⬇️【SelectHandler.java】 2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema, 3: int sqlType, String stmt, String charset, ServerConnection sc) 4: throws SQLNonTransientException { 5: RouteResultset rrs = null; 6: 7: // SELECT 类型的SQL, 检测缓存是否存在 8: if (sqlType == ServerParse.SELECT) { 9: cacheKey = schema.getName() + stmt; 10: rrs = (RouteResultset) sqlRouteCache.get(cacheKey); 11: if (rrs != null) { 12: checkMigrateRule(schema.getName(),rrs,sqlType); 13: return rrs; 14: } 15: } 16: } 17: 18: // .... 省略代码 19: int hintLength = RouteService.isHintSql(stmt); 20: if(hintLength != -1){ // TODO 待读:hint 21: // .... 省略代码 22: } 23: } else { 24: stmt = stmt.trim(); 25: rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt, 26: charset, sc, tableId2DataNodeCache); 27: } 28: 29: // 记录查询命令路由结果缓存 30: if (rrs != null && sqlType == ServerParse.SELECT && rrs.isCacheAble()) { 31: sqlRouteCache.putIfAbsent(cacheKey, rrs); 32: } 33: // .... 省略代码 return rrs; 34: } 35: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】 36: @Override 37: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL, 38: String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException { 39: 40: // .... 省略代码 41: 42: // 处理一些路由之前的逻辑;全局序列号,父子表插入 43: if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) { 44: return null; 45: } 46: 47: // .... 省略代码 48: 49: // 检查是否有分片 50: if (schema.isNoSharding() && ServerParse.SHOW != sqlType) { 51: rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt); 52: } else { 53: RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs); 54: if (returnedSet == null) { 55: rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc); 56: } 57: } 58: 59: return rrs; 60: }?【3】第 7 至 16 行 :当 Select SQL 存在路由结果缓存时,直接返回缓存。
?【6】第 29 至 32 行 :记录 Select SQL 路由结果到缓存。路由 详细解析,我们另开文章,避免内容过多,影响大家对【插入】流程和逻辑的理解。
4. 获得 MySQL 连接,执行 SQL
【单库单表】查询(03执行 SQL)
【 1 - 8 】
获得 MySQL 连接。PhysicalDBNode :物理数据库节点。PhysicalDatasource :物理数据库数据源。
【 9 - 13 】
发送 SQL 到 MySQL Server,执行 SQL。? 5. 响应执行 SQL 结果
【单库单表】查询(04执行响应)核心代码如下:
1: // ⬇️⬇️⬇️【MySQLConnectionHandler.java】 2: @Override 3: protected void handleData(byte[] data) { 4: switch (resultStatus) { 5: case RESULT_STATUS_INIT: 6: switch (data[4]) { 7: case OkPacket.FIELD_COUNT: 8: handleOkPacket(data); 9: break; 10: case ErrorPacket.FIELD_COUNT: 11: handleErrorPacket(data); 12: break; 13: case RequestFilePacket.FIELD_COUNT: 14: handleRequestPacket(data); 15: break; 16: default: // 初始化 header fields 17: resultStatus = RESULT_STATUS_HEADER; 18: header = data; 19: fields = new ArrayList<byte[]>((int) ByteUtil.readLength(data, 20: 4)); 21: } 22: break; 23: case RESULT_STATUS_HEADER: 24: switch (data[4]) { 25: case ErrorPacket.FIELD_COUNT: 26: resultStatus = RESULT_STATUS_INIT; 27: handleErrorPacket(data); 28: break; 29: case EOFPacket.FIELD_COUNT: // 解析 fields 结束 30: resultStatus = RESULT_STATUS_FIELD_EOF; 31: handleFieldEofPacket(data); 32: break; 33: default: // 解析 fields 34: fields.add(data); 35: } 36: break; 37: case RESULT_STATUS_FIELD_EOF: 38: switch (data[4]) { 39: case ErrorPacket.FIELD_COUNT: 40: resultStatus = RESULT_STATUS_INIT; 41: handleErrorPacket(data); 42: break; 43: case EOFPacket.FIELD_COUNT: // 解析 每行记录 结束 44: resultStatus = RESULT_STATUS_INIT; 45: handleRowEofPacket(data); 46: break; 47: default: // 每行记录 48: handleRowPacket(data); 49: } 50: break; 51: default: 52: throw new RuntimeException("unknown status!"); 53: } 54: }
6. 其他 :更新 / 删除
流程基本和 《MyCAT源码分析:【单库单表】插入》 相同。我们就不另外文章解析。相关文章推荐
- 数据库中间件 MyCAT 源码分析 —— 【单库单表】查询
- 数据库中间件MyCAT源码分析:【单库单表】查询
- 数据库中间件 MyCAT 源码分析 —— 【单库单表】查询
- 数据库中间件 MyCAT 源码分析 —— 【单库单表】查询
- 数据库中间件 MyCAT 源码分析 —— 【单库单表】查询
- 数据库中间件 MyCAT 源码分析 —— 【单库单表】查询
- 数据库中间件 MyCAT 源码分析 —— 【单库单表】查询
- 数据库中间件 MyCAT 源码分析 —— 【单库单表】查询
- 数据库中间件 MyCAT 源码分析 —— 【单库单表】查询
- 数据库中间件 MyCAT 源码分析 —— 【单库单表】查询
- 数据库中间件 MyCAT源码分析:【单库单表】查询
- 数据库中间件 MyCAT源码分析:调试环境搭建
- 数据库中间件 Sharding-JDBC 源码分析 —— SQL 解析(三)之查询SQL
- 数据库中间件 MyCAT源码分析:【单库单表】插入
- 数据库中间件 MyCAT源码分析:【单库单表】插入
- 数据库中间件MyCAT源码分析:【单库单表】插入
- 数据库中间件 Sharding-JDBC 源码分析 —— SQL 解析(三)之查询SQL
- 数据库中间件 MyCAT源码分析:【单库单表】插入
- 数据库中间件 MyCAT源码分析:【单库单表】插入
- 数据库中间件 MyCAT 源码分析 —— 调试环境搭建