【Netty源码分析】发送数据过程
2016-11-23 11:22
477 查看
前面两篇博客【Netty源码分析】Netty服务端bind端口过程和【Netty源码分析】客户端connect服务端过程中我们分别介绍了服务端绑定端口和客户端连接到服务端的过程,接下来我们分析一下数据发送的过程。
调用AbstractChannelHandlerContext的writeAndFlush函数
首先是调用write函数,将数据写到buffer中。
AbstractUnsafe中调用flush过程,在这里我们可以看到之前写入数据的buffer(outboundBuffer)
调用NioSocketChannel中的doWrite函数,在doWrite函数中会看到调用NIO中的socketChannel中的写数据操作。
future.channel().writeAndFlush("Hello Netty Server ,I am a common client");调用AbstractChannel的writeAndFlush函数
@Override public ChannelFuture writeAndFlush(Object msg) { return pipeline.writeAndFlush(msg); }
@Override public final ChannelFuture writeAndFlush(Object msg) { return tail.writeAndFlush(msg); }
调用AbstractChannelHandlerContext的writeAndFlush函数
@Override public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); }
@Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { ........ write(msg, true, promise); ....... }需要注意的一点是,写数据的过程其实是分为两步的,第一步是将要写的数据写到buffer中,第二步是flush其实就是从buffer中读取数据然后发送给服务端。
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }
首先是调用write函数,将数据写到buffer中。
private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }调用HeadContext的write函数
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }AbstractUnsafe中调用write函数,这一步就可以认为将数据写到buffer中了,接下来buffer的东西我们会分析。
@Override public final void write(Object msg, ChannelPromise promise) { ....... outboundBuffer.addMessage(msg, size, promise); ...... }接下来是flush过程,将数据写到服务端
private void invokeFlush0() { try { ((ChannelOutboundHandler) handler()).flush(this); } catch (Throwable t) { notifyHandlerException(t); } }HeadContext中调用flush过程
@Override public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); }
AbstractUnsafe中调用flush过程,在这里我们可以看到之前写入数据的buffer(outboundBuffer)
@Override public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); flush0(); }调用AbstractNioUnsafe的flush0函数
@Override[code]protected void flush0() { ........ doWrite(outboundBuffer); ....... }AbstractUnsafe中调用flush0函数
protected void flush0() { ........ doWrite(outboundBuffer); ....... }
调用NioSocketChannel中的doWrite函数,在doWrite函数中会看到调用NIO中的socketChannel中的写数据操作。
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { int size = in.size(); if (size == 0) { // All written so clear OP_WRITE clearOpWrite(); break; } long writtenBytes = 0; boolean done = false; boolean setOpWrite = false; // Ensure the pending writes are made of ByteBufs only. ByteBuffer[] nioBuffers = in.nioBuffers(); int nioBufferCnt = in.nioBufferCount(); long expectedWrittenBytes = in.nioBufferSize(); SocketChannel ch = javaChannel(); // Always us nioBuffers() to workaround data-corruption. // See https://github.com/netty/netty/issues/2761 switch (nioBufferCnt) { case 0: // We have something else beside ByteBuffers to write so fallback to normal writes. super.doWrite(in); return; case 1: // Only one ByteBuf so use non-gathering write ByteBuffer nioBuffer = nioBuffers[0]; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final int localWrittenBytes = ch.write(nioBuffer); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; default: for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; } // Release the fully written buffers, and update the indexes of the partially written buffer. in.removeBytes(writtenBytes); if (!done) { // Did not write all buffers completely. incompleteWrite(setOpWrite); break; } } }
相关文章推荐
- 【Netty源码分析】发送数据过程
- 【Netty源码分析】发送数据过程
- 【Netty源码分析】数据读取过程
- 【Netty源码分析】数据读取过程
- 【Netty源码分析】数据读取过程
- 第二人生的源码分析(二十八)UDP发送数据的可靠性控制
- Netty 源码分析(三):服务器端的初始化和注册过程
- Android IPC数据在内核空间中的发送过程分析
- 10.Spark Streaming源码分析:Receiver数据接收全过程详解
- netty3.2.3源码分析--服务器端发送数据分析
- spark-parquet列存储之:数据写入过程源码分析
- Launcher3源码分析 — 数据加载过程
- 第二人生的源码分析(二十八)UDP发送数据的可靠性控制
- 源码分析netty服务器创建过程vs java nio服务器创建
- 10.Spark Streaming源码分析:Receiver数据接收全过程详解
- Netty 4.0源码分析1:服务端启动过程中的Channel与EventLoopGroup的注册
- Linux内核网络源码分析——发送数据
- 城市公交数据下载(续)分析过程及源码(支持全国440个城市)
- Hadoop源码分析HDFS Client向HDFS写入数据的过程解析
- OpenStack建立实例完整过程源码详细分析(12)----依据AMQP通信架构实现消息发送机制解析之一