Netty自定义协议开发
2015-12-24 17:23
435 查看
自定义协议格式
%1$8s%2$4s%3$4s%4$8s%5$16s%6$32s%7$8s
类似:AAAAAAAA0001000011001001usernameusernamesession1session1session1session100000023
参考代码
package com.phei.netty.codec.custom;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.UnpooledDirectByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.util.ReferenceCountUtil;

/**
 * Frame decoder for a head/length/body/tail framed stream. Only the body is
 * passed on to the next handler.
 *
 * <pre>
 * BEFORE DECODE                             AFTER DECODE
 * +------+--------+----------------+------+        +----------------+
 * | head | Length |      body      | tail |-------&gt;|      body      |
 * | int  |  int   |   some bytes   | int  |        |   some bytes   |
 * +------+--------+----------------+------+        +----------------+
 * </pre>
 *
 * @author Administrator
 */
public class CustomDecoder2 extends ByteToMessageDecoder {

    /** Marker that opens every frame. Only read through absolute getters. */
    private static final ByteBuf HEAD_FLAG = Unpooled.copyInt(0x77aa77aa);

    /** Marker that closes every frame. */
    private static final int TAIL_FLAG = 0x77ab77ab;

    /** Size in bytes of the head, length and tail fields. */
    private static final int INT_SIZE = 4;

    /** Running count of decoded frames, printed for debugging. */
    private static final AtomicInteger c = new AtomicInteger(1);

    /**
     * Tries to decode one frame and, when successful, adds its body to {@code out}.
     *
     * @param ctx handler context, used to allocate the body buffer
     * @param in  accumulated inbound bytes
     * @param out receives the decoded body, if any
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object o = decode(ctx, in);
        if (o != null) {
            out.add(o);
            System.err.println(c.getAndIncrement());
        }
    }

    /**
     * Decodes a single frame from {@code in}.
     *
     * @return the frame body as a new buffer, or {@code null} when no complete,
     *         valid frame is available yet
     */
    private Object decode(ChannelHandlerContext ctx, ByteBuf in) {
        in.markReaderIndex();

        int headerOffset = indexOf(in, HEAD_FLAG);
        if (headerOffset < 0) {
            // No head marker: drop the garbage but keep the last 3 bytes, which
            // may be the beginning of a marker split across two reads.
            if (in.readableBytes() > 3) {
                in.skipBytes(in.readableBytes() - 3);
            }
            return null;
        }
        // Jump to the first byte after the head marker.
        in.skipBytes(headerOffset + INT_SIZE);

        // Length field not fully received: roll back and wait for more data.
        if (in.readableBytes() < INT_SIZE) {
            in.resetReaderIndex();
            return null;
        }

        int bodyLength = in.readInt();
        if (bodyLength < 0) {
            // Invalid length: the head marker and length stay consumed, so the
            // next pass searches for the following head marker.
            return null;
        }

        // Body plus tail must be available. The comparison is written as a
        // subtraction because bodyLength + 4 overflows for lengths close to
        // Integer.MAX_VALUE, which would let a bogus length slip past this guard.
        // NOTE(review): no upper bound is enforced on bodyLength, so a very
        // large value makes the decoder wait for that many bytes.
        if (in.readableBytes() - INT_SIZE < bodyLength) {
            in.resetReaderIndex();
            return null;
        }

        ByteBuf frame = ctx.alloc().buffer(bodyLength);
        in.readBytes(frame, bodyLength);

        int tail = in.readInt();
        if (tail != TAIL_FLAG) {
            // Tail mismatch: discard the body that was just read.
            ReferenceCountUtil.release(frame);
            return null;
        }
        return frame;
    }

    /**
     * Finds the first occurrence of the readable bytes of {@code needle} within
     * the readable bytes of {@code haystack}.
     *
     * @param haystack buffer to search; its indices are not modified
     * @param needle   bytes to look for; its indices are not modified
     * @return offset of the match relative to {@code haystack.readerIndex()},
     *         or {@code -1} when there is no match
     */
    private static int indexOf(ByteBuf haystack, ByteBuf needle) {
        final int needleStart = needle.readerIndex();
        final int needleLength = needle.readableBytes();
        final int haystackStart = haystack.readerIndex();
        final int haystackEnd = haystack.writerIndex();

        for (int i = haystackStart; i < haystackEnd; i++) {
            if (haystackEnd - i < needleLength) {
                // Fewer bytes left than the needle needs; no later position can match.
                return -1;
            }
            int matched = 0;
            while (matched < needleLength
                    && haystack.getByte(i + matched) == needle.getByte(needleStart + matched)) {
                matched++;
            }
            if (matched == needleLength) {
                return i - haystackStart;
            }
        }
        return -1;
    }

    /** Small manual check of {@link #indexOf(ByteBuf, ByteBuf)}. */
    public static void main(String[] args) {
        ByteBuf haystack = Unpooled.buffer();
        haystack.writeShort(2);
        ByteBuf needle = Unpooled.copyInt(0x77aa77aa);
        System.out.println(indexOf(haystack, needle));
        System.out.println(haystack.readerIndex());
    }
}
我的代码
解码器:
package com.phei.netty.codec.custom;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Frame decoder for the fixed-header text protocol.
 *
 * <pre>
 * Head                                              Body
 * 80 bytes: %1$8s%2$4s%3$4s%4$8s%5$16s%6$32s%7$8s   variable length
 * bytes [16, 24) command id, bytes [72, 80) body length
 * </pre>
 *
 * Each emitted frame contains the 80-byte header followed by the body.
 *
 * @author zhengzy
 * @date 2015-12-24 10:50:26
 */
public class CustomDecoder extends ByteToMessageDecoder {

    /** Total size of the fixed header in bytes. */
    private static final int HEADER_SIZE = 80;

    /** Offset of the body-length field inside the header. */
    private static final int LENGTH_FIELD_OFFSET = 72;

    /** Width of the body-length field; it runs to the end of the header. */
    private static final int LENGTH_FIELD_SIZE = HEADER_SIZE - LENGTH_FIELD_OFFSET;

    /** Running count of decoded frames, printed for debugging. */
    private static final AtomicInteger c = new AtomicInteger(1);

    /**
     * Tries to decode one frame and, when successful, adds it to {@code out}.
     *
     * @param ctx handler context, used to allocate the frame buffer
     * @param in  accumulated inbound bytes
     * @param out receives the decoded frame, if any
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object o = decode(ctx, in);
        if (o != null) {
            out.add(o);
            System.out.println(c.getAndIncrement());
        }
    }

    /**
     * Decodes a single header-plus-body frame from {@code in}.
     *
     * @return a new buffer holding header and body, or {@code null} when the
     *         frame has not been fully received yet
     * @throws CorruptedFrameException if the length field is not a
     *         non-negative decimal number
     */
    private Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        // Wait until the whole header has arrived.
        if (in.readableBytes() < HEADER_SIZE) {
            return null;
        }

        // Read the length field in place. Nothing is consumed until the whole
        // frame is available, so there is no per-call state to roll back and
        // no scratch buffer shared between decoder instances.
        // NOTE(review): the field is located by byte offset, which assumes the
        // header is single-byte (ASCII) text - confirm against the encoder.
        String lengthText = in
                .toString(in.readerIndex() + LENGTH_FIELD_OFFSET, LENGTH_FIELD_SIZE, StandardCharsets.UTF_8)
                .trim();

        int msgLen;
        try {
            msgLen = Integer.parseInt(lengthText);
        } catch (NumberFormatException e) {
            // Drop the bad header so the same bytes are not parsed again.
            in.skipBytes(HEADER_SIZE);
            throw new CorruptedFrameException("invalid body length field: '" + lengthText + "'", e);
        }
        if (msgLen < 0) {
            in.skipBytes(HEADER_SIZE);
            throw new CorruptedFrameException("negative body length: " + msgLen);
        }

        // Wait until the whole body has arrived. The field is at most 8
        // characters wide, so HEADER_SIZE + msgLen cannot overflow.
        int frameLength = HEADER_SIZE + msgLen;
        if (in.readableBytes() < frameLength) {
            return null;
        }

        ByteBuf frame = ctx.alloc().buffer(frameLength);
        in.readBytes(frame, frameLength);
        return frame;
    }
}
编码器:
package com.phei.netty.codec.custom; import com.lytz.pb.Config; import com.lytz.pb.ProtocolUtils; import com.lytz.pb.QstockProtocol.LOFMergeSplitReqt; import com.lytz.pb.QstockProtocol.StockCreateRedeemReqt; import com.lytz.pb.QstockProtocol.StockOrderReqt; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; /** * CustomEncoder.java 说明 * * @author zhengzy * @date 2015年12月24日下午1:33:24 */ public class CustomEncoder extends MessageToByteEncoder<Message> { private static ByteBuf buf = Unpooled.buffer(); private static byte[] body = null; private static byte[] pro = null; @Override protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { switch (msg.getMsgId()) { case Config.ID_S_ORDER_REQT: StockOrderReqt stockOrderReqt = (StockOrderReqt) msg.getObject(); body = stockOrderReqt.toByteArray(); pro = ProtocolUtils.getProtocol(Config.ID_S_ORDER_REQT, body); buf.writeBytes(pro); out.writeBytes(buf); break; case Config.ID_S_LOF_UP_REQT: LOFMergeSplitReqt mergeSplitReqt = (LOFMergeSplitReqt) msg.getObject(); body = mergeSplitReqt.toByteArray(); pro = ProtocolUtils.getProtocol(Config.ID_S_LOF_UP_REQT, body); buf.writeBytes(pro); out.writeBytes(buf); break; case Config.ID_S_LOF_CR_REQT: StockCreateRedeemReqt stockCreateRedeemReqt = (StockCreateRedeemReqt) msg.getObject(); body = stockCreateRedeemReqt.toByteArray(); pro = ProtocolUtils.getProtocol(Config.ID_S_LOF_CR_REQT, body); buf.writeBytes(pro); out.writeBytes(buf); break; default: break; } //清除容器中的数据 buf.clear(); body = null; pro = null; } }
代码说明
/** * 找到needle在haystack中的第一个位置,返回位置,没有返回-1 * @param haystack * @param needle * @return */ private static int indexOf(ByteBuf haystack, ByteBuf needle) { for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i++) { int haystackIndex = i; int needleIndex; for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex++) { if (haystack.getByte(haystackIndex) != needle .getByte(needleIndex)) { break; } else { haystackIndex++; if (haystackIndex == haystack.writerIndex() && needleIndex != needle.capacity() - 1) { return -1; } } } if (needleIndex == needle.capacity()) { // Found the needle from the haystack! return i - haystack.readerIndex(); } } return -1; }
测试代码:
// Marker to search for, and a sample message in which the marker is preceded
// by the three characters "123".
String marker = "ABCDEFGH";
String sample = "123ABCDEFGH0001123411101007namenamenamename00000000000000000000000000000000 11";

// Haystack: the sample message bytes.
ByteBuf haystack = Unpooled.buffer();
haystack.writeBytes(sample.getBytes());

// Marker written into a default-sized buffer; not passed to indexOf below.
ByteBuf markerBuf = Unpooled.buffer();
markerBuf.writeBytes(marker.getBytes());

// Needle: an exact-size copy of the marker bytes.
ByteBuf needle = Unpooled.copiedBuffer(marker.getBytes());
System.out.println(indexOf(haystack, needle));
参考
http://www.infoq.com/cn/articles/netty-codec-framework-analyse
http://my.oschina.net/xinxingegeya/blog/282970
http://asialee.iteye.com/blog/1784844
相关文章推荐
- Netty使用Http上传文件
- tomcat、netty以及nodejs的helloworld性能对比
- Netty入门-client/server
- flatbuffers 和netty的结合使用
- netty 处理远程主机强制关闭一个连接
- Netty 源码分析(三):服务器端的初始化和注册过程
- 轻量级分布式 RPC 框架
- spark总体概况
- Netty系列之Netty百万级推送服务设计要点
- Netty初步
- Netty ChannelBuffer 简介
- netty4研究系列-序
- netty io.netty.channel 简介1
- spark overview
- Netty4和Netty5内存池的使用心得
- Netty与Reactor模式
- Netty源码分析之DelimiterBasedFrameDecoder
- Netty4学习笔记-001
- Netty4学习笔记-002
- Netty -- 内存管理