【Zookeeper】源码分析之网络通信(二)之NIOServerCnxn
2017-03-01 17:22
429 查看
一、前言
前面介绍了ServerCnxn,下面开始学习NIOServerCnxn。
二、NIOServerCnxn源码分析
2.1 类的继承关系
说明:NIOServerCnxn继承了ServerCnxn抽象类,使用NIO来处理与客户端之间的通信,使用单线程处理。
2.2 类的内部类
1. SendBufferWriter类
doIO
说明:该函数主要是进行IO处理,当传入的SelectionKey是可读时,其处理流程如下
说明:首先从socket中将数据读入incomingBuffer中,再判断incomingBuffer是否与lenBuffer相等,若相等,则表示读取的是一个四个字母的命令,否则表示读取的是具体内容的长度,因为在readLength函数会根据socket中内容的长度重新分配incomingBuffer。其中,readLength函数的源码如下
说明:首先会读取lenBuffer缓冲的position之后的四个字节,然后判断其是否是四字母的命令或者是长整形(具体内容的长度),之后再根据长度重新分配incomingBuffer大小。
同时,在调用完readLength后,会知道是否为内容,若为内容,则会调用readPayload函数来读取内容,其源码如下
说明:首先会将socket中的实际内容写入incomingBuffer中(已经重新分配大小),当读取完成后,则更新接收的包统计信息,之后再根据是否初始化了还确定读取连接请求还是直接请求,最后会清除缓存,并重新让incomingBuffer与lenBuffer相等,表示该读取过程结束。
而当doIO中的key为可写时,其处理流程如下
说明:其首先会判断outgoingBuffers中是否还有Buffer未发送,然后遍历Buffer,为提供IO效率,借助了directBuffer(64K大小),之后每次以directBuffer的大小(64K)来将缓冲的内容写入socket中发送,直至全部发送完成。
三、总结
本篇讲解了NIOServerCnxn的处理细节,其主要依托于Java的NIO相关接口来完成IO操作,也谢谢各位园友的观看~
前面介绍了ServerCnxn,下面开始学习NIOServerCnxn。
二、NIOServerCnxn源码分析
2.1 类的继承关系
public class NIOServerCnxn extends ServerCnxn {}
说明:NIOServerCnxn继承了ServerCnxn抽象类,使用NIO来处理与客户端之间的通信,使用单线程处理。
2.2 类的内部类
1. SendBufferWriter类
void doIO(SelectionKey k) throws InterruptedException { try { if (isSocketOpen() == false) { // socket未开启 LOG.warn("trying to do i/o on a null socket for session:0x" + Long.toHexString(sessionId)); return; } if (k.isReadable()) { // key可读 // 将内容从socket写入incoming缓冲 int rc = sock.read(incomingBuffer); if (rc < 0) { // 流结束异常,无法从客户端读取数据 throw new EndOfStreamException( "Unable to read additional data from client sessionid 0x" + Long.toHexString(sessionId) + ", likely client has closed socket"); } if (incomingBuffer.remaining() == 0) { // 缓冲区已经写满 boolean isPayload; // 读取下个请求 if (incomingBuffer == lenBuffer) { // start of next request // 翻转缓冲区,可读 incomingBuffer.flip(); // 读取lenBuffer的前四个字节,当读取的是内容长度时则为true,否则为false isPayload = readLength(k); // 清除缓冲 incomingBuffer.clear(); } else { // 不等,因为在readLength中根据Len已经重新分配了incomingBuffer // continuation isPayload = true; } if (isPayload) { // 不为四个字母,为实际内容 // not the case for 4letterword // 读取内容 readPayload(); } else { // 四个字母,为四字母的命令 // four letter words take care // need not do anything else return; } } } if (k.isWritable()) { // key可写 // ZooLog.logTraceMessage(LOG, // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK // "outgoingBuffers.size() = " + // outgoingBuffers.size()); if (outgoingBuffers.size() > 0) { // ZooLog.logTraceMessage(LOG, // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, // "sk " + k + " is valid: " + // k.isValid()); /* * This is going to reset the buffer position to 0 and the * limit to the size of the buffer, so that we can fill it * with data from the non-direct buffers that we need to * send. */ // 分配的直接缓冲 ByteBuffer directBuffer = factory.directBuffer; // 清除缓冲 directBuffer.clear(); for (ByteBuffer b : outgoingBuffers) { // 遍历 if (directBuffer.remaining() < b.remaining()) { // directBuffer的剩余空闲长度小于b的剩余空闲长度 /* * When we call put later, if the directBuffer is to * small to hold everything, nothing will be copied, * so we've got to slice the buffer if it's too big. */ // 缩小缓冲至directBuffer的大小 b = (ByteBuffer) b.slice().limit( directBuffer.remaining()); } /* * put() is going to modify the positions of both * buffers, put we don't want to change the position of * the source buffers (we'll do that after the send, if * needed), so we save and reset the position after the * copy */ // 记录b的当前position int p = b.position(); // 将b写入directBuffer directBuffer.put(b); // 设置回b的原来的position b.position(p); if (directBuffer.remaining() == 0) { // 已经写满 break; } } /* * Do the flip: limit becomes position, position gets set to * 0. This sets us up for the write. */ // 翻转缓冲区,可读 directBuffer.flip(); // 将directBuffer的内容写入socket int sent = sock.write(directBuffer); ByteBuffer bb; // Remove the buffers that we have sent while (outgoingBuffers.size() > 0) { // outgoingBuffers中还存在Buffer // 取队首元素,但并不移出 bb = outgoingBuffers.peek(); if (bb == ServerCnxnFactory.closeConn) { // 关闭连接,抛出异常 throw new CloseRequestException("close requested"); } // bb还剩余多少元素没有被发送 int left = bb.remaining() - sent; if (left > 0) { // 存在元素未被发送 /* * We only partially sent this buffer, so we update * the position and exit the loop. */ // 更新bb的position bb.position(bb.position() + sent); break; } // 发送包,调用ServerCnxn方法 packetSent(); /* We've sent the whole buffer, so drop the buffer */ // 已经发送完buffer的所有内容,移除buffer sent -= bb.remaining(); outgoingBuffers.remove(); } // ZooLog.logTraceMessage(LOG, // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send, // outgoingBuffers.size() = " + outgoingBuffers.size()); } synchronized(this.factory){ // 同步块 if (outgoingBuffers.size() == 0) { // outgoingBuffers不存在buffer if (!initialized && (sk.interestOps() & SelectionKey.OP_READ) == 0) { // 未初始化并且无读请求 throw new CloseRequestException("responded to info probe"); } // 重置感兴趣的集合 sk.interestOps(sk.interestOps() & (~SelectionKey.OP_WRITE)); } else { // 重置感兴趣的集合 sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); } } } } catch (CancelledKeyException e) { LOG.warn("Exception causing close of session 0x" + Long.toHexString(sessionId) + " due to " + e); if (LOG.isDebugEnabled()) { LOG.debug("CancelledKeyException stack trace", e); } close(); } catch (CloseRequestException e) { // expecting close to log session closure close(); } catch (EndOfStreamException e) { LOG.warn("caught end of stream exception",e); // tell user why // expecting close to log session closure close(); } catch (IOException e) { LOG.warn("Exception causing close of session 0x" + Long.toHexString(sessionId) + " due to " + e); if (LOG.isDebugEnabled()) { LOG.debug("IOException stack trace", e); } close(); } }
doIO
说明:该函数主要是进行IO处理,当传入的SelectionKey是可读时,其处理流程如下
if (k.isReadable()) { // key可读 // 将内容从socket写入incoming缓冲 int rc = sock.read(incomingBuffer); if (rc < 0) { // 流结束异常,无法从客户端读取数据 throw new EndOfStreamException( "Unable to read additional data from client sessionid 0x" + Long.toHexString(sessionId) + ", likely client has closed socket"); } if (incomingBuffer.remaining() == 0) { // 缓冲区已经写满 boolean isPayload; // 读取下个请求 if (incomingBuffer == lenBuffer) { // start of next request // 翻转缓冲区,可读 incomingBuffer.flip(); // 读取lenBuffer的前四个字节,当读取的是内容长度时则为true,否则为false isPayload = readLength(k); // 清除缓冲 incomingBuffer.clear(); } else { // 不等,因为在readLength中根据Len已经重新分配了incomingBuffer // continuation isPayload = true; } if (isPayload) { // 不为四个字母,为实际内容 // not the case for 4letterword // 读取内容 readPayload(); } else { // 四个字母,为四字母的命令 // four letter words take care // need not do anything else return; } } }
说明:首先从socket中将数据读入incomingBuffer中,再判断incomingBuffer是否与lenBuffer相等,若相等,则表示读取的是一个四个字母的命令,否则表示读取的是具体内容的长度,因为在readLength函数会根据socket中内容的长度重新分配incomingBuffer。其中,readLength函数的源码如下
private boolean readLength(SelectionKey k) throws IOException { // Read the length, now get the buffer // 读取position之后的四个字节 int len = lenBuffer.getInt(); if (!initialized && checkFourLetterWord(sk, len)) { // 未初始化并且是四个字母组成的命令 return false; } if (len < 0 || len > BinaryInputArchive.maxBuffer) { throw new IOException("Len error " + len); } if (zkServer == null) { throw new IOException("ZooKeeperServer not running"); } // 重新分配len长度的缓冲 incomingBuffer = ByteBuffer.allocate(len); return true; }
说明:首先会读取lenBuffer缓冲的position之后的四个字节,然后判断其是否是四字母的命令或者是长整形(具体内容的长度),之后再根据长度重新分配incomingBuffer大小。
同时,在调用完readLength后,会知道是否为内容,若为内容,则会调用readPayload函数来读取内容,其源码如下
private void readPayload() throws IOException, InterruptedException { // 表示还未读取完socket中内容 if (incomingBuffer.remaining() != 0) { // have we read length bytes? // 将socket的内容读入缓冲 int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok if (rc < 0) { // 流结束异常,无法从客户端读取数据 throw new EndOfStreamException( "Unable to read additional data from client sessionid 0x" + Long.toHexString(sessionId) + ", likely client has closed socket"); } } // 表示已经读取完了Socket中内容 if (incomingBuffer.remaining() == 0) { // have we read length bytes? // 接收到packet packetReceived(); // 翻转缓冲区 incomingBuffer.flip(); if (!initialized) { // 未初始化 // 读取连接请求 readConnectRequest(); } else { // 读取请求 readRequest(); } // 清除缓冲 lenBuffer.clear(); // 赋值incomingBuffer,即清除incoming缓冲 incomingBuffer = lenBuffer; } }
说明:首先会将socket中的实际内容写入incomingBuffer中(已经重新分配大小),当读取完成后,则更新接收的包统计信息,之后再根据是否初始化了还确定读取连接请求还是直接请求,最后会清除缓存,并重新让incomingBuffer与lenBuffer相等,表示该读取过程结束。
而当doIO中的key为可写时,其处理流程如下
if (k.isWritable()) { // key可写 // ZooLog.logTraceMessage(LOG, // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK // "outgoingBuffers.size() = " + // outgoingBuffers.size()); if (outgoingBuffers.size() > 0) { // ZooLog.logTraceMessage(LOG, // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, // "sk " + k + " is valid: " + // k.isValid()); /* * This is going to reset the buffer position to 0 and the * limit to the size of the buffer, so that we can fill it * with data from the non-direct buffers that we need to * send. */ // 分配的直接缓冲 ByteBuffer directBuffer = factory.directBuffer; // 清除缓冲 directBuffer.clear(); for (ByteBuffer b : outgoingBuffers) { // 遍历 if (directBuffer.remaining() < b.remaining()) { // directBuffer的剩余空闲长度小于b的剩余空闲长度 /* * When we call put later, if the directBuffer is to * small to hold everything, nothing will be copied, * so we've got to slice the buffer if it's too big. */ // 缩小缓冲至directBuffer的大小 b = (ByteBuffer) b.slice().limit( directBuffer.remaining()); } /* * put() is going to modify the positions of both * buffers, put we don't want to change the position of * the source buffers (we'll do that after the send, if * needed), so we save and reset the position after the * copy */ // 记录b的当前position int p = b.position(); // 将b写入directBuffer directBuffer.put(b); // 设置回b的原来的position b.position(p); if (directBuffer.remaining() == 0) { // 已经写满 break; } } /* * Do the flip: limit becomes position, position gets set to * 0. This sets us up for the write. */ // 翻转缓冲区,可读 directBuffer.flip(); // 将directBuffer的内容写入socket int sent = sock.write(directBuffer); ByteBuffer bb; // Remove the buffers that we have sent while (outgoingBuffers.size() > 0) { // outgoingBuffers中还存在Buffer // 取队首元素,但并不移出 bb = outgoingBuffers.peek(); if (bb == ServerCnxnFactory.closeConn) { // 关闭连接,抛出异常 throw new CloseRequestException("close requested"); } // bb还剩余多少元素没有被发送 int left = bb.remaining() - sent; if (left > 0) { // 存在元素未被发送 /* * We only partially sent this buffer, so we update * the position and exit the loop. */ // 更新bb的position bb.position(bb.position() + sent); break; } // 发送包,调用ServerCnxn方法 packetSent(); /* We've sent the whole buffer, so drop the buffer */ // 已经发送完buffer的所有内容,移除buffer sent -= bb.remaining(); outgoingBuffers.remove(); } // ZooLog.logTraceMessage(LOG, // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send, // outgoingBuffers.size() = " + outgoingBuffers.size()); }
说明:其首先会判断outgoingBuffers中是否还有Buffer未发送,然后遍历Buffer,为提供IO效率,借助了directBuffer(64K大小),之后每次以directBuffer的大小(64K)来将缓冲的内容写入socket中发送,直至全部发送完成。
三、总结
本篇讲解了NIOServerCnxn的处理细节,其主要依托于Java的NIO相关接口来完成IO操作,也谢谢各位园友的观看~
相关文章推荐
- 【Zookeeper】源码分析之网络通信(三)之NettyServerCnxn
- 【Zookeeper】源码分析之网络通信(三)
- 【Zookeeper】源码分析之网络通信(三)
- 【Zookeeper】源码分析之网络通信(二)
- 【Zookeeper】源码分析之网络通信(一)
- 【Zookeeper】源码分析之网络通信(一)
- 【Zookeeper】源码分析之网络通信(一)
- apache kafka系列之源码分析走读-server端网络架构分析
- 【Zookeeper】源码分析之服务器(三)之LeaderZooKeeperServer
- android网络编程 -- HTTP通信(02) Android HTTP 通信 [附源码分析]
- 基于TCP网络通信的自动升级程序源码分析-启动升级文件下载程序
- 优秀的轻量级网络开发框架spserver源码分析(一)
- 基于TCP网络通信的自动升级程序源码分析-客户端连接服务器
- 【Zookeeper】源码分析之服务器(二)之ZooKeeperServer
- 【Zookeeper】源码分析之服务器(四)之FollowerZooKeeperServer
- apache kafka系列之源码分析走读-server端网络架构分析
- 基于TCP网络通信的自动升级程序源码分析-客户端接收文件
- redis anet网络通信的源码分析
- Zeroc ICE 源码分析三 ICE的网络通信
- DotNetty网络通信框架学习之源码分析