您的位置:首页 > 运维架构 > 网站架构

架构师入门笔记十 Netty5快速入门

2017-09-22 15:40 423 查看
架构师入门笔记十 Netty5快速入门

在了解IO,NIO,AIO知识后,学习Netty5便会轻松很多,本章节主要介绍Netty是如何接收,反馈数据和拆包粘包的问题。

1 Netty基础知识

1.1 Netty作用

Netty是一个提供异步事件驱动的网络应用框架,用以快速开发高性能高可靠性 的网络服务器和客户端程序。Netty是一个NIO框架,使用它可以简单快速地开发网络应用程序,比如客户端和服务端的协议。Netty简化了网络程序的开发过程,比如TCP和UDP的 Socket的开发。

1.2 TCP和UDP

TCP(Transmission Control Protocol,传输控制协议)是基于连接的协议,也就是说,在正式收发数据前,必须和对方建立可靠的连接。

UDP(User Data Protocol,用户数据报协议)是与TCP相对应的协议。它是面向非连接的协议,它不与对方建立连接,而是直接就把数据包发送过去!

 TCPUDP
是否连接面向连接面向非连接
传输可靠性可靠不可靠
应用场合传输大量数据传输少量数据
速度慢 

2 HelloWorld代码

2.1 DISCARD服务:丢弃服务,丢弃了所有接收到的数据,并不做任何响应。简单理解就是接收数据,不返回数据
2.2 ECHO服务:响应式协议,这个协议针对任何接收的数据都会返回一个响应
2.3 代码事例:
首选是服务器处理类
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
* DISCARD协议 丢弃协议,其实就是只接收数据,不做任何处理。
* ECHO服务(响应式协议),其实就是返回数据。
* 实现步骤:
* step1 继承 ChannelHandlerAdapter
* step2 覆盖chanelRead()事件处理方法
* step3 释放ByteBuffer,ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放
* step4 异常处理,即当Netty由于IO错误或者处理器在处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来并且把关联的channel给关闭掉。
*/
public class DiscardServerHandler extends ChannelHandlerAdapter{

@Override
public void channelRead(ChannelHandlerContext chc, Object msg) {
try {
// 简单的读写操作
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
System.out.println("Server :" + body);
chc.writeAndFlush(Unpooled.copiedBuffer("卒...... ".getBytes())); // 返回数据
} catch (Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg); // 释放msg
}
}

@Override
public void exceptionCaught(ChannelHandlerContext chc, Throwable cause) {
// 这个方法的处理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
cause.printStackTrace();
chc.close();
}
}
然后是服务端Netty启动代码
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
* step1 创建两个线程组分别负责接收和处理
* step2 启动NIO服务辅助类
* step3 绑定两个线程组,指定一个通道,关联一个处理类,设置一些相关参数
* step4 监听端口
* step5 关闭一些资源
*/
public class DiscardServer {

private static final int PORT = 8888; // 监听的端口号

public static void main(String[] args) {
// NioEventLoopGroup 是用来处理I/O操作的多线程事件循环器
EventLoopGroup bossGroup = new NioEventLoopGroup(); // 用于接收进来的连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 用于处理进来的连接
try {
ServerBootstrap bootstrap = new ServerBootstrap(); // ServerBootstrap 是一个启动NIO服务的辅助启动类
bootstrap.group(bossGroup, workerGroup) // 绑定俩个线程组
.channel(NioServerSocketChannel.class) // 指定用 NioServerSocketChannel 通道
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new DiscardServerHandler()); // DiscardServerHandler是我们自定义的服务器处理类,负责处理连接
}
})
.option(ChannelOption.SO_BACKLOG, 128) // 设置tcp缓冲区
.childOption(ChannelOption.SO_KEEPALIVE, true); // 设置保持连接

ChannelFuture future = bootstrap.bind(PORT).sync(); // 绑定端口
future.channel().closeFuture().sync(); // 等待关闭
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully(); // 关闭线程组,先打开的后关闭
bossGroup.shutdownGracefully();
}
}

}
客户端代码和服务端代码类似,区别在于:服务端提供监听端口,客户端负责连接端口;服务端的辅助类是ServerBootstrap,而客户端的辅助类是Bootstrap(和ServerSocket,Socket关系很像)
客户端处理类
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
*
* 和ServerHandler类似
*
*/
public class ClientHandler extends ChannelHandlerAdapter{

public void channelRead(ChannelHandlerContext chc, Object msg) {
try {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
System.out.println("Client :" + body);
} catch (Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg); // 释放msg
}
}

public void exceptionCaught(ChannelHandlerContext chc, Throwable cause) {
// 这个方法的处理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
cause.printStackTrace();
chc.close();
}

}
客户端启动服务类
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {

private static final int PORT = 8888;
private static final String HOST = "127.0.0.1";

public static void main(String[] args) {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ClientHandler());
}
})
.option(ChannelOption.SO_KEEPALIVE, true);

ChannelFuture future = bootstrap.connect(HOST, PORT).sync(); // 建立连接
future.channel().writeAndFlush(Unpooled.copiedBuffer("快醒醒,还有几个bug没有改".getBytes())); // 向服务端发送数据
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}

}
到这里,便是Netty的简单应用。打印的结果:
Server :快醒醒,还有几个bug没有改
Client :卒......

3 拆包粘包

