您的位置:首页 > 理论基础 > 计算机网络

Java网络编程之Netty拆包和黏包-yellowcong

2017-10-08 10:38 495 查看
Netty中,解决拆包和黏包中,解决方式有三种

1、在每个包尾部,定义分隔符,通过回车符号,或者其他符号来解决

2、通过定义每个包的大小,如果包不够就空格填充

3、自定义协议的方式,将消息分为消息头和消息体,在消息头中表示出消息的总长度,然后进行逻辑处理。

案例

这个案例是通过第一种方式,通过回车符号的方式来解决拆包和黏包,通过在childHandler 中添加 指定的分隔符进行拆包黏包,通过DelimiterBasedFrameDecoder (定义分隔符)和StringDecoder(将发送的消息定义为String类型,而不是默认的ByteBuf类型了)这两个类完成处理,在客户端和服务端,都要 设定相同的Handler。

服务户端

服务器端,重点是childHandler ,自定义了包的分隔符

package yellowcong.socket.netty2;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
* 创建日期:2017年10月8日 <br/>
* 创建用户:yellowcong <br/>
* 功能描述:
*/
public class Server {
public static final Integer PORT = 8080;

public static void main(String[] args) throws Exception {
//1.第一个线程组 是用于接收Client端连接的
EventLoopGroup  bootGroup = new NioEventLoopGroup(); //客户端
//2.第二个线程组 实际业务操作请求
EventLoopGroup  workGroup = new NioEventLoopGroup(); //网络读写

//3.服务器启动类,配置服务器
ServerBootstrap bootstrap = new ServerBootstrap();

//加入客户端线程和网络读写
bootstrap.group(bootGroup, workGroup)
//我要指定使用NioServerSocketChannel这种类型的通道 ,当我们是Http的时候,需要更换这个Channel的类型
.channel(NioServerSocketChannel.class)
// 指定处理SockerChannel 的处理器
.childHandler(new ChannelInitializer<SocketChannel>(){ //一掉要注意这个 SocketChannel 是Netty封装的,不是NIO

@Override
protected void initChannel(SocketChannel ch) throws Exception {
//将我们的服务器处理类传递进去
//设定回车就是一条消息
ByteBuf spilt = Unpooled.copiedBuffer("\r\n".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, spilt));
//字符串形式的解码
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ServerHandler());
}
})
//设定大小
.option(ChannelOption.SO_BACKLOG, 128)  //生产环境中,最好配额制100多
//保持连接
.childOption(ChannelOption.SO_KEEPALIVE, true)
;

System.out.println("服务器启动。。。。。。");
//绑定指定的端口 进行监听
ChannelFuture future = bootstrap.bind(PORT).sync();

//如果不休眠 ,直接就结束了
//      Thread.sleep(1000000);

//关闭Channel
//这个是相当于程序是睡眠模式,线程阻塞在这个地方
future.channel().closeFuture().sync();

//关闭线程组
bootGroup.shutdownGracefully();
workGroup.shutdownGracefully();

}
}


服务端消息处理

消息处理端,复写了channelReadComplete 方法,用来断开连接操作,其次在channelRead中,消息是String类型的,不能转化为ByteBuf类型处理。消息发送的时候,加上了分隔符(\r\n)

package yellowcong.socket.netty2;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
* 创建日期:2017年10月8日 <br/>
* 创建用户:yellowcong <br/>
* 功能描述:服务器请求
*/

//这个Handler需要继承 ChannelHandlerAdapter,这个是Netty实现的
public class ServerHandler extends ChannelHandlerAdapter{

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//由于设定的解码器是String 类型的,这个地方就Msg对象是String类型的
//读取Message
String result = (String) msg;
System.out.println(result);
//回馈客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,Server get Data\r\n".getBytes()));

}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

//设定响应后就关闭程序
ctx.close().addListener(ChannelFutureListener.CLOSE);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//用来处理Netty执行过程中的异常
cause.printStackTrace(); //打印错误
ctx.close(); //关闭容器
}

}


客户端

客户端,也是在handler 的处理类中,增加了两个处理器,这样保证服务器和客户端的处理消息的方式一致,发送消息的时候加上了分隔符(\r\n)

package yellowcong.socket.netty2;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
* 创建日期:2017年10月8日 <br/>
* 创建用户:yellowcong <br/>
* 功能描述:
*/
public class Client {

public static void main(String[] args) throws InterruptedException {

EventLoopGroup workGroup = new NioEventLoopGroup();

//客户端启动
Bootstrap boot  = new Bootstrap();

boot.group(workGroup)
.channel(NioSocketChannel.class) //客户端的Socker
.handler(new ChannelInitializer<SocketChannel>(){ //一掉要注意这个 SocketChannel 是Netty封装的,不是NIO

@Override
protected void initChannel(SocketChannel ch) throws Exception {
//将我们的服务器处理类传递进去
ByteBuf spilt = Unpooled.copiedBuffer("\r\n".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, spilt));
//字符串形式的解码
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ClientHandler());
}
});

//绑定指定的端口 进行监听
ChannelFuture channel = boot.connect("127.0.0.1", 8080).sync();

//发送数据
channel.channel().writeAndFlush(Unpooled.copiedBuffer("hello Client1\r\n".getBytes()));

channel.channel().writeAndFlush(Unpooled.copiedBuffer("hello Client2\r\n".getBytes()));

channel.channel().writeAndFlush(Unpooled.copiedBuffer("hello Client3\r\n".getBytes()));

//这个相当于没有关闭Channel,注释
channel.channel().closeFuture().sync();

//关闭线程组
workGroup.shutdownGracefully();
}
}


客户端消息处理类

消息处理类,没做任何改变,只是读取消息

package yellowcong.socket.netty2;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
* 创建日期:2017年10月8日 <br/>
* 创建用户:yellowcong <br/>
* 功能描述:客户端请求的处理
*/
public class ClientHandler extends ChannelHandlerAdapter{

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//打印错误
cause.printStackTrace();

//关闭容器
ctx.close();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
//获取message
String data = (String) msg;

//打印数据
System.out.println("Client:\t"+data);

} catch (Exception e) {
e.printStackTrace();
}finally{
//释放消息
ReferenceCountUtil.release(msg);
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: