您的位置:首页 > 理论基础 > 计算机网络

【Zookeeper】源码分析之网络通信(二)之NIOServerCnxn

2017-03-01 17:22 429 查看
一、前言

  前面介绍了ServerCnxn,下面开始学习NIOServerCnxn。

二、NIOServerCnxn源码分析

  2.1 类的继承关系

public class NIOServerCnxn extends ServerCnxn {}


  说明:NIOServerCnxn继承了ServerCnxn抽象类,使用NIO来处理与客户端之间的通信,使用单线程处理。

  2.2 类的内部类

  1. SendBufferWriter类 

void doIO(SelectionKey k) throws InterruptedException {
try {
if (isSocketOpen() == false) { // socket未开启
LOG.warn("trying to do i/o on a null socket for session:0x"
+ Long.toHexString(sessionId));

return;
}
if (k.isReadable()) { // key可读
// 将内容从socket写入incoming缓冲
int rc = sock.read(incomingBuffer);
if (rc < 0) { // 流结束异常,无法从客户端读取数据
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
}
if (incomingBuffer.remaining() == 0) { // 缓冲区已经写满
boolean isPayload;
// 读取下个请求
if (incomingBuffer == lenBuffer) { // start of next request
// 翻转缓冲区,可读
incomingBuffer.flip();
// 读取lenBuffer的前四个字节,当读取的是内容长度时则为true,否则为false
isPayload = readLength(k);
// 清除缓冲
incomingBuffer.clear();
} else { // 不等,因为在readLength中根据Len已经重新分配了incomingBuffer
// continuation
isPayload = true;
}
if (isPayload) { // 不为四个字母,为实际内容    // not the case for 4letterword
// 读取内容
readPayload();
}
else { // 四个字母,为四字母的命令
// four letter words take care
// need not do anything else
return;
}
}
}
if (k.isWritable()) { // key可写
// ZooLog.logTraceMessage(LOG,
// ZooLog.CLIENT_DATA_PACKET_TRACE_MASK
// "outgoingBuffers.size() = " +
// outgoingBuffers.size());
if (outgoingBuffers.size() > 0) {
// ZooLog.logTraceMessage(LOG,
// ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
// "sk " + k + " is valid: " +
// k.isValid());

/*
* This is going to reset the buffer position to 0 and the
* limit to the size of the buffer, so that we can fill it
* with data from the non-direct buffers that we need to
* send.
*/
// 分配的直接缓冲
ByteBuffer directBuffer = factory.directBuffer;
// 清除缓冲
directBuffer.clear();

for (ByteBuffer b : outgoingBuffers) { // 遍历
if (directBuffer.remaining() < b.remaining()) { // directBuffer的剩余空闲长度小于b的剩余空闲长度
/*
* When we call put later, if the directBuffer is to
* small to hold everything, nothing will be copied,
* so we've got to slice the buffer if it's too big.
*/
// 缩小缓冲至directBuffer的大小
b = (ByteBuffer) b.slice().limit(
directBuffer.remaining());
}
/*
* put() is going to modify the positions of both
* buffers, put we don't want to change the position of
* the source buffers (we'll do that after the send, if
* needed), so we save and reset the position after the
* copy
*/
// 记录b的当前position
int p = b.position();
// 将b写入directBuffer
directBuffer.put(b);
// 设置回b的原来的position
b.position(p);
if (directBuffer.remaining() == 0) { // 已经写满
break;
}
}
/*
* Do the flip: limit becomes position, position gets set to
* 0. This sets us up for the write.
*/
// 翻转缓冲区,可读
directBuffer.flip();

// 将directBuffer的内容写入socket
int sent = sock.write(directBuffer);
ByteBuffer bb;

// Remove the buffers that we have sent
while (outgoingBuffers.size() > 0) { // outgoingBuffers中还存在Buffer
// 取队首元素,但并不移出
bb = outgoingBuffers.peek();
if (bb == ServerCnxnFactory.closeConn) { // 关闭连接,抛出异常
throw new CloseRequestException("close requested");
}

// bb还剩余多少元素没有被发送
int left = bb.remaining() - sent;
if (left > 0) { // 存在元素未被发送
/*
* We only partially sent this buffer, so we update
* the position and exit the loop.
*/
// 更新bb的position
bb.position(bb.position() + sent);
break;
}
// 发送包,调用ServerCnxn方法
packetSent();
/* We've sent the whole buffer, so drop the buffer */
// 已经发送完buffer的所有内容,移除buffer
sent -= bb.remaining();
outgoingBuffers.remove();
}
// ZooLog.logTraceMessage(LOG,
// ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
// outgoingBuffers.size() = " + outgoingBuffers.size());
}

synchronized(this.factory){ // 同步块
if (outgoingBuffers.size() == 0) { // outgoingBuffers不存在buffer
if (!initialized
&& (sk.interestOps() & SelectionKey.OP_READ) == 0) { // 未初始化并且无读请求
throw new CloseRequestException("responded to info probe");
}
// 重置感兴趣的集合
sk.interestOps(sk.interestOps()
& (~SelectionKey.OP_WRITE));
} else { // 重置感兴趣的集合
sk.interestOps(sk.interestOps()
| SelectionKey.OP_WRITE);
}
}
}
} catch (CancelledKeyException e) {
LOG.warn("Exception causing close of session 0x"
+ Long.toHexString(sessionId)
+ " due to " + e);
if (LOG.isDebugEnabled()) {
LOG.debug("CancelledKeyException stack trace", e);
}
close();
} catch (CloseRequestException e) {
// expecting close to log session closure
close();
} catch (EndOfStreamException e) {
LOG.warn("caught end of stream exception",e); // tell user why

// expecting close to log session closure
close();
} catch (IOException e) {
LOG.warn("Exception causing close of session 0x"
+ Long.toHexString(sessionId)
+ " due to " + e);
if (LOG.isDebugEnabled()) {
LOG.debug("IOException stack trace", e);
}
close();
}
}


doIO
  说明:该函数主要是进行IO处理,当传入的SelectionKey是可读时,其处理流程如下

if (k.isReadable()) { // key可读
// 将内容从socket写入incoming缓冲
int rc = sock.read(incomingBuffer);
if (rc < 0) { // 流结束异常,无法从客户端读取数据
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
}
if (incomingBuffer.remaining() == 0) { // 缓冲区已经写满
boolean isPayload;
// 读取下个请求
if (incomingBuffer == lenBuffer) { // start of next request
// 翻转缓冲区,可读
incomingBuffer.flip();
// 读取lenBuffer的前四个字节,当读取的是内容长度时则为true,否则为false
isPayload = readLength(k);
// 清除缓冲
incomingBuffer.clear();
} else { // 不等,因为在readLength中根据Len已经重新分配了incomingBuffer
// continuation
isPayload = true;
}
if (isPayload) { // 不为四个字母,为实际内容    // not the case for 4letterword
// 读取内容
readPayload();
}
else { // 四个字母,为四字母的命令
// four letter words take care
// need not do anything else
return;
}
}
}


  说明:首先从socket中将数据读入incomingBuffer中,再判断incomingBuffer是否与lenBuffer相等,若相等,则表示读取的是一个四个字母的命令,否则表示读取的是具体内容的长度,因为在readLength函数会根据socket中内容的长度重新分配incomingBuffer。其中,readLength函数的源码如下 

private boolean readLength(SelectionKey k) throws IOException {
// Read the length, now get the buffer
// 读取position之后的四个字节
int len = lenBuffer.getInt();
if (!initialized && checkFourLetterWord(sk, len)) { // 未初始化并且是四个字母组成的命令
return false;
}
if (len < 0 || len > BinaryInputArchive.maxBuffer) {
throw new IOException("Len error " + len);
}
if (zkServer == null) {
throw new IOException("ZooKeeperServer not running");
}
// 重新分配len长度的缓冲
incomingBuffer = ByteBuffer.allocate(len);
return true;
}


  说明:首先会读取lenBuffer缓冲的position之后的四个字节,然后判断其是否是四字母的命令或者是长整形(具体内容的长度),之后再根据长度重新分配incomingBuffer大小。

  同时,在调用完readLength后,会知道是否为内容,若为内容,则会调用readPayload函数来读取内容,其源码如下  

private void readPayload() throws IOException, InterruptedException {
// 表示还未读取完socket中内容
if (incomingBuffer.remaining() != 0) { // have we read length bytes?
// 将socket的内容读入缓冲
int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
if (rc < 0) { // 流结束异常,无法从客户端读取数据
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
}
}

// 表示已经读取完了Socket中内容
if (incomingBuffer.remaining() == 0) { // have we read length bytes?
// 接收到packet
packetReceived();
// 翻转缓冲区
incomingBuffer.flip();
if (!initialized) { // 未初始化
// 读取连接请求
readConnectRequest();
} else {
// 读取请求
readRequest();
}
// 清除缓冲
lenBuffer.clear();
// 赋值incomingBuffer,即清除incoming缓冲
incomingBuffer = lenBuffer;
}
}


  说明:首先会将socket中的实际内容写入incomingBuffer中(已经重新分配大小),当读取完成后,则更新接收的包统计信息,之后再根据是否初始化了还确定读取连接请求还是直接请求,最后会清除缓存,并重新让incomingBuffer与lenBuffer相等,表示该读取过程结束。

  而当doIO中的key为可写时,其处理流程如下 

if (k.isWritable()) { // key可写
// ZooLog.logTraceMessage(LOG,
// ZooLog.CLIENT_DATA_PACKET_TRACE_MASK
// "outgoingBuffers.size() = " +
// outgoingBuffers.size());
if (outgoingBuffers.size() > 0) {
// ZooLog.logTraceMessage(LOG,
// ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
// "sk " + k + " is valid: " +
// k.isValid());

/*
* This is going to reset the buffer position to 0 and the
* limit to the size of the buffer, so that we can fill it
* with data from the non-direct buffers that we need to
* send.
*/
// 分配的直接缓冲
ByteBuffer directBuffer = factory.directBuffer;
// 清除缓冲
directBuffer.clear();

for (ByteBuffer b : outgoingBuffers) { // 遍历
if (directBuffer.remaining() < b.remaining()) { // directBuffer的剩余空闲长度小于b的剩余空闲长度
/*
* When we call put later, if the directBuffer is to
* small to hold everything, nothing will be copied,
* so we've got to slice the buffer if it's too big.
*/
// 缩小缓冲至directBuffer的大小
b = (ByteBuffer) b.slice().limit(
directBuffer.remaining());
}
/*
* put() is going to modify the positions of both
* buffers, put we don't want to change the position of
* the source buffers (we'll do that after the send, if
* needed), so we save and reset the position after the
* copy
*/
// 记录b的当前position
int p = b.position();
// 将b写入directBuffer
directBuffer.put(b);
// 设置回b的原来的position
b.position(p);
if (directBuffer.remaining() == 0) { // 已经写满
break;
}
}
/*
* Do the flip: limit becomes position, position gets set to
* 0. This sets us up for the write.
*/
// 翻转缓冲区,可读
directBuffer.flip();

// 将directBuffer的内容写入socket
int sent = sock.write(directBuffer);
ByteBuffer bb;

// Remove the buffers that we have sent
while (outgoingBuffers.size() > 0) { // outgoingBuffers中还存在Buffer
// 取队首元素,但并不移出
bb = outgoingBuffers.peek();
if (bb == ServerCnxnFactory.closeConn) { // 关闭连接,抛出异常
throw new CloseRequestException("close requested");
}

// bb还剩余多少元素没有被发送
int left = bb.remaining() - sent;
if (left > 0) { // 存在元素未被发送
/*
* We only partially sent this buffer, so we update
* the position and exit the loop.
*/
// 更新bb的position
bb.position(bb.position() + sent);
break;
}
// 发送包,调用ServerCnxn方法
packetSent();
/* We've sent the whole buffer, so drop the buffer */
// 已经发送完buffer的所有内容,移除buffer
sent -= bb.remaining();
outgoingBuffers.remove();
}
// ZooLog.logTraceMessage(LOG,
// ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
// outgoingBuffers.size() = " + outgoingBuffers.size());
}


  说明:其首先会判断outgoingBuffers中是否还有Buffer未发送,然后遍历Buffer,为提供IO效率,借助了directBuffer(64K大小),之后每次以directBuffer的大小(64K)来将缓冲的内容写入socket中发送,直至全部发送完成。

三、总结

  本篇讲解了NIOServerCnxn的处理细节,其主要依托于Java的NIO相关接口来完成IO操作,也谢谢各位园友的观看~ 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: