Netty源码分析之handler decoder
2014-01-10 11:53
661 查看
ChannelHandler是处理业务逻辑的代码所在。下面首先分析下ChannelHandler的体系结构,然后重点分析几个有意思的Handler:IdleStateHandler、ExceptionHandler
1.ChannelHandler在Netty中的位置
ChannelHandler分upstreamChannelHandler和downstreamChannelHandler,分别处理upstream事件和downstream事件。下图是一个Channel组成示意图。
Channel 通过ChannelFactory创建,pipeline通过PipelineFactory创建,默认每个Pipeline都有独立的一系列 Handler,每当生成pipeline时都会创建Handler。HandlerContext对Handler进行包装,相互之间形成链式结构,依 次传递事件进行处理。upstream链处理由网络层向用户传递事件,包括对消息的解码decoder,用户对事件的响应方法;downstream链处 理由用户向网络层发送事件,包括写事件、消息编码,最后由sink接收处理downstream事件操作SocketChannel。
2.ChannelUpstreamHandler
该 类定义了对upstream事件的处理接口,而SimpleChannelUpstreamHandler是对事件的简单实现adapter,如果用户自 己实现其对消息的处理,可直接继续SimpleChannelUpstreamHandler,重写相应的方法,如messageReceived、 channelOpen等。
3.FrameDecoder
因为TCP流在传输时会分片、重组,并不能断定字节流中的一个数据包的结束。如发送方的数据可能是这样分段
* +-----+-----+-----+
* | ABC | DEF | GHI |
* +-----+-----+-----+
因为数据包的分片,接收方的数据可能就是:
* +----+-------+---+---+
* | AB | CDEFG | H | I |
* +----+-------+---+---+
因此需要decoder对字节流进行分段,由字节序列转成有正确、意义的POJO。
decoder作为SimpleChannelUpstreamHandler的子类,其实也是一个Handler,只不过不是直接处理业务逻辑,而对根据规定的网络协议进行解码分段。因此解码过程自然在messageReceived进行。
Java代码
if (cumulation == null) {
try {
// the cumulation buffer is not created yet so just pass the input to callDecode(...) method
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
} finally {
updateCumulation(ctx, input);
}
} else {
input = appendToCumulation(input);
try {
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
} finally {
updateCumulation(ctx, input);
}
}
在解码时,如果上次还有残留未处理的buffer,则在appendToCumulation中和这次收到的数据合并成一个新的 buffer进行解码。在callDecode中,当数据足够解码出一个POJO时,则继续发送upstream事件,只是这次的消息对象由 ChannelBuffer变成了解码的POJO。如果解码还有数据剩余,甚至一个POJO都没有解码出来,则需要在updateCumulation中 将这些数据保存在decoder中,以便下次收到消息时再合并处理。
对于使用者来说,构建自己协议的decoder,需要继承FrameDecoder,重写decode()方法,当解码出POJO时则返回该对象,否则返回NULL并重置buffer的readIndex等待下次解码。
如
Java代码
public class BigIntegerDecoder extends FrameDecoder {
@Override
protected Object decode(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
// Wait until the length prefix is available.
if (buffer.readableBytes() < 5) {
return null;
}
buffer.markReaderIndex();
// Check the magic number.
int magicNumber = buffer.readUnsignedByte();
if (magicNumber != 'F') {
buffer.resetReaderIndex();
throw new CorruptedFrameException(
"Invalid magic number: " + magicNumber);
}
// Wait until the whole data is available.
int dataLength = buffer.readInt();
if (buffer.readableBytes() < dataLength) {
buffer.resetReaderIndex();
return null;
}
// Convert the received data into a new BigInteger.
byte[] decoded = new byte[dataLength];
buffer.readBytes(decoded);
return new BigInteger(decoded);
}
}
因此FrameDecoder更像是从流里面捡出一个个消息体StreamToOneDecoder
4.OneToOneDecoder
这 是一个消息转成另一个消息的解码器,常常在FrameDecoder解码之后再使用,因为经过FrameDecoder解码后的一个个消息已经是完整独立 的消息对象,所以它不需要处理消息分隔的工作,它所承担的职责也就轻很多,例如Base64编码等工作。对使用者来说,只要继承它并重写decode方 法。
5.ChannelDownstreamHandler
与ChannelUpstreamHandler相对应,只是在事件向相反方向传递时对事件进行响应,包括对writeRequested、connectRequested等处理。
使用者可以实现接口ChannelDownstreamHandler的handleDownstream来实现自己的逻辑,或者直接继续重写SimpleChannelDownstreamHandler的各事件的处理方法。
6.OneToOneEncoder
相对decoder来说,encoder需要做的工作并不多,因为在发消息时并没有多个消息界限不明的问题,因此并不需要有相对的FrameEncoder。使用者只需继承它并重写encode方法。
7.SimpleChannelHandler
这个Handler同时继承了ChannelUpstreamHandler、ChannelDownstreamHandler,将upstream downstream两种事件的处理逻辑集中在一个类中。
8.ExecutionHandler
在 不使用这个Handler进行处理事件时,其它Handler的事件处理是同步进行的,即一个Worker线程在收到消息后执行业务Handler的方 法,如果方法内有非常耗时或者阻塞的操作如DB访问,这时Worker线程就不能及时返回处理其它请求,对性能影响较高。而 ExecutionHandler就是为这种情况设计了,它提供了一种异步处理任务的机制,将它之后handler处理以任务的形式投递到线程池中并直接 返回。
Java代码
public void handleUpstream(
ChannelHandlerContext context, ChannelEvent e) throws Exception {
if (handleUpstream) {
executor.execute(new ChannelUpstreamEventRunnable(context, e, executor));
} else {
context.sendUpstream(e);
}
}
ExecutionHandler不像其它Handler都是独立的,它是所有Handler共享使用。其使用OrderedMemoryAwareThreadPoolExecutor线程池来保证同一个Channel上事件的先后顺序。
9.IdleStateHandler
这是Netty提供的超时检测处理Handler,可分别设置读超时、写超时、读写超时的时间。Netty采用Hashed Wheel模式设计定时器,将这些IdleStateHandler放到定时器中。每当有消息事件经过时及时更新时间,如果超时则自它向上传递超时事件。 注意这个超时是人为设计的,并非socket真正在超时,此时仍能对socket读写。对使用者来说,如果要使用这个Handler,则业务逻辑处理类须
继续IdleStateAwareChannelUpstreamHandler,重写其channelIdle方法。如果要保持长连接可以此处发送心跳 包。
1.ChannelHandler在Netty中的位置
ChannelHandler分upstreamChannelHandler和downstreamChannelHandler,分别处理upstream事件和downstream事件。下图是一个Channel组成示意图。
Channel 通过ChannelFactory创建,pipeline通过PipelineFactory创建,默认每个Pipeline都有独立的一系列 Handler,每当生成pipeline时都会创建Handler。HandlerContext对Handler进行包装,相互之间形成链式结构,依 次传递事件进行处理。upstream链处理由网络层向用户传递事件,包括对消息的解码decoder,用户对事件的响应方法;downstream链处 理由用户向网络层发送事件,包括写事件、消息编码,最后由sink接收处理downstream事件操作SocketChannel。
2.ChannelUpstreamHandler
该 类定义了对upstream事件的处理接口,而SimpleChannelUpstreamHandler是对事件的简单实现adapter,如果用户自 己实现其对消息的处理,可直接继续SimpleChannelUpstreamHandler,重写相应的方法,如messageReceived、 channelOpen等。
3.FrameDecoder
因为TCP流在传输时会分片、重组,并不能断定字节流中的一个数据包的结束。如发送方的数据可能是这样分段
* +-----+-----+-----+
* | ABC | DEF | GHI |
* +-----+-----+-----+
因为数据包的分片,接收方的数据可能就是:
* +----+-------+---+---+
* | AB | CDEFG | H | I |
* +----+-------+---+---+
因此需要decoder对字节流进行分段,由字节序列转成有正确、意义的POJO。
decoder作为SimpleChannelUpstreamHandler的子类,其实也是一个Handler,只不过不是直接处理业务逻辑,而对根据规定的网络协议进行解码分段。因此解码过程自然在messageReceived进行。
Java代码
if (cumulation == null) {
try {
// the cumulation buffer is not created yet so just pass the input to callDecode(...) method
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
} finally {
updateCumulation(ctx, input);
}
} else {
input = appendToCumulation(input);
try {
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
} finally {
updateCumulation(ctx, input);
}
}
在解码时,如果上次还有残留未处理的buffer,则在appendToCumulation中和这次收到的数据合并成一个新的 buffer进行解码。在callDecode中,当数据足够解码出一个POJO时,则继续发送upstream事件,只是这次的消息对象由 ChannelBuffer变成了解码的POJO。如果解码还有数据剩余,甚至一个POJO都没有解码出来,则需要在updateCumulation中 将这些数据保存在decoder中,以便下次收到消息时再合并处理。
对于使用者来说,构建自己协议的decoder,需要继承FrameDecoder,重写decode()方法,当解码出POJO时则返回该对象,否则返回NULL并重置buffer的readIndex等待下次解码。
如
Java代码
public class BigIntegerDecoder extends FrameDecoder {
@Override
protected Object decode(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
// Wait until the length prefix is available.
if (buffer.readableBytes() < 5) {
return null;
}
buffer.markReaderIndex();
// Check the magic number.
int magicNumber = buffer.readUnsignedByte();
if (magicNumber != 'F') {
buffer.resetReaderIndex();
throw new CorruptedFrameException(
"Invalid magic number: " + magicNumber);
}
// Wait until the whole data is available.
int dataLength = buffer.readInt();
if (buffer.readableBytes() < dataLength) {
buffer.resetReaderIndex();
return null;
}
// Convert the received data into a new BigInteger.
byte[] decoded = new byte[dataLength];
buffer.readBytes(decoded);
return new BigInteger(decoded);
}
}
因此FrameDecoder更像是从流里面捡出一个个消息体StreamToOneDecoder
4.OneToOneDecoder
这 是一个消息转成另一个消息的解码器,常常在FrameDecoder解码之后再使用,因为经过FrameDecoder解码后的一个个消息已经是完整独立 的消息对象,所以它不需要处理消息分隔的工作,它所承担的职责也就轻很多,例如Base64编码等工作。对使用者来说,只要继承它并重写decode方 法。
5.ChannelDownstreamHandler
与ChannelUpstreamHandler相对应,只是在事件向相反方向传递时对事件进行响应,包括对writeRequested、connectRequested等处理。
使用者可以实现接口ChannelDownstreamHandler的handleDownstream来实现自己的逻辑,或者直接继续重写SimpleChannelDownstreamHandler的各事件的处理方法。
6.OneToOneEncoder
相对decoder来说,encoder需要做的工作并不多,因为在发消息时并没有多个消息界限不明的问题,因此并不需要有相对的FrameEncoder。使用者只需继承它并重写encode方法。
7.SimpleChannelHandler
这个Handler同时继承了ChannelUpstreamHandler、ChannelDownstreamHandler,将upstream downstream两种事件的处理逻辑集中在一个类中。
8.ExecutionHandler
在 不使用这个Handler进行处理事件时,其它Handler的事件处理是同步进行的,即一个Worker线程在收到消息后执行业务Handler的方 法,如果方法内有非常耗时或者阻塞的操作如DB访问,这时Worker线程就不能及时返回处理其它请求,对性能影响较高。而 ExecutionHandler就是为这种情况设计了,它提供了一种异步处理任务的机制,将它之后handler处理以任务的形式投递到线程池中并直接 返回。
Java代码
public void handleUpstream(
ChannelHandlerContext context, ChannelEvent e) throws Exception {
if (handleUpstream) {
executor.execute(new ChannelUpstreamEventRunnable(context, e, executor));
} else {
context.sendUpstream(e);
}
}
ExecutionHandler不像其它Handler都是独立的,它是所有Handler共享使用。其使用OrderedMemoryAwareThreadPoolExecutor线程池来保证同一个Channel上事件的先后顺序。
9.IdleStateHandler
这是Netty提供的超时检测处理Handler,可分别设置读超时、写超时、读写超时的时间。Netty采用Hashed Wheel模式设计定时器,将这些IdleStateHandler放到定时器中。每当有消息事件经过时及时更新时间,如果超时则自它向上传递超时事件。 注意这个超时是人为设计的,并非socket真正在超时,此时仍能对socket读写。对使用者来说,如果要使用这个Handler,则业务逻辑处理类须
继续IdleStateAwareChannelUpstreamHandler,重写其channelIdle方法。如果要保持长连接可以此处发送心跳 包。
相关文章推荐
- Netty源码分析之handler decoder
- Netty学习:ChannelHandler执行顺序详解,附源码分析
- netty源码分析 之九 handler
- Netty 心跳服务之 IdleStateHandler 源码分析
- Netty 源码分析之SimpleChannelInboundHandler
- netty源码分析(十二)Channel与ChannelHandler及ChannelHandlerContext之间的关系分析
- netty源码分析 之五 transport(ChannelHandler)
- Netty3 源码分析 - ChannelHandlerContext
- netty源码分析之ChannelHandler
- Netty源码分析之DelimiterBasedFrameDecoder
- Netty源码分析:图解Pipeline、Handler、Context
- Netty3 源码分析 - ChannelUpstreamHandler
- netty源码分析之-Channel、ChannelPipeline、ChannelHandler以及 ChannelHandlerContext 详解(2)
- Netty 源码分析之ByteToMessageDecoder
- netty源码分析(二十三)ReplayingDecoder源码分析与特性解读以及其他编解码器介绍
- android的消息处理机制(图文+源码分析)—Looper/Handler/Message
- Netty学习之旅------源码分析Netty内存池分配机制初探--PoolArena、PoolChunk、PoolSubpage等数据结构分析
- springMVC源码分析--SimpleUrlHandlerMapping(四)
- springMVC源码分析--AbstractControllerUrlHandlerMapping(六)
- 【Netty源码分析】发送数据过程