您的位置:首页 > 数据库

数据库中间件MyCAT源码分析:【单库单表】查询

2017-05-30 20:41 691 查看
title: MyCAT 源码分析 —— 【单库单表】查询

date: 2017-05-30

tags:

categories: MyCAT

permalink: MyCAT/single-db-single-table-select



������关注微信公众号:【芋艿的后端小屋】有福利:

1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表

2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址

3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢

4. 新的源码解析文章实时收到通知。每周更新一篇左右

1. 概述

2. 接收请求,解析 SQL

3. 获得路由结果

4. 获得 MySQL 连接,执行 SQL

5. 响应执行 SQL 结果

6. 其他 :更新 / 删除

1. 概述

内容形态以 顺序图 + 核心代码 为主。

如果有地方表述不错误或者不清晰,欢迎留言。

对于内容形态,非常纠结,如果有建议,特别特别特别欢迎您提出。

微信号:wangwenbin-server。

本文讲解 【单库单表】查询 所涉及到的代码。

��内容和 《MyCAT 源码分析 —— 【单库单表】插入》 超级相似,一方面本身流程基本相同,另外一方面文章结构没拆分好。我们使用 �� 标记差异的逻辑。

交互如下图:



整个过程,MyCAT Server 流程如下:

接收 MySQL Client 请求,解析 SQL。

获得路由结果,进行路由。

获得 MySQL 连接,执行 SQL。

响应执行结果,发送结果给 MySQL Client。

我们逐个步骤分析,一起来看看源码。

2. 接收请求,解析 SQL



【1 - 2】

接收一条 MySQL 命令。在【1】之前,还有请求数据读取、拆成单条 MySQL SQL。

【3】

1: // ⬇️⬇️⬇️【FrontendCommandHandler.java】
2: public class FrontendCommandHandler implements NIOHandler {
3:
4:     @Override
5:     public void handle(byte[] data) {
6:
7:         // .... 省略部分代码
8:         switch (data[4]) //
9:         {
10:             case MySQLPacket.COM_INIT_DB:
11:                 commands.doInitDB();
12:                 source.initDB(data);
13:                 break;
14:             case MySQLPacket.COM_QUERY: // 查询命令
15:                 // 计数查询命令
16:                 commands.doQuery();
17:                 // 执行查询命令
18:                 source.query(data);
19:                 break;
20:             case MySQLPacket.COM_PING:
21:                 commands.doPing();
22:                 source.ping();
23:                 break;
24:             // .... 省略部分case
25:         }
26:     }
27:
28: }


INSERT
/
SELECT
/
UPDATE
/
DELETE
等 SQL 归属于
MySQLPacket.COM_QUERY
,详细可见:《MySQL协议分析#4.2 客户端命令请求报文(客户端 -> 服务器)》

【4】

将 二进制数组 解析成 SQL。核心代码如下:

1: // ⬇️⬇️⬇️【FrontendConnection.java】
2: public void query(byte[] data) {
3:    // 取得语句
4:    String sql = null;
5:    try {
6:        MySQLMessage mm = new MySQLMessage(data);
7:        mm.position(5);
8:        sql = mm.readString(charset);
9:    } catch (UnsupportedEncodingException e) {
10:        writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");
11:        return;
12:    }
13:    // 执行语句
14:    this.query( sql );
15: }


【5】

解析 SQL 类型。核心代码如下:

1: // ⬇️⬇️⬇️【ServerQueryHandler.java】
2: @Override
3: public void query(String sql) {
4:    // 解析 SQL 类型
5:    int rs = ServerParse.parse(sql);
6:    int sqlType = rs & 0xff;
7:
8:    switch (sqlType) {
9:    //explain sql
10:    case ServerParse.EXPLAIN:
11:        ExplainHandler.handle(sql, c, rs >>> 8);
12:        break;
13:    // .... 省略部分case
14:        break;
15:    case ServerParse.SELECT:
16:        SelectHandler.handle(sql, c, rs >>> 8);
17:        break;
18:    // .... 省略部分case
19:    default:
20:        if(readOnly){
21:            LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());
22:            c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");
23:            break;
24:        }
25:        c.execute(sql, rs & 0xff);
26:    }
27: }
28:
29:
30: // ⬇️⬇️⬇️【ServerParse.java】
31: public static int parse(String stmt) {
32:    int length = stmt.length();
33:    //FIX BUG FOR SQL SUCH AS /XXXX/SQL
34:    int rt = -1;
35:    for (int i = 0; i < length; ++i) {
36:        switch (stmt.charAt(i)) {
37:        // .... 省略部分case            case 'I':
38:        case 'i':
39:            rt = insertCheck(stmt, i);
40:            if (rt != OTHER) {
41:                return rt;
42:            }
43:            continue;
44:            // .... 省略部分case
45:        case 'S':
46:        case 's':
47:            rt = sCheck(stmt, i);
48:            if (rt != OTHER) {
49:                return rt;
50:            }
51:            continue;
52:            // .... 省略部分case
53:        default:
54:            continue;
55:        }
56:    }
57:    return OTHER;
58: }


��【6】【7】

解析 Select SQL 类型,分发到对应的逻辑。核心代码如下:

1: // ⬇️⬇️⬇️【SelectHandler.java】
2: public static void handle(String stmt, ServerConnection c, int offs) {
3:    int offset = offs;
4:    switch (ServerParseSelect.parse(stmt, offs)) { // 解析 Select SQL 类型
5:    case ServerParseSelect.VERSION_COMMENT: // select @@VERSION_COMMENT;
6:        SelectVersionComment.response(c);
7:        break;
8:    case ServerParseSelect.DATABASE: // select DATABASE();
9:        SelectDatabase.response(c);
10:        break;
11:    case ServerParseSelect.USER: // select CURRENT_USER();
12:         SelectUser.response(c);
13:        break;
14:    case ServerParseSelect.VERSION: // select VERSION();
15:        SelectVersion.response(c);
16:        break;
17:    case ServerParseSelect.SESSION_INCREMENT: // select @@session.auto_increment_increment;
18:        SessionIncrement.response(c);
19:        break;
20:    case ServerParseSelect.SESSION_ISOLATION: // select @@session.tx_isolation;
21:        SessionIsolation.response(c);
22:        break;
23:    case ServerParseSelect.LAST_INSERT_ID: // select LAST_INSERT_ID();
24:        // ....省略代码
25:        break;
26:    case ServerParseSelect.IDENTITY: // select @@identity
27:        // ....省略代码
28:        break;
29:     case ServerParseSelect.SELECT_VAR_ALL: //
30:         SelectVariables.execute(c,stmt);
31:             break;
32:     case ServerParseSelect.SESSION_TX_READ_ONLY: //
33:         SelectTxReadOnly.response(c);
34:            break;
35:    default: // 其他,例如 select * from table
36:        c.execute(stmt, ServerParse.SELECT);
37:    }
38: }
39: // ⬇️⬇️⬇️【ServerParseSelect.java】
40: public static int parse(String stmt, int offset) {
41:    int i = offset;
42:    for (; i < stmt.length(); ++i) {
43:        switch (stmt.charAt(i)) {
44:        case ' ':
45:            continue;
46:        case '/':
47:        case '#':
48:            i = ParseUtil.comment(stmt, i);
49:            continue;
50:        case '@':
51:            return select2Check(stmt, i);
52:        case 'D':
53:        case 'd':
54:            return databaseCheck(stmt, i);
55:        case 'L':
56:        case 'l':
57:            return lastInsertCheck(stmt, i);
58:        case 'U':
59:        case 'u':
60:            return userCheck(stmt, i);
61:        case 'C':
62:        case 'c':
63:            return currentUserCheck(stmt, i);
64:        case 'V':
65:        case 'v':
66:            return versionCheck(stmt, i);
67:        default:
68:            return OTHER;
69:        }
70:    }
71:    return OTHER;
72: }


【8】

执行 SQL,详细解析见下文,核心代码如下:

1: // ⬇️⬇️⬇️【ServerConnection.java】
2: public class ServerConnection extends FrontendConnection {
3:    public void execute(String sql, int type) {
4:        // .... 省略代码
5:        SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);
6:        if (schema == null) {
7:            writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,
8:                    "Unknown MyCAT Database '" + db + "'");
9:            return;
10:        }
11:
12:        // .... 省略代码
13:
14:        // 路由到后端数据库,执行 SQL
15:        routeEndExecuteSQL(sql, type, schema);
16:    }
17:
18:     public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {
19:        // 路由计算
20:        RouteResultset rrs = null;
21:        try {
22:            rrs = MycatServer

f076
23:                    .getInstance()
24:                    .getRouterservice()
25:                    .route(MycatServer.getInstance().getConfig().getSystem(),
26:                            schema, type, sql, this.charset, this);
27:
28:        } catch (Exception e) {
29:            StringBuilder s = new StringBuilder();
30:            LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e);
31:            String msg = e.getMessage();
32:            writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg);
33:            return;
34:        }
35:
36:        // 执行 SQL
37:        if (rrs != null) {
38:            // session执行
39:            session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);
40:        }
41:
42:    }
43:
44: }


3. 获得路由结果



【 1 - 5 】

获得路由主流程。核心代码如下:

1: // ⬇️⬇️⬇️【SelectHandler.java】
2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema,
3:        int sqlType, String stmt, String charset, ServerConnection sc)
4:        throws SQLNonTransientException {
5:    RouteResultset rrs = null;
6:
7:    // SELECT 类型的SQL, 检测缓存是否存在
8:    if (sqlType == ServerParse.SELECT) {
9:        cacheKey = schema.getName() + stmt;
10:        rrs = (RouteResultset) sqlRouteCache.get(cacheKey);
11:        if (rrs != null) {
12:            checkMigrateRule(schema.getName(),rrs,sqlType);
13:            return rrs;
14:            }
15:        }
16:    }
17:
18:    // .... 省略代码
19:    int hintLength = RouteService.isHintSql(stmt);
20:    if(hintLength != -1){ // TODO 待读:hint
21:        // .... 省略代码
22:        }
23:    } else {
24:        stmt = stmt.trim();
25:        rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt,
26:                charset, sc, tableId2DataNodeCache);
27:    }
28:
29:    // 记录查询命令路由结果缓存
30:    if (rrs != null && sqlType == ServerParse.SELECT && rrs.isCacheAble()) {
31:        sqlRouteCache.putIfAbsent(cacheKey, rrs);
32:    }
33:    // .... 省略代码        return rrs;
34: }
35: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】
36: @Override
37: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
38:        String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {
39:
40:    // .... 省略代码
41:
42:    // 处理一些路由之前的逻辑;全局序列号,父子表插入
43:    if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) {
44:        return null;
45:    }
46:
47:    // .... 省略代码
48:
49:    // 检查是否有分片
50:    if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {
51:        rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
52:    } else {
53:        RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);
54:        if (returnedSet == null) {
55:            rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);
56:        }
57:    }
58:
59:    return rrs;
60: }


��【3】第 7 至 16 行 :当 Select SQL 存在路由结果缓存时,直接返回缓存。

��【6】第 29 至 32 行 :记录 Select SQL 路由结果到缓存。

路由 详细解析,我们另开文章,避免内容过多,影响大家对【插入】流程和逻辑的理解。

4. 获得 MySQL 连接,执行 SQL



【 1 - 8 】

获得 MySQL 连接。

PhysicalDBNode :物理数据库节点。

PhysicalDatasource :物理数据库数据源。

【 9 - 13 】

发送 SQL 到 MySQL Server,执行 SQL。

�� 5. 响应执行 SQL 结果



核心代码如下:

1: // ⬇️⬇️⬇️【MySQLConnectionHandler.java】
2: @Override
3: protected void handleData(byte[] data) {
4:    switch (resultStatus) {
5:    case RESULT_STATUS_INIT:
6:        switch (data[4]) {
7:        case OkPacket.FIELD_COUNT:
8:            handleOkPacket(data);
9:            break;
10:        case ErrorPacket.FIELD_COUNT:
11:            handleErrorPacket(data);
12:            break;
13:        case RequestFilePacket.FIELD_COUNT:
14:            handleRequestPacket(data);
15:            break;
16:        default: // 初始化 header fields
17:            resultStatus = RESULT_STATUS_HEADER;
18:            header = data;
19:            fields = new ArrayList<byte[]>((int) ByteUtil.readLength(data,
20:                    4));
21:        }
22:        break;
23:    case RESULT_STATUS_HEADER:
24:        switch (data[4]) {
25:        case ErrorPacket.FIELD_COUNT:
26:            resultStatus = RESULT_STATUS_INIT;
27:            handleErrorPacket(data);
28:            break;
29:        case EOFPacket.FIELD_COUNT: // 解析 fields 结束
30:            resultStatus = RESULT_STATUS_FIELD_EOF;
31:            handleFieldEofPacket(data);
32:            break;
33:        default: // 解析 fields
34:            fields.add(data);
35:        }
36:        break;
37:    case RESULT_STATUS_FIELD_EOF:
38:        switch (data[4]) {
39:        case ErrorPacket.FIELD_COUNT:
40:            resultStatus = RESULT_STATUS_INIT;
41:            handleErrorPacket(data);
42:            break;
43:        case EOFPacket.FIELD_COUNT: // 解析 每行记录 结束
44:            resultStatus = RESULT_STATUS_INIT;
45:            handleRowEofPacket(data);
46:            break;
47:        default: // 每行记录
48:            handleRowPacket(data);
49:        }
50:        break;
51:    default:
52:        throw new RuntimeException("unknown status!");
53:    }
54: }


6. 其他 :更新 / 删除

流程基本和 《MyCAT源码分析:【单库单表】插入》 相同。我们就不另外文章解析。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息