3.1 使用特殊的分隔符
3.2 限定长度,不推荐。若发送数据的长度不够指定长度,则一直处于等待中。
代码在原来的基础上做了简单修改,可以打开注释自己调试
首选是服务器处理类

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
* DISCARD协议 丢弃协议,其实就是只接收数据,不做任何处理。
* ECHO服务(响应式协议),其实就是返回数据。
* 实现步骤:
* step1 继承 ChannelHandlerAdapter
* step2 覆盖chanelRead()事件处理方法
* step3 释放ByteBuffer,ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放
* step4 异常处理,即当Netty由于IO错误或者处理器在处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来并且把关联的channel给关闭掉。
*/
public class DiscardServerHandler extends ChannelHandlerAdapter{

private static final String DELIMITER = "^_^"; // 拆包分隔符

@Override
public void channelRead(ChannelHandlerContext chc, Object msg) {
try {
// 简单的读写操作
/*
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
System.out.println("Server :" + body);
chc.writeAndFlush(Unpooled.copiedBuffer("卒...... ".getBytes())); // 返回数据
*/
// 加了 StringDecoder 字符串解码器后可以直接读取
System.out.println("Server :" + msg);
// 分隔符拆包
//			String response = msg + " , 骗你的" + DELIMITER;
//			chc.channel().writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
// 限定长度拆包
chc.channel().writeAndFlush(Unpooled.copiedBuffer(msg.toString().getBytes()));
} catch (Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg); // 释放msg
}
}

@Override
public void exceptionCaught(ChannelHandlerContext chc, Throwable cause) {
// 这个方法的处理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
cause.printStackTrace();
chc.close();
}
}

然后是服务端Netty启动代码

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
* step1 创建两个线程组分别负责接收和处理
* step2 启动NIO服务辅助类
* step3 绑定两个线程组,指定一个通道,关联一个处理类,设置一些相关参数
* step4 监听端口
* step5 关闭一些资源
*/
public class DiscardServer {

private static final int PORT = 8888; // 监听的端口号
private static final String DELIMITER = "^_^"; // 拆包分隔符

public static void main(String[] args) {
// NioEventLoopGroup 是用来处理I/O操作的多线程事件循环器
EventLoopGroup bossGroup = new NioEventLoopGroup(); // 用于接收进来的连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 用于处理进来的连接
try {
ServerBootstrap bootstrap = new ServerBootstrap(); // ServerBootstrap 是一个启动NIO服务的辅助启动类
bootstrap.group(bossGroup, workerGroup) // 绑定俩个线程组
.channel(NioServerSocketChannel.class) // 指定用 NioServerSocketChannel 通道
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 考虑到tcp拆包粘包的问题,升级代码
// step1 获取特殊分隔符的ByteBuffer
ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes());
// step2 设置特殊分隔符
//					socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(128, delimiter));
// 还有一种就是指定长度   二选一 (用的比较少)
socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(5));
// step3 设置字符串形式的解码
socketChannel.pipeline().addLast(new StringDecoder());
// step4 设置处理类
socketChannel.pipeline().addLast(new DiscardServerHandler()); // DiscardServerHandler是我们自定义的服务器处理类,负责处理连接
}
})
.option(ChannelOption.SO_BACKLOG, 128) // 设置tcp缓冲区
.childOption(ChannelOption.SO_KEEPALIVE, true); // 设置保持连接

ChannelFuture future = bootstrap.bind(PORT).sync(); // 绑定端口
future.channel().closeFuture().sync(); // 等待关闭
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully(); // 关闭线程组,先打开的后关闭
bossGroup.shutdownGracefully();
}
}

}

客户端处理类

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
*
* 和ServerHandler类似
*
*/
public class ClientHandler extends ChannelHandlerAdapter{

public void channelRead(ChannelHandlerContext chc, Object msg) {
try {
/*
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
*/
System.out.println("Client : " + msg);
} catch (Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg); // 释放msg
}
}

public void exceptionCaught(ChannelHandlerContext chc, Throwable cause) {
// 这个方法的处理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
cause.printStackTrace();
chc.close();
}

}

客户端启动服务类

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class Client {

private static final int PORT = 8888;
private static final String HOST = "127.0.0.1";
private static final String DELIMITER = "^_^"; // 拆包分隔符

public static void main(String[] args) {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 考虑到tcp拆包粘包的问题,升级代码
ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes());
//					socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(128, delimiter));
socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(5));
socketChannel.pipeline().addLast(new StringDecoder());

socketChannel.pipeline().addLast(new ClientHandler());
}
})
.option(ChannelOption.SO_KEEPALIVE, true);

ChannelFuture future = bootstrap.connect(HOST, PORT).sync(); // 建立连接
//			future.channel().writeAndFlush(Unpooled.copiedBuffer("快醒醒,还有几个bug没有改".getBytes())); // 向服务端发送数据
//			future.channel().writeAndFlush(Unpooled.copiedBuffer(("又要加班了"+DELIMITER).getBytes()));
//			future.channel().writeAndFlush(Unpooled.copiedBuffer(("好开心啊T。T"+DELIMITER).getBytes()));
future.channel().writeAndFlush(Unpooled.copiedBuffer("123456789".getBytes())); // 传的个数是9个,只打印了5个,还有4个在等待中
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}

}


4 优质博客

Netty 5用户指南

以上便是Netty5快速入门的内容,如果你觉得不错,可以点个赞哦





 ,下一章介绍Netty5的编解码知识。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: