您的位置:首页 > Web前端 > BootStrap

netty源码深入研究(从客户端入手)第三篇(详解写消息的管道处理流程)

2017-09-12 13:52 525 查看
假如我们的netty已经连接上了服务器,那么怎么写数据呢,如果从来没用过netty的话,该怎么办呢,有人会说百度啊,谷歌啊,其实最直接有效的方法就是看文档,不管做什么开发,都要熟读人家的文档,老是百度得来的答案,虽然快速,但是对自己后期成长不利。

netty的引导文档

从文档中找到

Channel ch = ...;
ch.writeAndFlush(message);
这些方法,从中推测,最后写也是通过通道,ok跟进NioSocketChannel里面,写方法没有在NioSocketChannel中实现,而在其父类AbstractChannel中实现

@Override
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
这里可以看到,最终写方法是管道实现的,那么o了,管道默认为DefaultChannelPipeline进入其方法

public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return tail.writeAndFlush(msg, promise);
}


look最终又进了我们的队列,从tail开始分发输入消息,tail是上一篇所讲的,放在管子最后的管道

protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise =  new VoidChannelPromise(channel, true);

tail = new TailContext(this);
head = new HeadContext(this);
//head在前
head.next = tail;
//tail在尾
tail.prev = head;
}
在tail的父类AbstractChannelHandlerContext(既管道的包装类)找到writeAndFlusht的实现方法

@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}


其中newPromise()创建一个new DefaultChannelPromise(channel(), executor())

@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
//判断输入的消息不为空
if (msg == null) {
throw new NullPointerException("msg");
}
//检查ChannelPromise是否执行完或者取消
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
//开始写消息
write(msg, true, promise);

return promise;
}
跟着源码接着进入写方法,我们只关心怎么写的,其他代码暂放一边

private void write(Object msg, boolean flush, ChannelPromise promise) {
//向读消息一样,找到匹配的管道
AbstractChannelHandlerContext next = findContextOutbound();
//是否包装msg
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
//管道开始刷新缓冲区写
next.invokeWriteAndFlush(m, promise);
} else {
//写入缓冲区
next.invokeWrite(m, promise);
}
} else {
//加入写队列写消息
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
}  else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}


再次看一下findContextOutbound的方法

写方法搜寻管道

private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}


读方法搜寻管道

private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}


和读数据的顺序完全相反,是从队尾开始读取管道,如果管道是写管道就返回该管道,即编码器必须继承ChannelOutboundHandlerAdapter这个父类。
继续走总路线, EventExecutor executor = next.executor()返回的就是NioEventLoop的类,第一篇第二篇都有介绍,executor.inEventLoop()判断当前线程是不是任务线程,注册启动的时候会启动任务线程,如果是直接发消息,如果不是则加入队列。

public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}

boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
将写任务加入队列,任务线程一直在读取队列执行任务。

接下来我们直接跟进任务,执行逻辑肯定会在实现Runnable接口中实现,即WriteAndFlushTask类中实现,找到run方法

public final void run() {
try {
//从NioSocketChannelUnsafe中获取  ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
// Check for null as it may be set to null if the c
4000
hannel is closed already
//记录消息的大小
if (ESTIMATE_TASK_SIZE_ON_SUBMIT && buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
//在子线程中写
write(ctx, msg, promise);
} finally {
// Set to null so the GC can collect them directly
ctx = null;
msg = null;
promise = null;
handle.recycle(this);
}
}


接着进入write(ctx, msg, promise);

protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ctx.invokeWrite(msg, promise);
}


look最终的写还是调用了我们的管道封装类去write,重新进入AbstractChannelHandlerContext 

private void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}
和上一篇的读取一样,先判断是否进入管道方法执行,如果是进入管道执行,如果不是,那么继续下一个管道写入处理,直到没有满足的管道为止。

private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}


这个方法真正实现了管道的写方法,如自定义一个编码器:

public class Encoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
if (msg instanceof ByteBuf) {
ByteBuf bytebuffer = (ByteBuf) msg;
byte[] data = bytebuffer.array();
ctx.writeAndFlush(Unpooled.copiedBuffer(byteMerger(data.length, data)));
}
}
我们在这个方法里又调用了ctx.writeAndFlush(Unpooled.copiedBuffer(byteMerger(data.length, data)));,那么现在刚好形成一个队列管道的小循环。

不知道看客吗明白没有,好详细的介绍一下invokeHandler()

private boolean invokeHandler() {
// Store in local variable to reduce volatile reads.
int handlerState = this.handlerState;
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}


handlerState是什么鬼,初始值是多少,看样子是枚举啊,首先我找到了

private volatile int handlerState = INIT;
so,一个初始值有木有,INIT=0,

那么第一次handler()这个判断就会返回false,而导致程序走 invokeWrite0(msg, promise);即走客户端自己调用的编码器,然后将信息再写回父类,继又开始重新调用

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
//判断输入的消息不为空
if (msg == null) {
throw new NullPointerException("msg");
}
//检查ChannelPromise是否执行完或者取消
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
//开始写消息
write(msg, true, promise);

return promise;
}
这个方法,这就形成了一个微循环,首先我将消息发送给netty框架,然后netty去找我设置的编码器,把我发送的数据换成netty控制器要传的数据,即字节,最终通过socket通道将消息发送给服务端。

由于管道的头为HeadContext,并且  

HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
代码outbound属性为true,所以最终会调用HeadContext,并且它实现了setAddComplete方法,所以中的数据是由HeadContext类负责写到通道中,继续跟到代码

覆盖了父类的方法,最后又让NioSocketChannelUnsafe去真正的写入数据

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();

ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}

int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}

outboundBuffer.addMessage(msg, size, promise);
}

(msg, promise); }


其中filterOutboundMessage方法判断数据是否合法

protected final Object filterOutboundMessage(Object msg) {
//发送的数据最终必须被转化为ByteBuf或FileRegion,否则抛异常
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}

return newDirectBuffer(buf);
}

if (msg instanceof FileRegion) {
return msg;
}

throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

然后判断消息实体是有效的,即长度大于0,最后将消息加入outboundBuffer.addMessage(msg,size, promise);,ChannelOutboundBuffer维护了一个消息队列:
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}

// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619 incrementPendingOutboundBytes(entry.pendingSize, false);
}


既然消息先被保存到队列中,那么最后执行写操作的一定是flush了,跟进
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}

final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}

inFlush0 = true;

// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}

try {
//真正实现写方法
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
outboundBuffer.failFlushed(t, true);
}
} finally {
inFlush0 = false;
}
}


进入注释的那个方法doWrite
int writeSpinCount = -1;

boolean setOpWrite = false;
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}

if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}

boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}

flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}

in.progress(flushedAmount);

if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
boolean done = region.transferred() >= region.count();

if (!done) {
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}

for (int i = writeSpinCount - 1; i >= 0; i--) {
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}

flushedAmount += localFlushedAmount;
if (region.transferred() >= region.count()) {
done = true;
break;
}
}

in.progress(flushedAmount);
}

if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else {
// Should not reach here.
throw new Error();
}
}
incompleteWrite(setOpWrite);
}

循环获取当前的消息,为空退出循环,不为空开始写消息
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}


最后调用这个方法,将消息通过socket写入给服务器,到此写流程就完结了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息