您的位置:首页 > 其它

mycat2.0源码分析02-客户端发送认证报文

2017-05-11 15:23 459 查看
mycat客户端发送认证报文

1 NIOReactor报文的异步读取和报文格式验证
11 NIOReactor处理读事件

12 Connection的asynRead读取报文

13 MySQLFrontConnectionHandler对报文格式的验证

2 认证报文的验证和响应
21 调用CheckUserLoginResponseCallback的processCmd方法对认证报文验证

22 调用CheckUserLoginResponseCallback的success方法响应

3 NIOReactor写事件处理
31 调用Connection的doWriteQueue方法向客户端写数据

2.mycat客户端发送认证报文



上一篇文章已经介绍了mycat2.0接受客户端连接,mycat发送握手报文给客户端的过程。本篇我们介绍客户端在接收到mycat发送的握手报文之后,客户端发送给mycat的认证报文以及mycat对认证报文的相应过程。

2.1 NIOReactor报文的异步读取和报文格式验证

上一篇文章已经介绍了NIOAcceptor接受客户端连接,并且NIOReactor负责初始化socketChannel并建立连接,向客户端发送握手报文,接着客户端会向mycat发送认证报文,由NIOReactor来处理认证报文的读取。下面看上图所示的1~3.

2.1.1 NIOReactor处理读事件

@Override
public void run() {
final Selector selector = this.selector;
Set<SelectionKey> keys = null;
int readys=0;
for (;;) {
++reactCount;
try {
readys=selector.select(400/(readys+1));
if(readys==0)
{
handlerEvents(selector);
continue;
}
keys = selector.selectedKeys();
for (final SelectionKey key : keys) {
Connection con = null;
try {
final Object att = key.attachment();
LOGGER.debug("select-key-readyOps = {}, attachment = {}", key.readyOps(), att);
if (att != null && key.isValid()) {
con = (Connection) att;
if (key.isReadable()) {
try {
con.asynRead();
} catch (Throwable e) {
if (!(e instanceof java.io.IOException)) {
LOGGER.warn("caught err: " + con, e);
}
con.close("program err:" + e.toString());
continue;
}
}
// "key" may be cancelled in asynRead()!
// @author little-pan
// @since 2016-09-29
if(key.isValid() == false){
LOGGER.debug("select-key cancelled");
continue;
}
if (key.isWritable()) {
con.doWriteQueue();
}
} else {
key.cancel();
}
} catch (final Throwable e) {
if (e instanceof CancelledKeyException) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(con + " socket key canceled");
}
} else {
LOGGER.warn(con + " " + e);
}

}

}

} catch (Throwable e) {
LOGGER.warn(name, e);
} finally {
if (keys != null) {
keys.clear();
}
}
//handler other events
handlerEvents(selector);
}
}


当客户端向mycat发送认证报文的时候,selector.selectedKeys()就绪的读事件,判断selectionKey是有效且可读的,获取selectionKey绑定的Connection实例,调用Connection的asynRead()方法读取报文。读事件处理完成后需要手动删除selectionKey,不然还会重复触发读事件。

2.1.2 Connection的asynRead()读取报文

/**
* 异步读取数据,only nio thread call
*
* @throws IOException
*/
@SuppressWarnings("unchecked")
protected void asynRead() throws IOException {
final ConDataBuffer buffer = readDataBuffer;
if(LOGGER.isDebugEnabled()){
LOGGER.debug("C#{}B#{} ready to read data", getId(), buffer.hashCode());
}
if (isClosed()) {
LOGGER.debug("Connection closed: ignore");
return;
}
final int got =  buffer.transferFrom(channel);
if(LOGGER.isDebugEnabled()){
LOGGER.debug("C#{}B#{} can read {} bytes", getId(), buffer.hashCode(), got);
}
switch (got) {
case 0: {
// 如果空间不够了,继续分配空间读取
if (readDataBuffer.isFull()) {
// @todo extends
}
break;
}
case -1: {
close("client closed");
break;
}
default: {// readed some bytes
// trace network protocol stream
final NetSystem nets = NetSystem.getInstance();
if(nets.getNetConfig().isTraceProtocol()){
final int offset = buffer.readPos(), length = buffer.writingPos() - offset;
final String hexs= StringUtil.dumpAsHex(buffer, offset, length);
LOGGER.info("C#{}B#{}: last readed = {} bytes, total readed = {} bytes, buffer bytes\n{}",
getId(), buffer.hashCode(), got, length, hexs);
}
// 子类负责解析报文并处理
handler.handleReadEvent(this);
}
}
}


这里获取的Connection实际上它的子类MySQLFrontConnection,asynRead()是在Connection类中实现。asynRead()方法首先对Connetion的连接的状态进行了判断,如果连接已经关闭,那么不进行读取操作。

readDataBuffer在上一篇文章提到,是在NIOReactor的register()的方法中调用Connection的register()方法对readDataBuffer和writeDataBuffer进行了初始化,readDataBuffer的实现类是MappedFileConDataBuffer,writeDataBuffer的实现类是MappedFileConDataBuffer3,基于文件存储方式实现,将socket的数据读取并写入文件,提供后续流程的读取并进行处理。其他情况的处理还有待完善,如文件存储空间不够扩展的问题等。

此处的handler是MySQLFrontConnectionHandler,在使用工厂类MySQLFrontendConnectionFactory获取MySQLFrontConnection类的时候,在MySQLFrontConnection类中设置了全局的handler,代码如下。handler对报文数据进行后续的处理,下面将详解。

public class MySQLFrontendConnectionFactory extends ConnectionFactory {
/**
* 全局共享的前段NIO Handler
*/
private final MySQLFrontConnectionHandler handler = new MySQLFrontConnectionHandler();
......
@Override
protected NIOHandler<MySQLFrontConnection> getNIOHandler() {
return handler;
}
}


public abstract class ConnectionFactory {

protected abstract Connection makeConnection(SocketChannel channel)
throws IOException;

protected abstract NIOHandler getNIOHandler();

public Connection make(SocketChannel channel) throws IOException {
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
// 子类完成具体连接创建工作
Connection c = makeConnection(channel);
// 设置连接的参数
NetSystem.getInstance().setSocketParams(c,true);
// 设置NIOHandler
c.setHandler(getNIOHandler());
return c;
}
}


2.1.3 MySQLFrontConnectionHandler对报文格式的验证

public void handleReadEvent(final MySQLFrontConnection cnxn) throws IOException{
LOGGER.debug("handleReadEvent(): {}", cnxn);
final ConDataBuffer buffer = cnxn.getReadDataBuffer();
int offset = buffer.readPos(), limit = buffer.writingPos();
// 读取到了包头和长度
// 是否讀完一個報文
for(; ; ) {
if(!MySQLConnection.validateHeader(offset, limit)) {
LOGGER.debug("C#{}B#{} validate protocol packet header: too short, ready to handle next the read event",
cnxn.getId(), buffer.hashCode());
return;
}
int length = MySQLConnection.getPacketLength(buffer, offset);
if((length + offset) > limit) {
LOGGER.debug("C#{}B#{} nNot a whole packet: required length = {} bytes, cur total length = {} bytes, "
+ "ready to handle the next read event", cnxn.getId(), buffer.hashCode(), length, limit);
return;
}
if(length == 4){
// @todo handle empty packet
}
// 解析报文类型
final byte packetType = buffer.getByte(offset + MySQLConnection.msyql_packetHeaderSize);
final int pkgStartPos = offset;

// trace-protocol-packet
// @author little-pan
// @since 2016-09-29
final NetSystem nets = NetSystem.getInstance();
if(nets.getNetConfig().isTraceProtocol()){
final String hexs = StringUtil.dumpAsHex(buffer, pkgStartPos, length);
LOGGER.info("C#{}B#{} received a packet: offset = {}, length = {}, type = {}, cur total length = {}, packet bytes\n{}",
cnxn.getId(), buffer.hashCode(), pkgStartPos, length, packetType, limit, hexs);
}
cnxn.getSession().getCurCmdHandler().processCmd(cnxn, buffer, packetType, pkgStartPos, length);

offset += length;
buffer.setReadingPos(offset);

}
}


MySQLFrontConnectionHandler是一个全局类,handleReadEvent方法首先获取MySQLFrontConnection的readDataBuffer,获取需要处理的数据的区间,区间内的数据可能会有多个数据包。

MySQLConnection.validateHeader(offset, limit)


第一步判断读取的数据区间字节数是否大于等于4(因为一个mysql包的包头长度是4字节),不合法的不做处理。

MySQLConnection.getPacketLength(buffer, offset)


第二步读取mysql数据包的长度,包括包头的长度。如果数据区间的长度小于mysql数据包的长度,说明数据包不完整,也不进行处理。

第三步获取mysql的报文类型,对于认证报文,第5个字节是表示客户端能力的一部分。

cnxn.getSession().getCurCmdHandler().processCmd(cnxn, buffer, packetType, pkgStartPos, length);


最后调用CheckUserLoginResponseCallback的processCmd()方法进行认证校验操作。

public class MySQLFrontConnectionHandler implements NIOHandler<MySQLFrontConnection> {

private final CheckUserLoginResponseCallback loginCmdHandler = new CheckUserLoginResponseCallback();

@Override
public void onConnected(MySQLFrontConnection con) throws IOException {
LOGGER.debug("onConnected(): {}", con);
con.getSession().changeCmdHandler(loginCmdHandler);
con.sendAuthPackge();
}


MySQLFrontConnectionHandler的onConnected()方法中设置CheckUserLoginResponseCallback为连接的SQLCommandHandler。

2.2 认证报文的验证和响应

如上图4~8的过程。

2.2.1 调用CheckUserLoginResponseCallback的processCmd()方法对认证报文验证

public void processCmd(MySQLFrontConnection con, ConDataBuffer dataBuffer, byte packageType, int pkgStartPos,
int pkgLen) throws IOException {
// check quit packet
if (packageType == MySQLPacket.QUIT_PACKET) {
con.close("quit packet");
return;
}
ByteBuffer byteBuff = dataBuffer.getBytes(pkgStartPos, pkgLen);
AuthPacket auth = new AuthPacket();
auth.read(byteBuff);

// Fake check user
LOGGER.debug("Check user name. " + auth.user);
if (!auth.user.equals("root")) {
LOGGER.debug("User name error. " + auth.user);
con.failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "'");
return;
}

// Fake check password
LOGGER.debug("Check user password. " + new String(auth.password));

// check schema
LOGGER.debug("Check database. " + auth.database);

success(con, auth);

}


processCmd()方法,对用户名等必要信息进行校验,目前校验代码没有实现完全。所有的信息认证没有问题,调用success()方法,进行认证响应报文的发送。

2.2.2 调用CheckUserLoginResponseCallback的success()方法响应

private void success(MySQLFrontConnection con, AuthPacket auth) throws IOException {
LOGGER.debug("Login success");
// 设置字符集编码
int charsetIndex = (auth.charsetIndex & 0xff);
final String charset = CharsetUtil.getCharset(charsetIndex);
if (charset == null) {
final String errmsg = "Unknown charsetIndex:" + charsetIndex;
LOGGER.warn(errmsg);
con.writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, errmsg);
return;
}
LOGGER.debug("charset = {}, charsetIndex = {}", charset, charsetIndex);
con.setCharset(charsetIndex, charset);

//认证成功后,修改changeCmdHandler,由CheckUserLoginResponseCallback改用
// AbstractSchemaSQLCommandHandler处理
if (!con.setFrontSchema(auth.database)) {
final String errmsg = "No Mycat Schema defined: " + auth.database;
LOGGER.debug(errmsg);
con.writeErrMessage(ErrorCode.ER_NO_DB_ERROR, errmsg);
} else {
con.write(AUTH_OK);
}
con.setState(Connection.STATE_IDLE);

}


认证成功之后,判断认证包中是否指定了字符集,如果没有指定字符集则返回ERR报文。再根据认证报文中的数据库名来来设置SQLCommandHandler,下面将详细讲。

con.setFrontSchema(auth.database)


根据认证报文中设置的数据库名,来设置前段连接的schema,代码如下:

public boolean setFrontSchema(String schema) throws IOException {
SchemaBean mycatSchema = null;
if (schema != null) {
mycatSchema = SQLEngineCtx.INSTANCE().getMycatSchema(schema);
if (mycatSchema == null) {

return false;
}
this.mycatSchema = mycatSchema;
if (!mycatSchema.isNormalSchema()) {
this.session.changeCmdHandler(SQLEngineCtx.INSTANCE().getPartionSchemaSQLCmdHandler());
}else
{
session.changeCmdHandler(SQLEngineCtx.INSTANCE().getNomalSchemaSQLCmdHandler());
}
return true;
}else
{
// 默认设置为不分片的SQL处理器
session.changeCmdHandler(SQLEngineCtx.INSTANCE().getNomalSchemaSQLCmdHandler());
return true;
}
}


根据认证报文提供的数据名称来获取mycat配置的schema,判断是普通还是分片的schema,如果schema是一个普通schema,那么设置session.changeCmdHandler(SQLEngineCtx.INSTANCE().getNomalSchemaSQLCmdHandler()),否则设置为session.changeCmdHandler(SQLEngineCtx.INSTANCE().getNomalSchemaSQLCmdHandler())

} else {
con.write(AUTH_OK);
}
con.setState(Connection.STATE_IDLE);


一切都通过之后,将OK报文写入到连接的writeDataBuffer中,关注写事件并唤醒selector。并将连接的状态设置为空闲状态。

2.3 NIOReactor写事件处理

NIOReactor类的RWThread类中调用con.doWriteQueue()进行异步的写操作。如上图10~13的过程。

2.3.1 调用Connection的doWriteQueue()方法向客户端写数据

如上图14~17,NIOReactor的RWThread的run中,当有写事件触发时,调用doWriteQueue()方法向客户端写数据。

@Override
public void run() {
final Selector selector = this.selector;
Set<SelectionKey> keys = null;
int readys=0;
for (;;) {
++reactCount;
try {
readys=selector.select(400/(readys+1));
if(readys==0)
{
handlerEvents(selector);
continue;
}
keys = selector.selectedKeys();
for (final SelectionKey key : keys) {
Connection con = null;
try {
final Object att = key.attachment();
LOGGER.debug("select-key-readyOps = {}, attachment = {}", key.readyOps(), att);
if (att != null && key.isValid()) {
con = (Connection) att;
if (key.isReadable()) {
try {
con.asynRead();
} catch (Throwable e) {
if (!(e instanceof java.io.IOException)) {
LOGGER.warn("caught err: " + con, e);
}
con.close("program err:" + e.toString());
continue;
}
}
// "key" may be cancelled in asynRead()!
// @author little-pan
// @since 2016-09-29
if(key.isValid() == false){
LOGGER.debug("select-key cancelled");
continue;
}
if (key.isWritable()) {
con.doWriteQueue();
}
} else {
key.cancel();
}
} catch (final Throwable e) {
if (e instanceof CancelledKeyException) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(con + " socket key canceled");
}
} else {
LOGGER.warn(con + " " + e);
}

}

}

} catch (Throwable e) {
LOGGER.warn(name, e);
} finally {
if (keys != null) {
keys.clear();
}
}
//handler other events
handlerEvents(selector);
}
}


Connection的doWriteQueue()方法:

public void doWriteQueue() {
try {
boolean noMoreData = write0();
lastWriteTime = TimeUtil.currentTimeMillis();
if (noMoreData) {
if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) != 0)) {
disableWrite();
}

} else {

if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) == 0)) {
enableWrite(false);
}
}

} catch (IOException e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("caught err:", e);
}
close("err:" + e);
}

}


Connection的write0()方法:

private boolean write0() throws IOException {
final NetSystem nets = NetSystem.getInstance();
final ConDataBuffer buffer = this.writeDataBuffer;
final int written = buffer.transferTo(this.channel);
final int remains = buffer.writingPos() - buffer.readPos();
netOutBytes += written;
nets.addNetOutBytes(written);
// trace-protocol
// @author little-pan
// @since 2016-09-29
if(nets.getNetConfig().isTraceProtocol()){
final String hexs = StringUtil.dumpAsHex(buffer, buffer.readPos() - written, written);
LOGGER.info("C#{}B#{}: last writed = {} bytes, remain to write = {} bytes, written bytes\n{}",
getId(), buffer.hashCode(), written, remains, hexs);
}
return (remains == 0);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  mycat