您的位置:首页 > 其它

【Netty源码】read、write与accept源码剖析

2017-08-01 16:41 369 查看

引入

在上篇博文中我讲了NioEventLoop的源码剖析,其中的方法调用如下,

NioEventLoop.run()->selectNow()->processSelectedKeys()->processSelectedKeysOptimized()


最后processSelectedKeysOptimized方法中有三个事件:可读、可写、可接收,我并没有细说,所以在这篇文章中我就详细讲一下。

read

1.图解

boss线程主要负责监听并处理accept事件,将socketChannel注册到work线程的selector,由worker线程来监听并处理read事件,



2.触发时机

当work线程的selector检测到OP_READ事件发生时,触发read操作

//NioEventLoop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}


3.read方法源码如下

该read方法定义在类NioByteUnsafe中

//AbstractNioByteChannel.NioByteUnsafe
public final void read() {
final ChannelConfig config = config();
if (!config.isAutoRead() && !isReadPending()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
removeReadOp();
return;
}

final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}

ByteBuf byteBuf = null;
int messages = 0;
boolean close = false;
try {
int totalReadAmount = 0;
boolean readPendingReset = false;
do {
byteBuf = allocHandle.allocate(allocator);
int writable = byteBuf.writableBytes();
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount <= 0) {
// not was read release the buffer
byteBuf.release();
byteBuf = null;
close = localReadAmount < 0;
break;
}
if (!readPendingReset) {
readPendingReset = true;
setReadPending(false);
}
pipeline.fireChannelRead(byteBuf);
byteBuf = null;

if (to
efcc
talReadAmount >= Integer.MAX_VALUE - localReadAmount) {
// Avoid overflow.
totalReadAmount = Integer.MAX_VALUE;
break;
}

totalReadAmount += localReadAmount;

// stop reading
if (!config.isAutoRead()) {
break;
}

if (localReadAmount < writable) {
// Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely.
break;
}
} while (++ messages < maxMessagesPerRead);

pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);

if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254 if (!config.isAutoRead() && !isReadPending()) {
removeReadOp();
}
}
}


分析:

allocHandle负责自适应调整当前缓存分配的大小,以防止缓存分配过多或过少

allocHandle.allocate(allocator) 申请一块指定大小的内存

通过ByteBufAllocator的ioBuffer方法申请缓存,若支持unsafe,使用直接物理内存,否则使用堆内存

doReadBytes(byteBuf)方法 将socketChannel数据写入缓存

最终底层采用ByteBuffer实现read操作

write

1.引入

把数据返回客户端,需要经历三个步骤:

申请一块缓存buf,写入数据。

将buf保存到ChannelOutboundBuffer中。

将ChannelOutboundBuffer中的buff输出到socketChannel中

2.ctx.write()

//AbstractChannelHandlerContext.java
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeWrite(msg, promise);
if (flush) {
next.invokeFlush();
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, msg, promise);
}  else {
task = WriteTask.newInstance(next, msg, promise);
}
safeExecute(executor, task, promise, msg);
}
}


默认情况下,findContextOutbound()会找到pipeline的head节点,触发write方法

3.write()

//HeadContext.java
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}

//AbstractUnsafe
public final void write(Object msg, ChannelPromise promise) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
ReferenceCountUtil.release(msg);
return;
}

int size;
try {
msg = filterOutboundMessage(msg);
size = estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}

outboundBuffer.addMessage(msg, size, promise);
}


分析:

outboundBuffer 随着Unsafe一起实例化,最终将msg通过outboundBuffer封装起来

ChannelOutboundBuffer内部维护了一个Entry链表,并使用Entry封装msg。

unflushedEntry:指向链表头部

tailEntry:指向链表尾部

totalPendingSize:保存msg的字节数

unwritable:不可写标识

新的entry默认插入链表尾部,并让tailEntry指向它

方法incrementPendingOutboundBytes主要采用CAS更新totalPendingSize字段,并判断当前totalPendingSize是否超过阈值writeBufferHighWaterMark,默认是65536。如果totalPendingSize >= 65536,则采用CAS更新unwritable为1,并触发ChannelWritabilityChanged事件。

到此为止,全部的buf数据已经保存在outboundBuffer中

4.ctx.flush()

public ChannelHandlerContext flush() {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeFlush();
} else {
Runnable task = next.invokeFlushTask;
if (task == null) {
next.invokeFlushTask = task = new Runnable() {
@Override
public void run() {
next.invokeFlush();
}
};
}
safeExecute(executor, task, channel().voidPromise(), null);
}

return this;
}


分析:

默认情况下,findContextOutbound()会找到pipeline的head节点,触发flush方法

如果当前selectionKey 是写事件,说明有线程执行flush过程,则直接返回。否则直接执行flush操作

如果当前socketChannel已经关闭,或断开连接,则执行失败操作。否则执行doWrite把数据写入到socketChannel。

accept

1.图解

当有客户端connect请求,selector可以返回其对应的SelectionKey,方法processSelectedKeys进行后续的处理。



2.read()方法源码

若processSelectedKey方法的selectionKey发生的事件是SelectionKey.OP_ACCEPT,执行unsafe的read方法。该read方法定义在NioMessageUnsafe类中

private final List<Object> readBuf = new ArrayList<Object>();

@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
if (!config.isAutoRead() && !isReadPending()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
removeReadOp();
return;
}

final int maxMessagesPerRead = config.getMaxMessagesPerRead();
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;
try {
try {
for (;;) {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}

// stop reading and remove op
if (!config.isAutoRead()) {
break;
}

if (readBuf.size() >= maxMessagesPerRead) {
break;
}
}
} catch (Throwable t) {
exception = t;
}
setReadPending(false);
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}

readBuf.clear();
pipeline.fireChannelReadComplete();

if (exception != null) {
if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
// ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
}

pipeline.fireExceptionCaught(exception);
}

if (closed) {
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254 if (!config.isAutoRead() && !isReadPending()) {
removeReadOp();
}
}
}


分析:

readBuf 用来保存客户端NioSocketChannel,默认一次不超过16个。

方法doReadMessages进行处理ServerSocketChannel的accept操作。

3.doReadMessages方法处理accept

protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}


分析:

javaChannel()返回NioServerSocketChannel对应的ServerSocketChannel。

ServerSocketChannel.accept返回客户端的socketChannel 。

把 NioServerSocketChannel 和 socketChannel 封装成 NioSocketChannel,并缓存到readBuf。

遍历redBuf中的NioSocketChannel,触发各自pipeline的ChannelRead事件,从pipeline的head开始遍历,最终执行ServerBootstrapAcceptor的channelRead方法。

本人才疏学浅,若有错,请指出

谢谢!

参考链接: Netty 4源码解析:请求处理
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: