netty学习之二 分包、组包、粘包处理
2017-05-20 00:00
411 查看
摘要: 在数据传输中,我们发送的数据包会遇到拆包粘包的问题,那么使用netty的时候如何处理呢
在数据传输中,我们发送的数据包如下所示
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
而实际接收的包的格式为:
+----+-------+---+---+
| AB | CDEFG | H | I |
+----+-------+---+---+
产生的原因为:数据在传输过程中,产生数据包碎片(TCP/IP数据传输时大数据包无法一次传输,被拆分成小数据包,小数据包即为数据包碎片),这就造成了实际接收的数据包和发送的数据包不一致的情况。
那么一般情况下我们是如何解决这种问题的呢?我所知道的有这几种方案:
消息定长
在包尾增加一个标识,通过这个标志符进行分割
将消息分为两部分,也就是消息头和消息尾,消息头中写入要发送数据的总长度,通常是在消息头的第一个字段使用int值来标识发送数据的长度。
客户端代码在创建handler的时候也要指定长度大小,并且与服务器端指定大小一致即可
运行结果如下:
相同的在客户端需要同样指定客户端分割符decoder,并向服务端发送消息如下
下面介绍第三种处理方案,也是很多分布式框架中使用的方式,
Netty4本身自带了ObjectDecoder,ObjectEncoder来实现自定义对象的序列 化,
但是用的是java内置的序列化,由于java序列化的性能并不是很好,
所以很多时候我们需要用其他序列化方式,常见的有 Kryo,Jackson,fastjson,protobuf等。这里要写的其实用什么序列化不是重点,而是我们怎么设计我们的Decoder和 Encoder。
首先我们写一个Encoder,我们继承自MessageToByteEncoder<T> ,把对象转换成byte,继承这个对象,会要求我们实现一个encode方法:
那么当我们在Decode的时候,该怎么处理发送过来的数据呢?这里我们继承ByteToMessageDecoder方法,继承这个对象,会要求我们实现一个decode方法
完整代码链接:https://github.com/winstonelei/Smt
在数据传输中,我们发送的数据包如下所示
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
而实际接收的包的格式为:
+----+-------+---+---+
| AB | CDEFG | H | I |
+----+-------+---+---+
产生的原因为:数据在传输过程中,产生数据包碎片(TCP/IP数据传输时大数据包无法一次传输,被拆分成小数据包,小数据包即为数据包碎片),这就造成了实际接收的数据包和发送的数据包不一致的情况。
那么一般情况下我们是如何解决这种问题的呢?我所知道的有这几种方案:
消息定长
在包尾增加一个标识,通过这个标志符进行分割
将消息分为两部分,也就是消息头和消息尾,消息头中写入要发送数据的总长度,通常是在消息头的第一个字段使用int值来标识发送数据的长度。
首先看netty中消息定长如何处理的,首先在服务端设定当前服务端接受大小为固定长度,sc.pipeline().addLast(new FixedLengthFrameDecoder(5));本例中指定长度为5个字符串大小,此类是netty框架提供。
ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //设置定长字符串接收 sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); //设置字符串形式的解码 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } });
客户端代码在创建handler的时候也要指定长度大小,并且与服务器端指定大小一致即可
EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new FixedLengthFrameDecoder(5));//制定传输数据大小 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaaaabbbbb".getBytes())); cf.channel().writeAndFlush(Unpooled.copiedBuffer("ccccccccc".getBytes()));
运行结果如下:
尝试了第一种方案后,下面尝试下第二种方案,这种定长的方式直接限制了传输信息的大小,而且要服务端和客户端同时指定大小感觉并不是太好,下面看下指定分隔符是如何处理的呢,首先服务端指定分隔符ByteBuf buf = Unpooled.copiedBuffer("*_".getBytes());然后创建new DelimiterBasedFrameDecoder(1024, buf)分割符指定Decoder
ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //设置特殊分隔符 ByteBuf buf = Unpooled.copiedBuffer("*_".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); //设置字符串形式的解码 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } });
相同的在客户端需要同样指定客户端分割符decoder,并向服务端发送消息如下
Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("*_".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("bbbasfab*_".getBytes())); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("ccsdfasfcc*_".getBytes())); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("fffff*_".getBytes())); //等待客户端端口关闭 cf.channel().closeFuture().sync();
下面介绍第三种处理方案,也是很多分布式框架中使用的方式,
Netty4本身自带了ObjectDecoder,ObjectEncoder来实现自定义对象的序列 化,
但是用的是java内置的序列化,由于java序列化的性能并不是很好,
所以很多时候我们需要用其他序列化方式,常见的有 Kryo,Jackson,fastjson,protobuf等。这里要写的其实用什么序列化不是重点,而是我们怎么设计我们的Decoder和 Encoder。
首先我们写一个Encoder,我们继承自MessageToByteEncoder<T> ,把对象转换成byte,继承这个对象,会要求我们实现一个encode方法:
@Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { byte[] body = convertToBytes(msg); //将对象转换为byte,伪代码,具体用什么进行序列化,你们自行选择。可以使用我上面说的一些 int dataLength = body.length; //读取消息的长度 out.writeInt(dataLength); //先将消息长度写入,也就是消息头 out.writeBytes(body); //消息体中包含我们要发送的数据 }
那么当我们在Decode的时候,该怎么处理发送过来的数据呢?这里我们继承ByteToMessageDecoder方法,继承这个对象,会要求我们实现一个decode方法
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { if (in.readableBytes() < HEAD_LENGTH) { //这个HEAD_LENGTH是我们用于表示头长度的字节数。 由于上面我们传的是一个int类型的值,所以这里HEAD_LENGTH的值为4. return; } in.markReaderIndex(); //我们标记一下当前的readIndex的位置 int dataLength = in.readInt(); // 读取传送过来的消息的长度。ByteBuf 的readInt()方法会让他的readIndex增加4 if (dataLength < 0) { // 我们读到的消息体长度为0,这是不应该出现的情况,这里出现这情况,关闭连接。 ctx.close(); } if (in.readableBytes() < dataLength) { //读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方 in.resetReaderIndex(); return; } byte[] body = new byte[dataLength]; // 嗯,这时候,我们读到的长度,满足我们的要求了,把传送过来的数据,取出来吧~~ in.readBytes(body); // Object o = convertToObject(body); //将byte数据转化为我们需要的对象。伪代码,用什么序列化,自行选择 out.add(o); }
完整代码链接:https://github.com/winstonelei/Smt
相关文章推荐
- netty 数据分包、组包、粘包处理机制(一)
- netty之粘包分包的处理
- netty 数据分包、组包、粘包处理
- netty 数据分包、组包、粘包处理机制(二)
- netty 数据分包、组包、粘包处理机制(一)
- netty 数据分包、组包、粘包处理机制
- netty 数据分包、组包、粘包处理机制(二)
- 实例:Netty 处理 TCP协议数据分包问题
- Linux IGMP SNOOPING 学习笔记 之二 igmp数据包处理
- Netty源码学习-ServerBootstrap启动及事件处理过程
- mina学习 粘包,断包处理
- Netty(三)TCP粘包拆包处理
- [VB学习中]之二:事件及事件处理
- netty入门学习(5)-超时处理
- socket的半包,粘包与分包的问题和处理代码
- netty处理粘包问题——2
- netty处理粘包问题用特殊字符分割——3
- netty学习之二:ByteBuf解读
- Linux icmp 学习笔记 之二 icmp数据处理流程
- TCP分包方法 && 粘包处理策略