您的位置:首页 > 其它

Netty源码分析之handler decoder

2014-01-10 11:53 661 查看
ChannelHandler是处理业务逻辑的代码所在。下面首先分析下ChannelHandler的体系结构,然后重点分析几个有意思的Handler:IdleStateHandler、ExceptionHandler

1.ChannelHandler在Netty中的位置

ChannelHandler分upstreamChannelHandler和downstreamChannelHandler,分别处理upstream事件和downstream事件。下图是一个Channel组成示意图。

Channel 通过ChannelFactory创建,pipeline通过PipelineFactory创建,默认每个Pipeline都有独立的一系列 Handler,每当生成pipeline时都会创建Handler。HandlerContext对Handler进行包装,相互之间形成链式结构,依 次传递事件进行处理。upstream链处理由网络层向用户传递事件,包括对消息的解码decoder,用户对事件的响应方法;downstream链处 理由用户向网络层发送事件,包括写事件、消息编码,最后由sink接收处理downstream事件操作SocketChannel。

2.ChannelUpstreamHandler

该 类定义了对upstream事件的处理接口,而SimpleChannelUpstreamHandler是对事件的简单实现adapter,如果用户自 己实现其对消息的处理,可直接继续SimpleChannelUpstreamHandler,重写相应的方法,如messageReceived、 channelOpen等。

3.FrameDecoder

因为TCP流在传输时会分片、重组,并不能断定字节流中的一个数据包的结束。如发送方的数据可能是这样分段

* +-----+-----+-----+

* | ABC | DEF | GHI |

* +-----+-----+-----+

因为数据包的分片,接收方的数据可能就是:

* +----+-------+---+---+

* | AB | CDEFG | H | I |

* +----+-------+---+---+

因此需要decoder对字节流进行分段,由字节序列转成有正确、意义的POJO。

decoder作为SimpleChannelUpstreamHandler的子类,其实也是一个Handler,只不过不是直接处理业务逻辑,而对根据规定的网络协议进行解码分段。因此解码过程自然在messageReceived进行。

Java代码


if (cumulation == null) {
try {
// the cumulation buffer is not created yet so just pass the input to callDecode(...) method
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
} finally {
updateCumulation(ctx, input);
}
} else {
input = appendToCumulation(input);
try {
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
} finally {
updateCumulation(ctx, input);
}
}

在解码时,如果上次还有残留未处理的buffer,则在appendToCumulation中和这次收到的数据合并成一个新的 buffer进行解码。在callDecode中,当数据足够解码出一个POJO时,则继续发送upstream事件,只是这次的消息对象由 ChannelBuffer变成了解码的POJO。如果解码还有数据剩余,甚至一个POJO都没有解码出来,则需要在updateCumulation中 将这些数据保存在decoder中,以便下次收到消息时再合并处理。

对于使用者来说,构建自己协议的decoder,需要继承FrameDecoder,重写decode()方法,当解码出POJO时则返回该对象,否则返回NULL并重置buffer的readIndex等待下次解码。



Java代码


public class BigIntegerDecoder extends FrameDecoder {

@Override
protected Object decode(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
// Wait until the length prefix is available.
if (buffer.readableBytes() < 5) {
return null;
}

buffer.markReaderIndex();

// Check the magic number.
int magicNumber = buffer.readUnsignedByte();
if (magicNumber != 'F') {
buffer.resetReaderIndex();
throw new CorruptedFrameException(
"Invalid magic number: " + magicNumber);
}

// Wait until the whole data is available.
int dataLength = buffer.readInt();
if (buffer.readableBytes() < dataLength) {
buffer.resetReaderIndex();
return null;
}

// Convert the received data into a new BigInteger.
byte[] decoded = new byte[dataLength];
buffer.readBytes(decoded);

return new BigInteger(decoded);
}
}

因此FrameDecoder更像是从流里面捡出一个个消息体StreamToOneDecoder

4.OneToOneDecoder

这 是一个消息转成另一个消息的解码器,常常在FrameDecoder解码之后再使用,因为经过FrameDecoder解码后的一个个消息已经是完整独立 的消息对象,所以它不需要处理消息分隔的工作,它所承担的职责也就轻很多,例如Base64编码等工作。对使用者来说,只要继承它并重写decode方 法。

5.ChannelDownstreamHandler

与ChannelUpstreamHandler相对应,只是在事件向相反方向传递时对事件进行响应,包括对writeRequested、connectRequested等处理。

使用者可以实现接口ChannelDownstreamHandler的handleDownstream来实现自己的逻辑,或者直接继续重写SimpleChannelDownstreamHandler的各事件的处理方法。

6.OneToOneEncoder

相对decoder来说,encoder需要做的工作并不多,因为在发消息时并没有多个消息界限不明的问题,因此并不需要有相对的FrameEncoder。使用者只需继承它并重写encode方法。

7.SimpleChannelHandler

这个Handler同时继承了ChannelUpstreamHandler、ChannelDownstreamHandler,将upstream downstream两种事件的处理逻辑集中在一个类中。

8.ExecutionHandler

在 不使用这个Handler进行处理事件时,其它Handler的事件处理是同步进行的,即一个Worker线程在收到消息后执行业务Handler的方 法,如果方法内有非常耗时或者阻塞的操作如DB访问,这时Worker线程就不能及时返回处理其它请求,对性能影响较高。而 ExecutionHandler就是为这种情况设计了,它提供了一种异步处理任务的机制,将它之后handler处理以任务的形式投递到线程池中并直接 返回。

Java代码


public void handleUpstream(
ChannelHandlerContext context, ChannelEvent e) throws Exception {
if (handleUpstream) {
executor.execute(new ChannelUpstreamEventRunnable(context, e, executor));
} else {
context.sendUpstream(e);
}
}

ExecutionHandler不像其它Handler都是独立的,它是所有Handler共享使用。其使用OrderedMemoryAwareThreadPoolExecutor线程池来保证同一个Channel上事件的先后顺序。

9.IdleStateHandler
这是Netty提供的超时检测处理Handler,可分别设置读超时、写超时、读写超时的时间。Netty采用Hashed Wheel模式设计定时器,将这些IdleStateHandler放到定时器中。每当有消息事件经过时及时更新时间,如果超时则自它向上传递超时事件。 注意这个超时是人为设计的,并非socket真正在超时,此时仍能对socket读写。对使用者来说,如果要使用这个Handler,则业务逻辑处理类须
继续IdleStateAwareChannelUpstreamHandler,重写其channelIdle方法。如果要保持长连接可以此处发送心跳 包。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: