您的位置:首页 > 其它

Netty中对象序列化传输机制研究分析

2015-07-29 17:02 387 查看

1. 技术点描述

http://blog.163.com/linfenliang@126/blog/static/127857195201210742441234?suggestedreading


本文档主要说明如何利用netty进行对象传输

2. 实现方案


前提条件:对象须实现序列化。
基本思想:借助netty自带的ObjectDecoder和ObjectEncoder类实现对象序列化的传输

3. 参考源码包


源码包:org.jboss.netty.handler.codec.serialization

3.1. ObjectDecoder


此类主要完成数据转换成对象操作,继承了LengthFieldBasedFrameDecoder类。
核心方法:

public ObjectDecoder(int maxObjectSize,
ClassResolver classResolver) {
super(maxObjectSize,
0, 4, 0, 4);
this.classResolver =
classResolver;

}

此构造方法为最核心的对象解码器

参数说明:

maxObjectSize:序列化的对象的最大长度,一旦接收到的对象长度大于此值,抛出StreamCorruptedException异常,默认为1048576

classResolver:这个类(ClassResolver)会去加载已序列化的对象,

常用调用方式:ClassResolvers.cacheDisabled(Plan.class.getClassLoader())

或者直接ClassResolvers.cacheDisabled(null)

3.2. ObjectEncoder


此类主要是完成对对象的编码操作,继承了OneToOneEncoder类。
核心方法:
public ObjectEncoder(int estimatedLength)
{
if (estimatedLength <
0) {
throw new IllegalArgumentException(
"estimatedLength:
" + estimatedLength);
}
this.estimatedLength = estimatedLength;
}
参数说明:
estimatedLength:预估的要编码的对象的大小,此值常常是我们经常发送的对象的大小,设置不宜过大或过小,设置的过大容易造成内存的浪费,设置过小容易造成要对一个对象编码时反复扩展容量,反复拷贝的内容消耗。

3.3. ObjectDecoderInputStream


当客户端采用了ObjectEncoder,而服务器段未采用ObjectDecoder,就需要用这个类进行转换,转换代码:
ChannelBuffer cb
= (ChannelBuffer) e.getMessage();

Object obj = null;
ByteArrayInputStream bi = null;
ObjectDecoderInputStream oi = null;

try {
bi = new ByteArrayInputStream(cb.array());
oi = new ObjectDecoderInputStream(bi);
obj = oi.readObject();
} catch (Exception
e2) {
LOGGER.error("byte数组转对象异常:"+e2);
} finally {
Utils.closeInStreamQuietly(bi);
Utils.closeInStreamQuietly(oi);
}

Plan plan = (Plan)obj;
LOGGER.info(plan);
plan.setStatus("
server received");
byte[]
b = Utils.obj2Byte(plan);
ChannelBuffer backBuffer = ChannelBuffers.buffer(b.length);
backBuffer.writeBytes(b);
e.getChannel().write(backBuffer,e.getRemoteAddress());

3.4. ObjectEncoderOutputStream


当服务器端采用了ObjectDecoder,而客户端采用的是非Netty中的ObjectEncoder,则需要用这个类进行转换,转换方式类似于3.3

注意:采用此方法后仍然需要在pipelineFactory设置
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,
0, 4, 0, 4)(解码端)
和new
LengthFieldPrepender(4, false)(编码端)

Plan plan
= new Plan();
plan.setAuthor("linfenliang");
plan.setContent("something
about sequence");
plan.setDate("2012-11-07");
plan.setStatus("client
send ");
byte[]
b = Utils.obj2Byte(plan);
System.out.println(Utils.bytesToHex(b));
Channel channel = MoreClient.initChannel();
byte[]
bytes = null;
ByteArrayOutputStream bo = null;
ObjectEncoderOutputStream oo = null;

try {
bo = new ByteArrayOutputStream();
oo = new ObjectEncoderOutputStream(bo);
oo.writeObject(plan);
bytes = bo.toByteArray();
} catch (Exception
e) {
System.err.println("Object对象转byte数组异常:"+e);
} finally {
Utils.closeOutStreamQuietly(bo);
Utils.closeOutStreamQuietly(oo);
}

ChannelBuffer cb = ChannelBuffers.buffer(bytes.length);
cb.writeBytes(bytes);
channel.write(cb);

4. Demo实现


源码详见压缩包
注意创建的自定义bean必须实现Serializable,且客户端与服务器段的bean名字必须完全相同包也必须相同
public class Plan implements Serializable…

4.1. UDPServerPipelineFactory


getPipeline()方法中

ChannelPipeline pipeline = Channels.pipeline(
// new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
// new LengthFieldPrepender(4, false),
new ObjectDecoder(ClassResolvers.cacheDisabled(Plan.class.getClassLoader())),
new ObjectEncoder(1024),
this.EXECUTION_UP_HANDLER,
this.EXECUTION_DOWN_HANDLER,
new UDPServerHandler());
return pipeline;

4.2. UDPServerHandler



void messageReceived(ChannelHandlerContext ctx, MessageEvent e)方法中

Plan plan = (Plan) e.getMessage();
LOGGER.info(plan);
plan.setStatus("
server received");
e.getChannel().write(plan,e.getRemoteAddress());

4.3. UDPClient



Bean发送方法:

Plan plan = new Plan();

plan.setAuthor("linfenliang");

plan.setContent("something about sequence");

plan.setDate("2012-11-07");

plan.setStatus("client send ");

Channel channel = MoreClient.initChannel();

channel.write(plan);

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: