您的位置:首页 > 其它

Netty的分隔符和定长解码器应用

2015-06-04 08:52 501 查看
TCP以流的方式进行数据传输,上层的应用协议为了对消息进行分区,往往采用下面4种方式:

(1)消息长度固定,累计读取到长度总和为定长LEN的报文后,就以为读取到了一个完整的消息;将计数器置位,重新开始读取下一个数据报;

(2)将回车换行符作为消息结束符,例如FTP协议,这种方式在文本协议中应用比较广泛;

(3)将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符;

(4)通过在消息头中定义长度字段来标识消息的总长度。

我们先来看看通过DelimiterBasedFrameDecoder的使用,我们可以自动完成以分隔符作为码流结束标识的消息的解码;通过一个实例来演示,EchoServer接受到EchoClient的请求 消息后,将其打印出来,然后将原始消息返回给客户端,消息以"$_"作为分隔符。

EchoServer服务端:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class EchoServer {
      public void bind(int port) throws InterruptedException{
    	  //配置服务端的NIO线程组
    	  EventLoopGroup bossGroup=new NioEventLoopGroup();
    	  EventLoopGroup workerGroup=new NioEventLoopGroup();
    	  
    	  try {
			ServerBootstrap b=new ServerBootstrap();
			b.group(bossGroup, workerGroup)
			.channel(NioServerSocketChannel.class)
			.option(ChannelOption.SO_BACKLOG, 100)
			.handler(new LoggingHandler(LogLevel.INFO))
			.childHandler(new ChannelInitializer<SocketChannel>(){

				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					//创建分隔符缓冲对象,使用$_作为分隔符
				  ByteBuf delimiter=Unpooled.copiedBuffer("$_".getBytes());
				  //创建DelimiterBasedFrameDecoder对象,加入到ChannelPipeline中,它有两个参数,第一个参数表示单条消息的
				  //最大长度,当达到该长度的后仍然没有查找到分隔符,就抛出TooLongFrameException异常,防止由于异常码流缺失分隔符导致的内存溢出
				  //这是Netty解码器的可靠性保护;第二个参数就是分隔符缓冲对象
					ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
					ch.pipeline().addLast(new StringDecoder());
					ch.pipeline().addLast(new EchoServerHandler());
				}
				 
			});
			//绑定端口,同步等待成功
			ChannelFuture f=b.bind(port).sync();
			//等待服务端监听端口关闭
			f.channel().closeFuture().sync();
			
			
		} finally{
			//优雅退出,释放线程资源
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
 
      }
      public static void main(String[] args) throws InterruptedException {
		int port=8088;
		if(args!=null&&args.length>0){
			 try {
				 port=Integer.valueOf(args[0]);
			} catch (Exception e) {
				// TODO: handle exception
			}
		}
    	  new EchoServer().bind(port);
    	  
	}
      
      
}


EchoServerHandler服务端EchoServerHandler:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class EchoServerHandler extends ChannelHandlerAdapter{
      int counter=0;
      
      public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
    	  String body=(String)msg;
    	  System.out.println("this is "+ ++counter+" time receive client:["+body+"]");
    	  //由于我们设置DelimiterBasedFrameDecoder过滤掉了分隔符,所以,返回给客户端时需要在请求消息尾拼接分隔符“$_”
    	  //最后创建ByteBuf,将原始消息重新返回给客户端
    	  body+= "$_";
    	  ByteBuf echo=Unpooled.copiedBuffer(body.getBytes());
    	  ctx.writeAndFlush(echo);
    	  
      }
      public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
    	  cause.printStackTrace();
    	  ctx.close();
      }
      
}
EchoClient客户端EchoClient:

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class EchoClient {
       public void connect(int port,String host)throws Exception{
    	   //配置客户端NIO线程组
    	   EventLoopGroup group=new NioEventLoopGroup();
    	   try {
			 Bootstrap b=new Bootstrap();
			 b.group(group).channel(NioSocketChannel.class)
			 .option(ChannelOption.TCP_NODELAY, true)
			 .handler(new ChannelInitializer<SocketChannel>(){

				@Override
				protected void initChannel(SocketChannel arg0) throws Exception {
				  ByteBuf delimiter=Unpooled.copiedBuffer("$_".getBytes());
					arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
					arg0.pipeline().addLast(new StringDecoder());
					arg0.pipeline().addLast(new EchoClientHandler());
				}
				  
			 });
    		 //发起异步链接操作
			ChannelFuture f=b.connect(host,port).sync();
			 //等待客户端链路关闭
			f.channel().closeFuture().sync();
			 
		} finally{
			//优雅退出,释放NIO线程组
			group.shutdownGracefully();
		}
 
       }
       public static void main(String[] args) throws Exception {
		  int port=8088;
		  if(args!=null&&args.length>0){
			  try {
				port=Integer.valueOf(args[0]);
			} catch (Exception e) {
				// TODO: handle exception
			}
		  }
    	   new EchoClient().connect(port, "127.0.0.1");   
    	   
	}
       
}


EchoClient客户端EchoClientHandler:

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class EchoClientHandler extends ChannelHandlerAdapter{
      private int counter;
      static final String ECHO_REQ="hi,Mr yang Welcome to Netty.$_";
      
      public EchoClientHandler(){
    	  
      }
      public void channelActive(ChannelHandlerContext ctx){
    	  for(int i=0;i<10;i++){
    		  ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
    	  }
    	  
      }
      public void channelRead(ChannelHandlerContext ctx,Object msg){
    	  System.out.println("this is "+ ++counter+ "time receive server:["+msg+"]");
    	  
      }
      public void channelReadComplete(ChannelHandlerContext ctx)throws Exception{
    	  ctx.flush();
      }
      public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
    	  cause.printStackTrace();
    	  ctx.close();
      }
      
      
      
}
服务端运行结果:

客户端运行结果:

还有一个FixedLengthFrameDecoder解码器,它是一个固定长度解码器.

只要在服务端的ChannelPipeline中新增FixedLengthFrameDecoder,设置一个长度,然后再添加字符串解码器和处理事件就可以了,用法都差不多。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: