您的位置:首页 > 其它

netty源码分析之-处理器详解(9)

2017-12-19 21:34 561 查看
Netty处理器重要的概念:

Netty的处理器可以分为两类:入站处理器与出站处理器

入站处理器的顶层是ChannelInboundHandler,出站处理器的顶层是ChannelOutboundHandler

数据处理时常用的各种编码器本质上都是处理器

编解码器:无论我们向网络写入的数据是什么类型(int、char、String、二进制等),数据在网络中传递时,其都是以字节流的形式呈现的;将数据由原本的形式转换为字节流的操作称为编码(encode),将数据由字节转换为它原本的格式或是其他格式的操作称为解码(decode),编解码统一成为codec

编码:本质上是一种出站处理器,因此,编码一定是一种ChannelOutboundHandler

解码:本质上是一种入站处理器,因此,解码一定是一种ChannelInboundHandler

在Netty中,编码器通常以XXXEncoder命名,解码器通常以XXXDdcoder命名

MessageToByteEncoder

对于编码器的顶层抽象类MessageToByteEncoder,像流式处理一样将一个消息转换成一个ByteBuf,encode是需要实现的重要方法:

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter{}


public class IntegerEncoder extends MessageToByteEncoder<Integer> {
@Override
public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out)
throws Exception {
out.writeInt(msg);
}
}


ByteToMessageDecoder

对于解码器的顶层抽象类ByteToMessageDecoder,像流式处理一样将一个ByteBuf转换成需要的其他对象类型,decode是需要实现的重要方法:

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter{}


public class SquareDecoder extends ByteToMessageDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
out.add(in.readBytes(in.readableBytes()));
}
}


在这里由于tcp传输数据的粘包与拆包,接受到的数据不一定是完整的,因此我们需要判断是否能够处理进而转换成我们需要的类型,需要类似以下的判断,实际上后续有ReplayingDecoder能够更加方便的来处理。

//整形举例
if (in.readableBytes() >= 8) {
out.add(in.readLong());
}


MessageToMessageDecoder

将消息从一个类型转换成另一个类型,我们可以在ChannelInitailizer中添加多个编码器或者解码器来同时处理网络中接受到到消息(或者写到网络),类似的编解码器都是类似的功能

public class StringToIntegerDecoder extends
MessageToMessageDecoder<String> {

@Override
public void decode(ChannelHandlerContext ctx, String message,
List<Object> out) throws Exception {
out.add(message.length());
}
}


ReplayingDecoder

ReplayingDecoder是ByteToMessageDecoder的一个特殊变种,能够在阻塞I/O的范式中实现非阻塞解码。与ByteToMessageDecoder最大的不同是,ReplayingDecoder允许你直接实现decode或者decodeLast就好像所需要的字节数据已经接受到,而不用去监测是否有足够的字节数据能够用来解码

例如,对于ByteToMessageDecoder解码可能会有如下操作:

public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {

if (buf.readableBytes() < 4) {
return;
}

buf.markReaderIndex();
int length = buf.readInt();   //header

if (buf.readableBytes() < length) {  //body
buf.resetReaderIndex();
return;
}

out.add(buf.readBytes(length));
}
}


如果第一次读取的header,能够在接下来的读取中全部接受则进行解码处理,否则返回到标记处,也就是无法解码,等待接受更多的数据

而对于ReplayingDecoder的解码,可能会更简洁:

public class IntegerHeaderFrameDecoder
extends ReplayingDecoder<Void> {

protected void decode(ChannelHandlerContext ctx,
ByteBuf buf) throws Exception {

out.add(buf.readBytes(buf.readInt()));
}
}


ReplayingDecoder对ByteBuf进行了特殊的实现,当没有足够的数据来解码成特定类型的时候会抛错,在上述方法IntegerHeaderFrameDecoder实现中,当接受到4个或者更多字节的时候能够正常返回整型header,否则将抛错,如果ReplayingDecoder捕获到了异常,将会重新设置readerI
4000
ndex到初始位置,然后重复调用decode来接受更多的字节数据。

ReplayingDecoder如此简洁也会有一些代价,如:

一些ByteBuf的操作可能被禁止

如果网络很差或者消息格式很复杂,那么会导致性能比较差。因为当接受到的字节数据不够会重复的调用多次decode来获取更多的消息;而不会像ByteToMessageDecoder一样直接返回

可能需要注意decode方法会背调用多次为了解码一条消息

public class MyDecoder extends ReplayingDecoder<Void> {

private final Queue<Integer> values = new LinkedList<Integer>();

@Override
public void decode(.., ByteBuf buf, List<Object> out) throws Exception {

// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());

// This assertion will fail intermittently since values.offer()
// can be called more than two times!
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
}


当调用第二次values.offer(buf.readInt());的时候可能没有足够多的字节数据,则会重复调用decode,但是此时queue中可能不止2条数据。

正确的做法应该是以下方式:

public class MyDecoder extends ReplayingDecoder<Void> {

private final Queue<Integer> values = new LinkedList<Integer>();

@Override
public void decode(.., ByteBuf buf, List<Object> out) throws Exception {

// Revert the state of the variable that might have been changed
// since the last partial decode.
values.clear();

// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());

// Now we know this assertion will never fail.
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
}


当数据不够需要重新调用decode的时候,做一次清除queue的操作来保证队列正确的数据

但是我们可以通过调用checkpoint()方法来更新buffer的初始位置,让readerIndex能够重新回到该起始位置。从而避免多次重复的调用decode来对同一个消息多次解码

下面的这些类处理更加复杂的用例:

io.netty.handler.codec.LineBasedFrameDecoder— 这个类在 Netty 内部也有使

用,它使用了行尾控制字符(\n 或者\r\n)来解析消息数据;

io.netty.handler.codec.http.HttpObjectDecoder— 一个 HTTP 数据的解码器。

自定义协议编解码来解决粘包与拆包:

https://github.com/huiGod/netty_lecture/tree/master/src/main/java/com/shengsiyuan/netty/handler3

关于Netty编解码器的重要结论:

无论是编码器还是解码器,其所接受的消息类型必须要与待处理的参数类型一直,否则该编码器或解码器并不会被执行

在解码器进行数据解码时,一定要记得判断缓冲(ByteBuf)中的数据是否足够,否则将会产生一些问题

同样对于编码器和解码器来说,引用计数也需要特别注意,一旦消息被编码或者解码,它就会被ReferenceCountUtil.release(message)调用自动释放,如果需要保留引用以便稍后使用,那么可以调用referenceCountUtil.retain(message)方法来增加该引用计数,从而防止该消息被释放
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  处理器 编解码器