mycat2.0源码分析02-客户端发送认证报文
2017-05-11 15:23
459 查看
mycat客户端发送认证报文
1 NIOReactor报文的异步读取和报文格式验证
11 NIOReactor处理读事件
12 Connection的asynRead读取报文
13 MySQLFrontConnectionHandler对报文格式的验证
2 认证报文的验证和响应
21 调用CheckUserLoginResponseCallback的processCmd方法对认证报文验证
22 调用CheckUserLoginResponseCallback的success方法响应
3 NIOReactor写事件处理
31 调用Connection的doWriteQueue方法向客户端写数据
上一篇文章已经介绍了mycat2.0接受客户端连接,mycat发送握手报文给客户端的过程。本篇我们介绍客户端在接收到mycat发送的握手报文之后,客户端发送给mycat的认证报文以及mycat对认证报文的相应过程。
当客户端向mycat发送认证报文的时候,selector.selectedKeys()就绪的读事件,判断selectionKey是有效且可读的,获取selectionKey绑定的Connection实例,调用Connection的asynRead()方法读取报文。读事件处理完成后需要手动删除selectionKey,不然还会重复触发读事件。
这里获取的Connection实际上它的子类MySQLFrontConnection,asynRead()是在Connection类中实现。asynRead()方法首先对Connetion的连接的状态进行了判断,如果连接已经关闭,那么不进行读取操作。
readDataBuffer在上一篇文章提到,是在NIOReactor的register()的方法中调用Connection的register()方法对readDataBuffer和writeDataBuffer进行了初始化,readDataBuffer的实现类是MappedFileConDataBuffer,writeDataBuffer的实现类是MappedFileConDataBuffer3,基于文件存储方式实现,将socket的数据读取并写入文件,提供后续流程的读取并进行处理。其他情况的处理还有待完善,如文件存储空间不够扩展的问题等。
此处的handler是MySQLFrontConnectionHandler,在使用工厂类MySQLFrontendConnectionFactory获取MySQLFrontConnection类的时候,在MySQLFrontConnection类中设置了全局的handler,代码如下。handler对报文数据进行后续的处理,下面将详解。
MySQLFrontConnectionHandler是一个全局类,handleReadEvent方法首先获取MySQLFrontConnection的readDataBuffer,获取需要处理的数据的区间,区间内的数据可能会有多个数据包。
第一步判断读取的数据区间字节数是否大于等于4(因为一个mysql包的包头长度是4字节),不合法的不做处理。
第二步读取mysql数据包的长度,包括包头的长度。如果数据区间的长度小于mysql数据包的长度,说明数据包不完整,也不进行处理。
第三步获取mysql的报文类型,对于认证报文,第5个字节是表示客户端能力的一部分。
最后调用CheckUserLoginResponseCallback的processCmd()方法进行认证校验操作。
MySQLFrontConnectionHandler的onConnected()方法中设置CheckUserLoginResponseCallback为连接的SQLCommandHandler。
processCmd()方法,对用户名等必要信息进行校验,目前校验代码没有实现完全。所有的信息认证没有问题,调用success()方法,进行认证响应报文的发送。
认证成功之后,判断认证包中是否指定了字符集,如果没有指定字符集则返回ERR报文。再根据认证报文中的数据库名来来设置SQLCommandHandler,下面将详细讲。
根据认证报文中设置的数据库名,来设置前段连接的schema,代码如下:
根据认证报文提供的数据名称来获取mycat配置的schema,判断是普通还是分片的schema,如果schema是一个普通schema,那么设置session.changeCmdHandler(SQLEngineCtx.INSTANCE().getNomalSchemaSQLCmdHandler()),否则设置为session.changeCmdHandler(SQLEngineCtx.INSTANCE().getNomalSchemaSQLCmdHandler())。
一切都通过之后,将OK报文写入到连接的writeDataBuffer中,关注写事件并唤醒selector。并将连接的状态设置为空闲状态。
Connection的doWriteQueue()方法:
Connection的write0()方法:
1 NIOReactor报文的异步读取和报文格式验证
11 NIOReactor处理读事件
12 Connection的asynRead读取报文
13 MySQLFrontConnectionHandler对报文格式的验证
2 认证报文的验证和响应
21 调用CheckUserLoginResponseCallback的processCmd方法对认证报文验证
22 调用CheckUserLoginResponseCallback的success方法响应
3 NIOReactor写事件处理
31 调用Connection的doWriteQueue方法向客户端写数据
2.mycat客户端发送认证报文
上一篇文章已经介绍了mycat2.0接受客户端连接,mycat发送握手报文给客户端的过程。本篇我们介绍客户端在接收到mycat发送的握手报文之后,客户端发送给mycat的认证报文以及mycat对认证报文的相应过程。
2.1 NIOReactor报文的异步读取和报文格式验证
上一篇文章已经介绍了NIOAcceptor接受客户端连接,并且NIOReactor负责初始化socketChannel并建立连接,向客户端发送握手报文,接着客户端会向mycat发送认证报文,由NIOReactor来处理认证报文的读取。下面看上图所示的1~3.2.1.1 NIOReactor处理读事件
@Override public void run() { final Selector selector = this.selector; Set<SelectionKey> keys = null; int readys=0; for (;;) { ++reactCount; try { readys=selector.select(400/(readys+1)); if(readys==0) { handlerEvents(selector); continue; } keys = selector.selectedKeys(); for (final SelectionKey key : keys) { Connection con = null; try { final Object att = key.attachment(); LOGGER.debug("select-key-readyOps = {}, attachment = {}", key.readyOps(), att); if (att != null && key.isValid()) { con = (Connection) att; if (key.isReadable()) { try { con.asynRead(); } catch (Throwable e) { if (!(e instanceof java.io.IOException)) { LOGGER.warn("caught err: " + con, e); } con.close("program err:" + e.toString()); continue; } } // "key" may be cancelled in asynRead()! // @author little-pan // @since 2016-09-29 if(key.isValid() == false){ LOGGER.debug("select-key cancelled"); continue; } if (key.isWritable()) { con.doWriteQueue(); } } else { key.cancel(); } } catch (final Throwable e) { if (e instanceof CancelledKeyException) { if (LOGGER.isDebugEnabled()) { LOGGER.debug(con + " socket key canceled"); } } else { LOGGER.warn(con + " " + e); } } } } catch (Throwable e) { LOGGER.warn(name, e); } finally { if (keys != null) { keys.clear(); } } //handler other events handlerEvents(selector); } }
当客户端向mycat发送认证报文的时候,selector.selectedKeys()就绪的读事件,判断selectionKey是有效且可读的,获取selectionKey绑定的Connection实例,调用Connection的asynRead()方法读取报文。读事件处理完成后需要手动删除selectionKey,不然还会重复触发读事件。
2.1.2 Connection的asynRead()读取报文
/** * 异步读取数据,only nio thread call * * @throws IOException */ @SuppressWarnings("unchecked") protected void asynRead() throws IOException { final ConDataBuffer buffer = readDataBuffer; if(LOGGER.isDebugEnabled()){ LOGGER.debug("C#{}B#{} ready to read data", getId(), buffer.hashCode()); } if (isClosed()) { LOGGER.debug("Connection closed: ignore"); return; } final int got = buffer.transferFrom(channel); if(LOGGER.isDebugEnabled()){ LOGGER.debug("C#{}B#{} can read {} bytes", getId(), buffer.hashCode(), got); } switch (got) { case 0: { // 如果空间不够了,继续分配空间读取 if (readDataBuffer.isFull()) { // @todo extends } break; } case -1: { close("client closed"); break; } default: {// readed some bytes // trace network protocol stream final NetSystem nets = NetSystem.getInstance(); if(nets.getNetConfig().isTraceProtocol()){ final int offset = buffer.readPos(), length = buffer.writingPos() - offset; final String hexs= StringUtil.dumpAsHex(buffer, offset, length); LOGGER.info("C#{}B#{}: last readed = {} bytes, total readed = {} bytes, buffer bytes\n{}", getId(), buffer.hashCode(), got, length, hexs); } // 子类负责解析报文并处理 handler.handleReadEvent(this); } } }
这里获取的Connection实际上它的子类MySQLFrontConnection,asynRead()是在Connection类中实现。asynRead()方法首先对Connetion的连接的状态进行了判断,如果连接已经关闭,那么不进行读取操作。
readDataBuffer在上一篇文章提到,是在NIOReactor的register()的方法中调用Connection的register()方法对readDataBuffer和writeDataBuffer进行了初始化,readDataBuffer的实现类是MappedFileConDataBuffer,writeDataBuffer的实现类是MappedFileConDataBuffer3,基于文件存储方式实现,将socket的数据读取并写入文件,提供后续流程的读取并进行处理。其他情况的处理还有待完善,如文件存储空间不够扩展的问题等。
此处的handler是MySQLFrontConnectionHandler,在使用工厂类MySQLFrontendConnectionFactory获取MySQLFrontConnection类的时候,在MySQLFrontConnection类中设置了全局的handler,代码如下。handler对报文数据进行后续的处理,下面将详解。
public class MySQLFrontendConnectionFactory extends ConnectionFactory { /** * 全局共享的前段NIO Handler */ private final MySQLFrontConnectionHandler handler = new MySQLFrontConnectionHandler(); ...... @Override protected NIOHandler<MySQLFrontConnection> getNIOHandler() { return handler; } }
public abstract class ConnectionFactory { protected abstract Connection makeConnection(SocketChannel channel) throws IOException; protected abstract NIOHandler getNIOHandler(); public Connection make(SocketChannel channel) throws IOException { channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); // 子类完成具体连接创建工作 Connection c = makeConnection(channel); // 设置连接的参数 NetSystem.getInstance().setSocketParams(c,true); // 设置NIOHandler c.setHandler(getNIOHandler()); return c; } }
2.1.3 MySQLFrontConnectionHandler对报文格式的验证
public void handleReadEvent(final MySQLFrontConnection cnxn) throws IOException{ LOGGER.debug("handleReadEvent(): {}", cnxn); final ConDataBuffer buffer = cnxn.getReadDataBuffer(); int offset = buffer.readPos(), limit = buffer.writingPos(); // 读取到了包头和长度 // 是否讀完一個報文 for(; ; ) { if(!MySQLConnection.validateHeader(offset, limit)) { LOGGER.debug("C#{}B#{} validate protocol packet header: too short, ready to handle next the read event", cnxn.getId(), buffer.hashCode()); return; } int length = MySQLConnection.getPacketLength(buffer, offset); if((length + offset) > limit) { LOGGER.debug("C#{}B#{} nNot a whole packet: required length = {} bytes, cur total length = {} bytes, " + "ready to handle the next read event", cnxn.getId(), buffer.hashCode(), length, limit); return; } if(length == 4){ // @todo handle empty packet } // 解析报文类型 final byte packetType = buffer.getByte(offset + MySQLConnection.msyql_packetHeaderSize); final int pkgStartPos = offset; // trace-protocol-packet // @author little-pan // @since 2016-09-29 final NetSystem nets = NetSystem.getInstance(); if(nets.getNetConfig().isTraceProtocol()){ final String hexs = StringUtil.dumpAsHex(buffer, pkgStartPos, length); LOGGER.info("C#{}B#{} received a packet: offset = {}, length = {}, type = {}, cur total length = {}, packet bytes\n{}", cnxn.getId(), buffer.hashCode(), pkgStartPos, length, packetType, limit, hexs); } cnxn.getSession().getCurCmdHandler().processCmd(cnxn, buffer, packetType, pkgStartPos, length); offset += length; buffer.setReadingPos(offset); } }
MySQLFrontConnectionHandler是一个全局类,handleReadEvent方法首先获取MySQLFrontConnection的readDataBuffer,获取需要处理的数据的区间,区间内的数据可能会有多个数据包。
MySQLConnection.validateHeader(offset, limit)
第一步判断读取的数据区间字节数是否大于等于4(因为一个mysql包的包头长度是4字节),不合法的不做处理。
MySQLConnection.getPacketLength(buffer, offset)
第二步读取mysql数据包的长度,包括包头的长度。如果数据区间的长度小于mysql数据包的长度,说明数据包不完整,也不进行处理。
第三步获取mysql的报文类型,对于认证报文,第5个字节是表示客户端能力的一部分。
cnxn.getSession().getCurCmdHandler().processCmd(cnxn, buffer, packetType, pkgStartPos, length);
最后调用CheckUserLoginResponseCallback的processCmd()方法进行认证校验操作。
public class MySQLFrontConnectionHandler implements NIOHandler<MySQLFrontConnection> { private final CheckUserLoginResponseCallback loginCmdHandler = new CheckUserLoginResponseCallback(); @Override public void onConnected(MySQLFrontConnection con) throws IOException { LOGGER.debug("onConnected(): {}", con); con.getSession().changeCmdHandler(loginCmdHandler); con.sendAuthPackge(); }
MySQLFrontConnectionHandler的onConnected()方法中设置CheckUserLoginResponseCallback为连接的SQLCommandHandler。
2.2 认证报文的验证和响应
如上图4~8的过程。2.2.1 调用CheckUserLoginResponseCallback的processCmd()方法对认证报文验证
public void processCmd(MySQLFrontConnection con, ConDataBuffer dataBuffer, byte packageType, int pkgStartPos, int pkgLen) throws IOException { // check quit packet if (packageType == MySQLPacket.QUIT_PACKET) { con.close("quit packet"); return; } ByteBuffer byteBuff = dataBuffer.getBytes(pkgStartPos, pkgLen); AuthPacket auth = new AuthPacket(); auth.read(byteBuff); // Fake check user LOGGER.debug("Check user name. " + auth.user); if (!auth.user.equals("root")) { LOGGER.debug("User name error. " + auth.user); con.failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "'"); return; } // Fake check password LOGGER.debug("Check user password. " + new String(auth.password)); // check schema LOGGER.debug("Check database. " + auth.database); success(con, auth); }
processCmd()方法,对用户名等必要信息进行校验,目前校验代码没有实现完全。所有的信息认证没有问题,调用success()方法,进行认证响应报文的发送。
2.2.2 调用CheckUserLoginResponseCallback的success()方法响应
private void success(MySQLFrontConnection con, AuthPacket auth) throws IOException { LOGGER.debug("Login success"); // 设置字符集编码 int charsetIndex = (auth.charsetIndex & 0xff); final String charset = CharsetUtil.getCharset(charsetIndex); if (charset == null) { final String errmsg = "Unknown charsetIndex:" + charsetIndex; LOGGER.warn(errmsg); con.writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, errmsg); return; } LOGGER.debug("charset = {}, charsetIndex = {}", charset, charsetIndex); con.setCharset(charsetIndex, charset); //认证成功后,修改changeCmdHandler,由CheckUserLoginResponseCallback改用 // AbstractSchemaSQLCommandHandler处理 if (!con.setFrontSchema(auth.database)) { final String errmsg = "No Mycat Schema defined: " + auth.database; LOGGER.debug(errmsg); con.writeErrMessage(ErrorCode.ER_NO_DB_ERROR, errmsg); } else { con.write(AUTH_OK); } con.setState(Connection.STATE_IDLE); }
认证成功之后,判断认证包中是否指定了字符集,如果没有指定字符集则返回ERR报文。再根据认证报文中的数据库名来来设置SQLCommandHandler,下面将详细讲。
con.setFrontSchema(auth.database)
根据认证报文中设置的数据库名,来设置前段连接的schema,代码如下:
public boolean setFrontSchema(String schema) throws IOException { SchemaBean mycatSchema = null; if (schema != null) { mycatSchema = SQLEngineCtx.INSTANCE().getMycatSchema(schema); if (mycatSchema == null) { return false; } this.mycatSchema = mycatSchema; if (!mycatSchema.isNormalSchema()) { this.session.changeCmdHandler(SQLEngineCtx.INSTANCE().getPartionSchemaSQLCmdHandler()); }else { session.changeCmdHandler(SQLEngineCtx.INSTANCE().getNomalSchemaSQLCmdHandler()); } return true; }else { // 默认设置为不分片的SQL处理器 session.changeCmdHandler(SQLEngineCtx.INSTANCE().getNomalSchemaSQLCmdHandler()); return true; } }
根据认证报文提供的数据名称来获取mycat配置的schema,判断是普通还是分片的schema,如果schema是一个普通schema,那么设置session.changeCmdHandler(SQLEngineCtx.INSTANCE().getNomalSchemaSQLCmdHandler()),否则设置为session.changeCmdHandler(SQLEngineCtx.INSTANCE().getNomalSchemaSQLCmdHandler())。
} else { con.write(AUTH_OK); } con.setState(Connection.STATE_IDLE);
一切都通过之后,将OK报文写入到连接的writeDataBuffer中,关注写事件并唤醒selector。并将连接的状态设置为空闲状态。
2.3 NIOReactor写事件处理
NIOReactor类的RWThread类中调用con.doWriteQueue()进行异步的写操作。如上图10~13的过程。2.3.1 调用Connection的doWriteQueue()方法向客户端写数据
如上图14~17,NIOReactor的RWThread的run中,当有写事件触发时,调用doWriteQueue()方法向客户端写数据。@Override public void run() { final Selector selector = this.selector; Set<SelectionKey> keys = null; int readys=0; for (;;) { ++reactCount; try { readys=selector.select(400/(readys+1)); if(readys==0) { handlerEvents(selector); continue; } keys = selector.selectedKeys(); for (final SelectionKey key : keys) { Connection con = null; try { final Object att = key.attachment(); LOGGER.debug("select-key-readyOps = {}, attachment = {}", key.readyOps(), att); if (att != null && key.isValid()) { con = (Connection) att; if (key.isReadable()) { try { con.asynRead(); } catch (Throwable e) { if (!(e instanceof java.io.IOException)) { LOGGER.warn("caught err: " + con, e); } con.close("program err:" + e.toString()); continue; } } // "key" may be cancelled in asynRead()! // @author little-pan // @since 2016-09-29 if(key.isValid() == false){ LOGGER.debug("select-key cancelled"); continue; } if (key.isWritable()) { con.doWriteQueue(); } } else { key.cancel(); } } catch (final Throwable e) { if (e instanceof CancelledKeyException) { if (LOGGER.isDebugEnabled()) { LOGGER.debug(con + " socket key canceled"); } } else { LOGGER.warn(con + " " + e); } } } } catch (Throwable e) { LOGGER.warn(name, e); } finally { if (keys != null) { keys.clear(); } } //handler other events handlerEvents(selector); } }
Connection的doWriteQueue()方法:
public void doWriteQueue() { try { boolean noMoreData = write0(); lastWriteTime = TimeUtil.currentTimeMillis(); if (noMoreData) { if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) != 0)) { disableWrite(); } } else { if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) == 0)) { enableWrite(false); } } } catch (IOException e) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("caught err:", e); } close("err:" + e); } }
Connection的write0()方法:
private boolean write0() throws IOException { final NetSystem nets = NetSystem.getInstance(); final ConDataBuffer buffer = this.writeDataBuffer; final int written = buffer.transferTo(this.channel); final int remains = buffer.writingPos() - buffer.readPos(); netOutBytes += written; nets.addNetOutBytes(written); // trace-protocol // @author little-pan // @since 2016-09-29 if(nets.getNetConfig().isTraceProtocol()){ final String hexs = StringUtil.dumpAsHex(buffer, buffer.readPos() - written, written); LOGGER.info("C#{}B#{}: last writed = {} bytes, remain to write = {} bytes, written bytes\n{}", getId(), buffer.hashCode(), written, remains, hexs); } return (remains == 0); }
相关文章推荐
- mycat2.0源码分析01-接受客户端连接并发送握手报文
- 01_mycat1.6源码_mycat接受客户端连接并发送握手报文
- 【Linux4.1.12源码分析】二层报文发送之报文GSO分段(MAC层)
- 【Linux4.1.12源码分析】二层报文发送之报文GSO分段(skb_segment)
- 【Linux4.1.12源码分析】vxlan报文发送之udp_tunnel_xmit_skb
- 【Linux4.1.12源码分析】IP层报文发送之ip_output
- 【Linux4.1.12源码分析】二层报文发送之dev_queue_xmit
- 【Linux4.1.12源码分析】二层报文发送之qdisc实现分析
- 源码分析mycat1.6之mysql通信协议篇之COM_QUERY(SELECT语句报文解析)
- 【Linux4.1.12源码分析】二层报文发送之报文GSO分段(TCP)
- 【Linux4.1.12源码分析】IP层报文发送之ip_local_out
- 【Linux4.1.12源码分析】二层报文发送之报文GSO分段(IP层)
- 【Linux4.1.12源码分析】二层报文发送之报文GSO分段(UDP)
- 神州数码私有报文分析完毕,庆祝一下[神州数码认证客户端破解]
- 【Linux4.1.12源码分析】IP层报文发送之ip_local_out
- 【Linux4.1.12源码分析】vxlan报文发送之iptunnel_xmit
- 【Linux4.1.12源码分析】二层报文发送之GSO条件判断
- java集合框架02——Collection架构与源码分析
- vue2源码浏览分析02
- bVNC 客户端源码分析(android)