netty源码深入研究(从客户端入手)第三篇(详解写消息的管道处理流程)
2017-09-12 13:52
525 查看
假如我们的netty已经连接上了服务器,那么怎么写数据呢,如果从来没用过netty的话,该怎么办呢,有人会说百度啊,谷歌啊,其实最直接有效的方法就是看文档,不管做什么开发,都要熟读人家的文档,老是百度得来的答案,虽然快速,但是对自己后期成长不利。
netty的引导文档
从文档中找到
look最终又进了我们的队列,从tail开始分发输入消息,tail是上一篇所讲的,放在管子最后的管道
其中newPromise()创建一个new DefaultChannelPromise(channel(), executor())
再次看一下findContextOutbound的方法
写方法搜寻管道
读方法搜寻管道
和读数据的顺序完全相反,是从队尾开始读取管道,如果管道是写管道就返回该管道,即编码器必须继承ChannelOutboundHandlerAdapter这个父类。
继续走总路线, EventExecutor executor = next.executor()返回的就是NioEventLoop的类,第一篇第二篇都有介绍,executor.inEventLoop()判断当前线程是不是任务线程,注册启动的时候会启动任务线程,如果是直接发消息,如果不是则加入队列。
接下来我们直接跟进任务,执行逻辑肯定会在实现Runnable接口中实现,即WriteAndFlushTask类中实现,找到run方法
接着进入write(ctx, msg, promise);
look最终的写还是调用了我们的管道封装类去write,重新进入AbstractChannelHandlerContext
这个方法真正实现了管道的写方法,如自定义一个编码器:
不知道看客吗明白没有,好详细的介绍一下invokeHandler()
handlerState是什么鬼,初始值是多少,看样子是枚举啊,首先我找到了
那么第一次handler()这个判断就会返回false,而导致程序走 invokeWrite0(msg, promise);即走客户端自己调用的编码器,然后将信息再写回父类,继又开始重新调用
由于管道的头为HeadContext,并且
覆盖了父类的方法,最后又让NioSocketChannelUnsafe去真正的写入数据
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
(msg, promise); }
其中filterOutboundMessage方法判断数据是否合法
然后判断消息实体是有效的,即长度大于0,最后将消息加入outboundBuffer.addMessage(msg,size, promise);,ChannelOutboundBuffer维护了一个消息队列:
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619 incrementPendingOutboundBytes(entry.pendingSize, false);
}
既然消息先被保存到队列中,那么最后执行写操作的一定是flush了,跟进
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
//真正实现写方法
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
outboundBuffer.failFlushed(t, true);
}
} finally {
inFlush0 = false;
}
}
进入注释的那个方法doWrite
int writeSpinCount = -1;
boolean setOpWrite = false;
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}
boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
boolean done = region.transferred() >= region.count();
if (!done) {
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i--) {
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (region.transferred() >= region.count()) {
done = true;
break;
}
}
in.progress(flushedAmount);
}
if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else {
// Should not reach here.
throw new Error();
}
}
incompleteWrite(setOpWrite);
}
循环获取当前的消息,为空退出循环,不为空开始写消息
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
最后调用这个方法,将消息通过socket写入给服务器,到此写流程就完结了。
netty的引导文档
从文档中找到
Channel ch = ...; ch.writeAndFlush(message);这些方法,从中推测,最后写也是通过通道,ok跟进NioSocketChannel里面,写方法没有在NioSocketChannel中实现,而在其父类AbstractChannel中实现
@Override public ChannelFuture writeAndFlush(Object msg) { return pipeline.writeAndFlush(msg); }这里可以看到,最终写方法是管道实现的,那么o了,管道默认为DefaultChannelPipeline进入其方法
public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { return tail.writeAndFlush(msg, promise); }
look最终又进了我们的队列,从tail开始分发输入消息,tail是上一篇所讲的,放在管子最后的管道
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); //head在前 head.next = tail; //tail在尾 tail.prev = head; }在tail的父类AbstractChannelHandlerContext(既管道的包装类)找到writeAndFlusht的实现方法
@Override public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); }
其中newPromise()创建一个new DefaultChannelPromise(channel(), executor())
@Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { //判断输入的消息不为空 if (msg == null) { throw new NullPointerException("msg"); } //检查ChannelPromise是否执行完或者取消 if (isNotValidPromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled return promise; } //开始写消息 write(msg, true, promise); return promise; }跟着源码接着进入写方法,我们只关心怎么写的,其他代码暂放一边
private void write(Object msg, boolean flush, ChannelPromise promise) { //向读消息一样,找到匹配的管道 AbstractChannelHandlerContext next = findContextOutbound(); //是否包装msg final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { //管道开始刷新缓冲区写 next.invokeWriteAndFlush(m, promise); } else { //写入缓冲区 next.invokeWrite(m, promise); } } else { //加入写队列写消息 AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
再次看一下findContextOutbound的方法
写方法搜寻管道
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
读方法搜寻管道
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
和读数据的顺序完全相反,是从队尾开始读取管道,如果管道是写管道就返回该管道,即编码器必须继承ChannelOutboundHandlerAdapter这个父类。
继续走总路线, EventExecutor executor = next.executor()返回的就是NioEventLoop的类,第一篇第二篇都有介绍,executor.inEventLoop()判断当前线程是不是任务线程,注册启动的时候会启动任务线程,如果是直接发消息,如果不是则加入队列。
public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }将写任务加入队列,任务线程一直在读取队列执行任务。
接下来我们直接跟进任务,执行逻辑肯定会在实现Runnable接口中实现,即WriteAndFlushTask类中实现,找到run方法
public final void run() { try { //从NioSocketChannelUnsafe中获取 ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer(); // Check for null as it may be set to null if the c 4000 hannel is closed already //记录消息的大小 if (ESTIMATE_TASK_SIZE_ON_SUBMIT && buffer != null) { buffer.decrementPendingOutboundBytes(size); } //在子线程中写 write(ctx, msg, promise); } finally { // Set to null so the GC can collect them directly ctx = null; msg = null; promise = null; handle.recycle(this); } }
接着进入write(ctx, msg, promise);
protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { ctx.invokeWrite(msg, promise); }
look最终的写还是调用了我们的管道封装类去write,重新进入AbstractChannelHandlerContext
private void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { write(msg, promise); } }和上一篇的读取一样,先判断是否进入管道方法执行,如果是进入管道执行,如果不是,那么继续下一个管道写入处理,直到没有满足的管道为止。
private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
这个方法真正实现了管道的写方法,如自定义一个编码器:
public class Encoder extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { if (msg instanceof ByteBuf) { ByteBuf bytebuffer = (ByteBuf) msg; byte[] data = bytebuffer.array(); ctx.writeAndFlush(Unpooled.copiedBuffer(byteMerger(data.length, data))); } }我们在这个方法里又调用了ctx.writeAndFlush(Unpooled.copiedBuffer(byteMerger(data.length, data)));,那么现在刚好形成一个队列管道的小循环。
不知道看客吗明白没有,好详细的介绍一下invokeHandler()
private boolean invokeHandler() { // Store in local variable to reduce volatile reads. int handlerState = this.handlerState; return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING); }
handlerState是什么鬼,初始值是多少,看样子是枚举啊,首先我找到了
private volatile int handlerState = INIT;so,一个初始值有木有,INIT=0,
那么第一次handler()这个判断就会返回false,而导致程序走 invokeWrite0(msg, promise);即走客户端自己调用的编码器,然后将信息再写回父类,继又开始重新调用
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { //判断输入的消息不为空 if (msg == null) { throw new NullPointerException("msg"); } //检查ChannelPromise是否执行完或者取消 if (isNotValidPromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled return promise; } //开始写消息 write(msg, true, promise); return promise; }这个方法,这就形成了一个微循环,首先我将消息发送给netty框架,然后netty去找我设置的编码器,把我发送的数据换成netty控制器要传的数据,即字节,最终通过socket通道将消息发送给服务端。
由于管道的头为HeadContext,并且
HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); }代码outbound属性为true,所以最终会调用HeadContext,并且它实现了setAddComplete方法,所以中的数据是由HeadContext类负责写到通道中,继续跟到代码
覆盖了父类的方法,最后又让NioSocketChannelUnsafe去真正的写入数据
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
(msg, promise); }
其中filterOutboundMessage方法判断数据是否合法
protected final Object filterOutboundMessage(Object msg) { //发送的数据最终必须被转化为ByteBuf或FileRegion,否则抛异常 if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (buf.isDirect()) { return msg; } return newDirectBuffer(buf); } if (msg instanceof FileRegion) { return msg; } throw new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); }
然后判断消息实体是有效的,即长度大于0,最后将消息加入outboundBuffer.addMessage(msg,size, promise);,ChannelOutboundBuffer维护了一个消息队列:
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619 incrementPendingOutboundBytes(entry.pendingSize, false);
}
既然消息先被保存到队列中,那么最后执行写操作的一定是flush了,跟进
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
//真正实现写方法
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
outboundBuffer.failFlushed(t, true);
}
} finally {
inFlush0 = false;
}
}
进入注释的那个方法doWrite
int writeSpinCount = -1;
boolean setOpWrite = false;
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}
boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
boolean done = region.transferred() >= region.count();
if (!done) {
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i--) {
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (region.transferred() >= region.count()) {
done = true;
break;
}
}
in.progress(flushedAmount);
}
if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else {
// Should not reach here.
throw new Error();
}
}
incompleteWrite(setOpWrite);
}
循环获取当前的消息,为空退出循环,不为空开始写消息
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
最后调用这个方法,将消息通过socket写入给服务器,到此写流程就完结了。
相关文章推荐
- netty源码深入研究(从客户端入手)第二篇(详解读消息的管道处理流程)
- netty源码深入研究(从客户端入手)第四篇(读写超时详解)
- MQTT---HiveMQ源码详解(十二)Netty-MQTT消息、事件处理(流程)
- MQTT---HiveMQ源码详解(十三)Netty-MQTT消息、事件处理(源码举例解读)
- (三)Netty源码学习笔记之boss线程处理流程
- Android异步消息处理机制详解及源码分析
- 第二人生的源码分析(三十七)消息处理的完整流程
- Android异步消息处理机制详解及源码分析
- 蔡军生先生第二人生的源码分析(三十七)消息处理的完整流程
- 第二人生的源码分析(三十七)消息处理的完整流程
- 【SSH进阶之路】深入源码,详解Struts基本实现流程(七)
- Android异步消息处理机制详解及源码分析
- 第二人生的源码分析(三十七)消息处理的完整流程
- (三)Netty源码学习笔记之boss线程处理流程
- Android异步消息处理机制详解及源码分析
- hadoop源码解析之hdfs写数据全流程分析---客户端处理
- 《Android进阶》之第三篇 深入理解android的消息处理机制
- Volley源码阅读详解(一)---网络任务分发,处理和交付的核心流程
- Netty源码– Netty服务器处理流程分析
- 深入研究Netty框架之ByteBuf功能原理及源码分析