您的位置:首页 > 理论基础 > 计算机网络

Netty3.10.1:关于TCP粘包问题 及 Encoder&Decoder

2015-03-29 00:00 337 查看
上一篇文章,我们只是解释了,在Netty中,从调用socket的read函数,到复制读取的内容到自己的缓冲区里,

到把这个缓冲区封装成一个UpStream事件,到分发给SocketChannel对应的PipeLine,到如何被自定义的类的MessageReceived函数处理

的整个过程。

---------------但是,很多时候,由于TCP的天生字节流特性,导致我们需要解决粘包的问题,其实也就是说

可能某个MessageReceived中的内容不是一个逻辑上完整的内容,想象一下,如果你使用XML格式来定义消息

结果本次读取的内容不是一个完整的XML,因为剩下的内容可能要在下一次MessageReceived中处理,

此时,怎么办?

MessageDecoder就是用来处理这种情况的。

----------------------------------------------那么,在整个系统中,MessageDecoder到底是如何处理的呢?

这就得分析

p.addLast("name1", new Handler1);

p.addLast("name2", new Handler2);

p.addLast("name3", new Handler3);

道理意味着什么?

其实,上面这3行代码是放在PipelineFactory的getPipeline()函数里的。

然后执行了这3行代码,就形成了一条处理链,从1到3顺序双链表连接。

1<--->2<--->3.

head为1,tail为3.这样的话,上行消息就从1到2到3处理,下行消息就从3到2到1处理。

每当到达一节节点是,根据上行还是下行,判断当前节点是否可以处理本消息而决定是处理还是跳过。

----------------------------------------------------------------------------------------------------------

由于MessageReceived是一个上行消息,在真正被处理前,应该被解码,所以如果hander2是处理完整消息的节点的话。

则handler1则应该是MessageDecoder节点,且这个节点应该可以执行MessageReceived函数。也就是起码应该

extends SimpleChannelUpstreamHandler ,也可以是SimpleChannelHandler 。

反正只要你实现了MessageReceived就可以了。

----------------------------------------------------------------------------------------------------------

public void sendUpstream(ChannelEvent e) {
DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
if (head == null) {
if (logger.isWarnEnabled()) {
logger.warn(
"The pipeline contains no upstream handlers; discarding: " + e);
}

return;
}

sendUpstream(head, e);
}

void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
try {
((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
} catch (Throwable t) {
notifyHandlerException(e, t);
}
}

这上面的代码的意思就是:如果某个PipeLine收到一个上行消息,就从head找到第一个可以处理上行消息的节点

交给它处理,下面我们看看上行节点收到消息后怎么办?

public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {

if (e instanceof MessageEvent) {
messageReceived(ctx, (MessageEvent) e);
} else if (e instanceof WriteCompletionEvent) {
WriteCompletionEvent evt = (WriteCompletionEvent) e;
writeComplete(ctx, evt);
} else if (e instanceof ChildChannelStateEvent) {
ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
if (evt.getChildChannel().isOpen()) {
childChannelOpen(ctx, evt);
} else {
childChannelClosed(ctx, evt);
}
} else if (e instanceof ChannelStateEvent) {
ChannelStateEvent evt = (ChannelStateEvent) e;
switch (evt.getState()) {
case OPEN:
if (Boolean.TRUE.equals(evt.getValue())) {
channelOpen(ctx, evt);
} else {
channelClosed(ctx, evt);
}
break;
case BOUND:
if (evt.getValue() != null) {
channelBound(ctx, evt);
} else {
channelUnbound(ctx, evt);
}
break;
case CONNECTED:
if (evt.getValue() != null) {
channelConnected(ctx, evt);
} else {
channelDisconnected(ctx, evt);
}
break;
case INTEREST_OPS:
channelInterestChanged(ctx, evt);
break;
default:
ctx.sendUpstream(e);
}
} else if (e instanceof ExceptionEvent) {
exceptionCaught(ctx, (ExceptionEvent) e);
} else {
ctx.sendUpstream(e);
}
}

从这里,我们已经可以很清楚的知道了,对于每一个上行Event 处理器来说,

只要是MessageEvent,就会调用messageReceived函数。

而这个函数正是我们之前自定义的@Override函数。

------------------------------------------------------那么MessageDecoder是如何嵌入进去的呢?

这里我们以继承了FrameDecoder的类为例子讲解!

@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) throws Exception {

Object m = e.getMessage();//获取消息
if (!(m instanceof ChannelBuffer)) {
ctx.sendUpstream(e);
return;
}

ChannelBuffer input = (ChannelBuffer) m;//转换成ChannelBuffer对象
if (!input.readable()) {
return;
}

if (cumulation == null) {
try {//根据需要决定是否需要缓存本次消息,然后再把复制好的消息传给callDecode函数
// the cumulation buffer is not created yet so just pass the input to callDecode(...) method
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
} finally {
updateCumulation(ctx, input);
}
} else {
input = appendToCumulation(input);
try {
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
} finally {
updateCumulation(ctx, input);
}
}
}

其实说白了,就是FrameDecoder会在本地缓存数据,然后每次来了新消息之后,就把自己的老数据+新数据合并成目前的数据

然后传给自定义类的decode函数。

如下是一个自定义的Decoder类

public class MyFrameDecoder extends FrameDecoder {

@Override

protected Object decode(ChannelHandlerContext ctx,

channel,

ChannelBuffer buf) throws Exception {

if (buf.readableBytes() < 4) {

return null;

}

buf.markReaderIndex();

int length = buf.readInt();

if (buf.readableBytes() < length) {

buf.resetReaderIndex();

return null;

}

ChannelBuffer frame = buf.readBytes(length);

return frame;

}

}

这个比较简单,就不多说了。

------------------------------------------------------

那么,结合上一篇文章,我们这里就知道如何利用MessageDecoder和MessageHandler来处理一个消息了。

至于编码Encoder,就可以继承OneToOneEncoder即可。

最后写入到对应的socketChannel的队列里。

至于其他心跳检测等内容,可以参考http://peirenlei.iteye.com/blog/1832842

@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel,
Object msgObj) throws Exception {

ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
Message msg = (Message) msgObj;
// version
byte[] version = new byte[1];
version[0] = msg.getVersion();
buffer.writeBytes(version);
// length
buffer.writeInt(0);// 数据长度
// content
byte[] bytes = null;
String str = msg.getContent();
try {
bytes = str.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
logger.error("", e);
}
buffer.writeBytes(bytes);// 数据体
// 修正长度
buffer.setInt(1, buffer.writerIndex());// 重置长度
//ok
logger.debug("send message=====" + str.toString());
return buffer;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Netty