您的位置:首页 > 理论基础 > 计算机网络

架构师养成记--20.netty的tcp拆包粘包问题

2017-01-30 21:49 399 查看
问题描述

比如要发ABC DEFG HIJK 这一串数据,其中ABC是一个包,DEFG是一个包,HIJK是一个包。由于TCP是基于流发送的,所以有可能出现ABCD EFGH 这种情况,那么ABC和D就粘包了,DEFG被拆开了。

解决方案

1、消息定长,例如报文大小控制为200,如果不够就空位补全

2、在包结尾加特殊字符进行分割,如$_

3、消息分为消息头和消息体,在消息中包含消息长度等字段,然后进行消息逻辑处理。

分隔符方案

服务端

import java.nio.ByteBuffer;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class Server {

public static void main(String[] args) throws Exception{
//1 创建2个线程,一个是负责接收客户端的连接。一个是负责进行数据传输的
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();

//2 创建服务器辅助类
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_SNDBUF, 32*1024)
.option(ChannelOption.SO_RCVBUF, 32*1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//设置特殊分隔符
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
//设置字符串形式的解码
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ServerHandler());
}
});

//4 绑定连接
ChannelFuture cf = b.bind(8765).sync();

//等待服务器监听端口关闭
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();

}

}


import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class ServerHandler extends ChannelHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(" server channel active... ");
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String request = (String)msg;
System.out.println("Server :" + msg);
String response = "服务器响应:" + msg + "$_";
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
ctx.close();
}

}


客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class Client {

public static void main(String[] args) throws Exception {

EventLoopGroup group = new NioEventLoopGroup();

Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ClientHandler());
}
});

ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

cf.channel().writeAndFlush(Unpooled.wrappedBuffer("bbbb$_".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cccc$_".getBytes()));

//等待客户端端口关闭
cf.channel().closeFuture().sync();
group.shutdownGracefully();

}
}


import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelHandlerAdapter{

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client channel active... ");
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
String response = (String)msg;
System.out.println("Client: " + response);
} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}

}


import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelHandlerAdapter{

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client channel active... ");
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
String response = (String)msg;
System.out.println("Client: " + response);
} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: