netty源码分析之-处理器详解(9)
2017-12-19 21:34
561 查看
Netty处理器重要的概念:
Netty的处理器可以分为两类:入站处理器与出站处理器
入站处理器的顶层是ChannelInboundHandler,出站处理器的顶层是ChannelOutboundHandler
数据处理时常用的各种编码器本质上都是处理器
编解码器:无论我们向网络写入的数据是什么类型(int、char、String、二进制等),数据在网络中传递时,其都是以字节流的形式呈现的;将数据由原本的形式转换为字节流的操作称为编码(encode),将数据由字节转换为它原本的格式或是其他格式的操作称为解码(decode),编解码统一成为codec
编码:本质上是一种出站处理器,因此,编码一定是一种ChannelOutboundHandler
解码:本质上是一种入站处理器,因此,解码一定是一种ChannelInboundHandler
在Netty中,编码器通常以XXXEncoder命名,解码器通常以XXXDdcoder命名
在这里由于tcp传输数据的粘包与拆包,接受到的数据不一定是完整的,因此我们需要判断是否能够处理进而转换成我们需要的类型,需要类似以下的判断,实际上后续有ReplayingDecoder能够更加方便的来处理。
例如,对于ByteToMessageDecoder解码可能会有如下操作:
如果第一次读取的header,能够在接下来的读取中全部接受则进行解码处理,否则返回到标记处,也就是无法解码,等待接受更多的数据
而对于ReplayingDecoder的解码,可能会更简洁:
ReplayingDecoder对ByteBuf进行了特殊的实现,当没有足够的数据来解码成特定类型的时候会抛错,在上述方法IntegerHeaderFrameDecoder实现中,当接受到4个或者更多字节的时候能够正常返回整型header,否则将抛错,如果ReplayingDecoder捕获到了异常,将会重新设置readerI
4000
ndex到初始位置,然后重复调用decode来接受更多的字节数据。
ReplayingDecoder如此简洁也会有一些代价,如:
一些ByteBuf的操作可能被禁止
如果网络很差或者消息格式很复杂,那么会导致性能比较差。因为当接受到的字节数据不够会重复的调用多次decode来获取更多的消息;而不会像ByteToMessageDecoder一样直接返回
可能需要注意decode方法会背调用多次为了解码一条消息
当调用第二次values.offer(buf.readInt());的时候可能没有足够多的字节数据,则会重复调用decode,但是此时queue中可能不止2条数据。
正确的做法应该是以下方式:
当数据不够需要重新调用decode的时候,做一次清除queue的操作来保证队列正确的数据
但是我们可以通过调用checkpoint()方法来更新buffer的初始位置,让readerIndex能够重新回到该起始位置。从而避免多次重复的调用decode来对同一个消息多次解码
下面的这些类处理更加复杂的用例:
io.netty.handler.codec.LineBasedFrameDecoder— 这个类在 Netty 内部也有使
用,它使用了行尾控制字符(\n 或者\r\n)来解析消息数据;
io.netty.handler.codec.http.HttpObjectDecoder— 一个 HTTP 数据的解码器。
自定义协议编解码来解决粘包与拆包:
https://github.com/huiGod/netty_lecture/tree/master/src/main/java/com/shengsiyuan/netty/handler3
关于Netty编解码器的重要结论:
无论是编码器还是解码器,其所接受的消息类型必须要与待处理的参数类型一直,否则该编码器或解码器并不会被执行
在解码器进行数据解码时,一定要记得判断缓冲(ByteBuf)中的数据是否足够,否则将会产生一些问题
同样对于编码器和解码器来说,引用计数也需要特别注意,一旦消息被编码或者解码,它就会被ReferenceCountUtil.release(message)调用自动释放,如果需要保留引用以便稍后使用,那么可以调用referenceCountUtil.retain(message)方法来增加该引用计数,从而防止该消息被释放
Netty的处理器可以分为两类:入站处理器与出站处理器
入站处理器的顶层是ChannelInboundHandler,出站处理器的顶层是ChannelOutboundHandler
数据处理时常用的各种编码器本质上都是处理器
编解码器:无论我们向网络写入的数据是什么类型(int、char、String、二进制等),数据在网络中传递时,其都是以字节流的形式呈现的;将数据由原本的形式转换为字节流的操作称为编码(encode),将数据由字节转换为它原本的格式或是其他格式的操作称为解码(decode),编解码统一成为codec
编码:本质上是一种出站处理器,因此,编码一定是一种ChannelOutboundHandler
解码:本质上是一种入站处理器,因此,解码一定是一种ChannelInboundHandler
在Netty中,编码器通常以XXXEncoder命名,解码器通常以XXXDdcoder命名
MessageToByteEncoder
对于编码器的顶层抽象类MessageToByteEncoder,像流式处理一样将一个消息转换成一个ByteBuf,encode是需要实现的重要方法:public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter{}
public class IntegerEncoder extends MessageToByteEncoder<Integer> { @Override public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception { out.writeInt(msg); } }
ByteToMessageDecoder
对于解码器的顶层抽象类ByteToMessageDecoder,像流式处理一样将一个ByteBuf转换成需要的其他对象类型,decode是需要实现的重要方法:public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter{}
public class SquareDecoder extends ByteToMessageDecoder { @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { out.add(in.readBytes(in.readableBytes())); } }
在这里由于tcp传输数据的粘包与拆包,接受到的数据不一定是完整的,因此我们需要判断是否能够处理进而转换成我们需要的类型,需要类似以下的判断,实际上后续有ReplayingDecoder能够更加方便的来处理。
//整形举例 if (in.readableBytes() >= 8) { out.add(in.readLong()); }
MessageToMessageDecoder
将消息从一个类型转换成另一个类型,我们可以在ChannelInitailizer中添加多个编码器或者解码器来同时处理网络中接受到到消息(或者写到网络),类似的编解码器都是类似的功能public class StringToIntegerDecoder extends MessageToMessageDecoder<String> { @Override public void decode(ChannelHandlerContext ctx, String message, List<Object> out) throws Exception { out.add(message.length()); } }
ReplayingDecoder
ReplayingDecoder是ByteToMessageDecoder的一个特殊变种,能够在阻塞I/O的范式中实现非阻塞解码。与ByteToMessageDecoder最大的不同是,ReplayingDecoder允许你直接实现decode或者decodeLast就好像所需要的字节数据已经接受到,而不用去监测是否有足够的字节数据能够用来解码例如,对于ByteToMessageDecoder解码可能会有如下操作:
public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { if (buf.readableBytes() < 4) { return; } buf.markReaderIndex(); int length = buf.readInt(); //header if (buf.readableBytes() < length) { //body buf.resetReaderIndex(); return; } out.add(buf.readBytes(length)); } }
如果第一次读取的header,能够在接下来的读取中全部接受则进行解码处理,否则返回到标记处,也就是无法解码,等待接受更多的数据
而对于ReplayingDecoder的解码,可能会更简洁:
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> { protected void decode(ChannelHandlerContext ctx, ByteBuf buf) throws Exception { out.add(buf.readBytes(buf.readInt())); } }
ReplayingDecoder对ByteBuf进行了特殊的实现,当没有足够的数据来解码成特定类型的时候会抛错,在上述方法IntegerHeaderFrameDecoder实现中,当接受到4个或者更多字节的时候能够正常返回整型header,否则将抛错,如果ReplayingDecoder捕获到了异常,将会重新设置readerI
4000
ndex到初始位置,然后重复调用decode来接受更多的字节数据。
ReplayingDecoder如此简洁也会有一些代价,如:
一些ByteBuf的操作可能被禁止
如果网络很差或者消息格式很复杂,那么会导致性能比较差。因为当接受到的字节数据不够会重复的调用多次decode来获取更多的消息;而不会像ByteToMessageDecoder一样直接返回
可能需要注意decode方法会背调用多次为了解码一条消息
public class MyDecoder extends ReplayingDecoder<Void> { private final Queue<Integer> values = new LinkedList<Integer>(); @Override public void decode(.., ByteBuf buf, List<Object> out) throws Exception { // A message contains 2 integers. values.offer(buf.readInt()); values.offer(buf.readInt()); // This assertion will fail intermittently since values.offer() // can be called more than two times! assert values.size() == 2; out.add(values.poll() + values.poll()); } }
当调用第二次values.offer(buf.readInt());的时候可能没有足够多的字节数据,则会重复调用decode,但是此时queue中可能不止2条数据。
正确的做法应该是以下方式:
public class MyDecoder extends ReplayingDecoder<Void> { private final Queue<Integer> values = new LinkedList<Integer>(); @Override public void decode(.., ByteBuf buf, List<Object> out) throws Exception { // Revert the state of the variable that might have been changed // since the last partial decode. values.clear(); // A message contains 2 integers. values.offer(buf.readInt()); values.offer(buf.readInt()); // Now we know this assertion will never fail. assert values.size() == 2; out.add(values.poll() + values.poll()); } }
当数据不够需要重新调用decode的时候,做一次清除queue的操作来保证队列正确的数据
但是我们可以通过调用checkpoint()方法来更新buffer的初始位置,让readerIndex能够重新回到该起始位置。从而避免多次重复的调用decode来对同一个消息多次解码
下面的这些类处理更加复杂的用例:
io.netty.handler.codec.LineBasedFrameDecoder— 这个类在 Netty 内部也有使
用,它使用了行尾控制字符(\n 或者\r\n)来解析消息数据;
io.netty.handler.codec.http.HttpObjectDecoder— 一个 HTTP 数据的解码器。
自定义协议编解码来解决粘包与拆包:
https://github.com/huiGod/netty_lecture/tree/master/src/main/java/com/shengsiyuan/netty/handler3
关于Netty编解码器的重要结论:
无论是编码器还是解码器,其所接受的消息类型必须要与待处理的参数类型一直,否则该编码器或解码器并不会被执行
在解码器进行数据解码时,一定要记得判断缓冲(ByteBuf)中的数据是否足够,否则将会产生一些问题
同样对于编码器和解码器来说,引用计数也需要特别注意,一旦消息被编码或者解码,它就会被ReferenceCountUtil.release(message)调用自动释放,如果需要保留引用以便稍后使用,那么可以调用referenceCountUtil.retain(message)方法来增加该引用计数,从而防止该消息被释放
相关文章推荐
- netty源码分析(二十二)Netty编解码器剖析与入站出站处理器详解
- Netty学习:ChannelHandler执行顺序详解,附源码分析
- netty源码分析之-Channel、ChannelPipeline、ChannelHandler以及 ChannelHandlerContext 详解(2)
- netty源码分析之-引导详解(4)
- netty源码分析之-ByteBuf详解(8)
- netty源码分析之-SimpleChannelInboundHandler与ChannelInboundHandlerAdapter详解(6)
- Android触摸屏事件派发机制详解与源码分析一(View篇)
- Java8 观察者模式详解 jdk实现源码分析
- Android context源码详解及深入分析
- Wordpress源码分析 三阶段详解(2)
- SpringMVC @SessionAttributes 使用详解以及源码分析
- NopCommerce源码架构详解--路由相关源码分析
- Laravel5.5源码详解 -- 中间件MiddleWare分析
- ZRender源码分析5:Shape绘图详解
- netty EventLoop write() 源码分析(二)
- Android进阶——Small源码分析之启动流程详解
- Epoll详解及源码分析
- Java集合:Hashtable使用详解及源码分析
- Java IO:PipedOutputStream和PipedInputStream使用详解及源码分析
- 源码之下无秘密 ── 做最好的 Netty 源码分析教程