您的位置:首页 > 其它

Netty入门学习

2017-08-23 09:37 330 查看
部分说明引用地址Netty用户指南

前言

问题

现如今我们使用通用的应用程序或者类库来实现系统之间地互相访问,比如我们经常使用一个HTTP客户端来从web服务器上获取信息,或者通过web service来执行一个远程的调用。

然而,有时候一个通用的协议和他的实现并没有覆盖一些场景。比如我们无法使用一个通用的HTTP服务器来处理大文件、电子邮件、近实时消息比如财务信息和多人游戏数据。我们需要一个合适的协议来处理一些特殊的场景。例如你可以实现一个优化的Ajax的聊天应用、媒体流传输或者是大文件传输的HTTP服务器,你甚至可以自己设计和实现一个新的协议来准确地实现你的需求。

另外不可避免的事情是你不得不处理这些私有协议来确保和原有系统的互通。这个例子将会展示如何快速实现一个不影响应用程序稳定性和性能的协议。

解决方案

Netty是一个提供异步事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

换句话说,Netty是一个NIO框架,使用它可以简单快速地开发网络应用程序,比如客户端和服务端的协议。Netty大大简化了网络程序的开发过程比如TCP和UDP的 Socket的开发。

“快速和简单”并不意味着应用程序会有难维护和性能低的问题,Netty是一个精心设计的框架,它从许多协议的实现中吸收了很多的经验比如FTP、SMTP、HTTP、许多二进制和基于文本的传统协议,Netty在不降低开发效率、性能、稳定性、灵活性情况下,成功地找到了解决方案。

有一些用户可能已经发现其他的一些网络框架也声称自己有同样的优势,所以你可能会问是Netty和它们的不同之处。答案就是Netty的哲学设计理念。Netty从第一天开始就为用户提供了用户体验最好的API以及实现设计。正是因为Netty的设计理念,才让我们得以轻松地阅读本指南并使用Netty。

环境准备

请到Netty官网下载Netty4版本以上的jar包和Oracle官网下载JDK1.6以上JDK版本,本文是在windows操作系统下测试。

注:这里也只是为了试验而试验,代码也不会做到很全面,考虑或说明不周,后续再补。

我们先定义一个Server类,用作服务端

package main.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
* Created by CX on 2017/8/22.
*/
public class Server {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup boss = new NioEventLoopGroup();//1
EventLoopGroup work = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();//2
b.group(boss,work)
.channel(NioServerSocketChannel.class)//3
.option(ChannelOption.SO_BACKLOG,128)//4
.childOption(ChannelOption.SO_KEEPALIVE,true)//4
.childOption(ChannelOption.SO_RCVBUF,32*1024)
.childOption(ChannelOption.SO_SNDBUF,8*1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
// sc.pipeline().addLast(Chan)
sc.pipeline().addLast(new ServerHandle());
}
});//5
ChannelFuture cf = b.bind(8765).sync();//6
cf.channel().closeFuture().sync();
work.shutdownGracefully();//7
boss.shutdownGracefully();
}
}

class ServerHandle extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {//8
System.out.println("通道激活...");
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//9
System.out.println("获取数据...");
ByteBuf buf = (ByteBuf) msg;
byte[] request = new byte[buf.readableBytes()];
buf.readBytes(request);
String req = new String(request,"utf-8").toString();
System.out.println("收到消息:" + req);
ChannelFuture cf = ctx.channel().writeAndFlush(Unpooled.copiedBuffer(String.format("我是服务端,我收到你的消息[%S],准许建立请求...",req).getBytes()));
cf.addListener(ChannelFutureListener.CLOSE);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//10
System.out.println("读写完毕...");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//11
System.out.println("异常关闭...");
ctx.close();
}
}


NioEventLoopGroup

是用来处理I/O操作的多线程事件循环器,Netty提供了许多不同的EventLoopGroup的实现用来处理不同传输协议。在这个例子中我们实现了一个服务端的应用,因此会有2个NioEventLoopGroup会被使用。第一个经常被叫做‘boss’,用来接收进来的连接。第二个经常被叫做‘worker’,用来处理已经被接收的连接,一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。如何知道多少个线程已经被使用,如何映射到已经创建的Channels上都需要依赖于EventLoopGroup的实现,并且可以通过构造函数来配置他们的关系。

ServerBootstrap

是一个启动NIO服务的辅助启动类。你可以在这个服务中直接使用Channel,但是这会是一个复杂的处理过程,在很多情况下你并不需要这样做。

Channel

这里我们指定使用NioServerSocketChannel类来举例说明一个新的Channel如何接收进来的连接。

Option 及 ChannelOption

你可以设置这里指定的通道实现的配置参数。我们正在写一个TCP/IP的服务端,因此我们被允许设置socket的参数选项比如tcpNoDelay和keepAlive。请参考ChannelOption和详细的ChannelConfig实现的接口文档以此可以对ChannelOptions的有一个大概的认识。

option()是提供给NioServerSocketChannel用来接收进来的连接。childOption()是提供给由父管道ServerChannel接收到的连接,在这个例子中也是NioServerSocketChannel。

Handle

这里的事件处理类经常会被用来处理一个最近的已经接收的Channel。ChannelInitializer是一个特殊的处理类,他的目的是帮助使用者配置一个新的Channel。也许你想通过增加一些处理类比如DiscardServerHandle来配置一个新的Channel或者其对应的ChannelPipeline来实现你的网络程序。当你的程序变的复杂时,可能你会增加更多的处理类到pipline上,然后提取这些匿名类到最顶层的类上。

Bind

绑定端口然后启动服务。这里我们在机器上绑定了机器所有网卡上的8080端口。当然现在你可以多次调用bind()方法(基于不同绑定地址)。

关闭服务

关闭一个Netty应用往往只需要简单地通过shutdownGracefully()方法来关闭你构建的所有的NioEventLoopGroupS.当EventLoopGroup被完全地终止,并且对应的所有channels都已经被关闭时,Netty会返回一个Future对象

channelActive

方法将会在连接被建立并且准备进行通信时被调用。

channelRead

当你想获取客户端数据时,可以在这里操作。

channelReadComplete

如果数据处理完毕你想对数据做二次处理,这里是个很好的选择。

exceptionCaught

channel中的异常会在这里抛出,如果你想友好的给客户端响应,可以考虑这里。

致辞,服务端已经编写完毕,我们还需要一个客户端来向服务端发消息。

这里的ServerHandle类和下边的ClientHandle都继承自ChannelInboundHandlerAdapter

来获取Channel建立时的信息以及触发各种各样的I/O事件和操作,更多使用请重写父类下的其他方法

再定义一个Client和Server通讯

当然你也可以在CMD下用telnet直接访问Server

亦或者直接在浏览器上访问,只是这样你就没法看到返回信息

package main.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.ReferenceCountUtil;

/**
* Created by CX on 2017/8/22.
*/
public class Client {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup work = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(work)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new ClientHandle());
}
});
ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
cf.channel().writeAndFlush(Unpooled.copiedBuffer(("我是客户端请求,收到请回答...").getBytes()));
cf.channel().closeFuture().sync();
work.shutdownGracefully();
}
}
class ClientHandle extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf b = (ByteBuf) msg;
try {

byte[] response  = new byte[b.readableBytes()];
b.readBytes(response);
System.out.println("响应信息:" + new String(response,"utf-8").toString());
} finally {
//2种释放方式
//b.release();
ReferenceCountUtil.release(msg);
}

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}


先启动服务端,再启动客户端,运行结果:

服务端:

通道激活...
获取数据...
收到消息:我是客户端请求,收到请回答...
读写完毕...


客户端:

响应信息:我是服务端,我收到你的消息[我是客户端请求,收到请回答...],准许建立请求...


一个小例子就完成了。

当然,还有许多细节需要考虑,比如:

将客户端此处代码

cf.channel().writeAndFlush(Unpooled.copiedBuffer(("我是客户端请求,收到请回答...").getBytes()));


替换为

for (int i =0 ;i<10;i++){
String req = String.format("客户端第%d请求,收到请回答...",i);
byte[] bytes = req.getBytes();
System.out.println(bytes.length);
cf.channel().writeAndFlush(Unpooled.copiedBuffer(bytes));
}


将客户端发送数据改为循环向服务端发送10次请求,看下执行情况:

客户端

40
40
40
40
40
40
40
40
40
40
响应信息:我是服务端,我收到你的消息[客户端第0请求,收到请回答...客户端第1请求,收到请回答...客户端第2请求,收到请回答...客户端第3请求,收到请回答...客户端第4请求,收到请回答...客户端第5请求,收到请回答...客户端第6请求,收到请回答...客户端第7请求,收到请回答...客户端第8请求,收到请回答...客户端第9请求,收到请回答...],准许建立请求...


服务端

通道激活...
获取数据...
收到消息:客户端第0请求,收到请回答...客户端第1请求,收到请回答...客户端第2请求,收到请回答...客户端第3请求,收到请回答...客户端第4请求,收到请回答...客户端第5请求,收到请回答...客户端第6请求,收到请回答...客户端第7请求,收到请回答...客户端第8请求,收到请回答...客户端第9请求,收到请回答...
读写完毕...


向服务器发送了10次请求,然后打印的结果却是只有一句,只执行了一次读操作。

造成这样的线下是因为,数据传输是以流的形式传输,客户端循环打印看起来是发了10次,其实还是一个流,这种现象叫粘包百度TCP粘包),出现这种问题也就没有做到想要的结果。

流数据的传输处理

一个小的Socket Buffer问题

在基于流的传输里比如TCP/IP,接收到的数据会先被存储到一个socket接收缓冲里。不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列。即使你发送了2个独立的数据包,操作系统也不会作为2个消息处理而仅仅是作为一连串的字节而言。因此这是不能保证你远程写入的数据就会准确地读取。举个例子,让我们假设操作系统的TCP/TP协议栈已经接收了3个数据包:



由于基于流传输的协议的这种普通的性质,在你的应用程序里读取数据的时候会有很高的可能性被分成下面的片段。



因此,一个接收方不管他是客户端还是服务端,都应该把接收到的数据整理成一个或者多个更有意思并且能够让程序的业务逻辑更好理解的数据。在上面的例子中,接收到的数据应该被构造成下面的格式:



为了解决这种问题,我们有多种实现方式

第一种:在Server端添加decode来解码

class ServerDecoder extends ByteToMessageDecoder{

protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
if(byteBuf.readableBytes() <40 ){
return ;
}
list.add(byteBuf.readBytes(40));
}
}


这里的40指的是传入数据的长度为40,上边已打印出长度。

更改后的Server

package main.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
* Created by CX on 2017/8/22.
*/
public class Server {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup boss = new NioEventLoopGroup();//1
EventLoopGroup work = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();//2
b.group(boss,work)
.channel(NioServerSocketChannel.class)//3
.option(ChannelOption.SO_BACKLOG,128)//4
.childOption(ChannelOption.SO_KEEPALIVE,true)//4
.childOption(ChannelOption.SO_RCVBUF,32*1024)
.childOption(ChannelOption.SO_SNDBUF,8*1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
//加入解码器
sc.pipeline().addLast(new ServerDecoder(),new ServerHandle());
}
});//5
ChannelFuture cf = b.bind(8765).sync();//6
cf.channel().closeFuture().sync();
work.shutdownGracefully();//7
boss.shutdownGracefully();
}
}
class ServerDecoder extends ByteToMessageDecoder{ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { if(byteBuf.readableBytes() <40 ){ return ; } list.add(byteBuf.readBytes(40)); } }
//新加
class ServerHandle extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {//8
System.out.println("通道激活...");
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//9
System.out.println("获取数据...");
ByteBuf buf = (ByteBuf) msg;
byte[] request = new byte[buf.readableBytes()];
buf.readBytes(request);
String req = new String(request,"utf-8").toString();
System.out.println("收到消息:" + req);
byte[] response = String.format("我是服务端,我收到你的消息[%S],准许建立请求...",req).getBytes();
System.out.println(response.length);
ChannelFuture cf = ctx.channel().writeAndFlush(Unpooled.copiedBuffer(response));
cf.addListener(ChannelFutureListener.CLOSE);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//10
System.out.println("读写完毕...");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//11
System.out.println("异常关闭...");
ctx.close();
}
}


运行:

服务端:

通道激活...
获取数据...
收到消息:客户端第0请求,收到请回答...
获取数据...
收到消息:客户端第1请求,收到请回答...
获取数据...
收到消息:客户端第2请求,收到请回答...
获取数据...
收到消息:客户端第3请求,收到请回答...
获取数据...
收到消息:客户端第4请求,收到请回答...
获取数据...
收到消息:客户端第5请求,收到请回答...
获取数据...
收到消息:客户端第6请求,收到请回答...
获取数据...
收到消息:客户端第7请求,收到请回答...
获取数据...
收到消息:客户端第8请求,收到请回答...
获取数据...
收到消息:客户端第9请求,收到请回答...
读写完毕...


客户端:

40
40
40
40
40
40
40
40
40
40
响应信息:我是服务端,我收到你的消息[客户端第0请求,收到请回答...],准许建立请求...


已按照数据流长度进行拆分,这样做需要知道传入数据的长度。

但这也导致客户端只收到一句,后续拆分的没有发送出来。

第二种:按字符串拆分

也可以这样,添加字符拦截器的形式做数据处理:

new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
//解决粘包
//通过长度截取
//sc.pipeline().addLast(new ServerDecoder(),new ServerHandle());
//通过标识截取
ByteBuf delimiter = Unpooled.copiedBuffer("...".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter),new ServerHandle());
}
}


按照”…”做拆分数据流,运行如下:

服务端:

通道激活...
获取数据...
收到消息:客户端第0请求,收到请回答
获取数据...
收到消息:客户端第1请求,收到请回答
获取数据...
收到消息:客户端第2请求,收到请回答
获取数据...
收到消息:客户端第3请求,收到请回答
获取数据...
收到消息:客户端第4请求,收到请回答
获取数据...
收到消息:客户端第5请求,收到请回答
获取数据...
收到消息:客户端第6请求,收到请回答
获取数据...
收到消息:客户端第7请求,收到请回答
获取数据...
收到消息:客户端第8请求,收到请回答
获取数据...
收到消息:客户端第9请求,收到请回答
读写完毕...


客户端:

40
40
40
40
40
40
40
40
40
40
响应信息:我是服务端,我收到你的消息[客户端第0请求,收到请回答],准许建立请求...


发现这2种客户端返回只有一句,没有达到要求。

因为按照”…”做的拦截,这里的”…”都没有了。

第三种:定长处理器

new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
//解决粘包
//通过长度截取
//sc.pipeline().addLast(new ServerDecoder(),new ServerHandle());
//通过标识截取
//ByteBuf delimiter = Unpooled.copiedBuffer("...".getBytes());
//sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter),new ServerHandle());
//定长解析
sc.pipeline().addLast(new FixedLengthFrameDecoder(40),new ServerHandle());
}
}


运行结果:

通道激活...
获取数据...
收到消息:客户端第0请求,收到请回答...
获取数据...
收到消息:客户端第1请求,收到请回答...
获取数据...
收到消息:客户端第2请求,收到请回答...
获取数据...
收到消息:客户端第3请求,收到请回答...
获取数据...
收到消息:客户端第4请求,收到请回答...
获取数据...
收到消息:客户端第5请求,收到请回答...
获取数据...
收到消息:客户端第6请求,收到请回答...
获取数据...
收到消息:客户端第7请求,收到请回答...
获取数据...
收到消息:客户端第8请求,收到请回答...
获取数据...
收到消息:客户端第9请求,收到请回答...
读写完毕...


序列化解析对象

除了传输流数据外,netty可以通过增加handel的方式来直接传输对象。

下面是添加netty自带的StringDecode和StringEncode处理器,这样传输和接受的msg就可以强转成对象了。

请注意,传输的数据其实还是在ByteBuy里,只是增加了处理器才可以变为对象,因此加解码操作不可省略。

Server:

package main.netty.project;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
* Created by CX on 2017/8/22.
*/
public class Server {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup boss = new NioEventLoopGroup();//1
EventLoopGroup work = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();//2
b.group(boss,work)
.channel(NioServerSocketChannel.class)//3
.option(ChannelOption.SO_BACKLOG,128)//4
.childOption(ChannelOption.SO_KEEPALIVE,true)//4
.childOption(ChannelOption.SO_RCVBUF,32*1024)
.childOption(ChannelOption.SO_SNDBUF,8*1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new StringEncoder(),new StringDecoder(),new ServerHandle());
}
});//5
ChannelFuture cf = b.bind(8765).sync();//6
cf.channel().closeFuture().sync();
work.shutdownGracefully();//7
boss.shutdownGracefully();
}
}

class ServerHandle extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {//8
System.out.println("通道激活...");
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//9
System.out.println("获取数据...");
String m = (String) msg;
System.out.println(new String(m.getBytes(),"utf-8"));
ChannelFuture cf = ctx.channel().writeAndFlush(m);
cf.addListener(ChannelFutureListener.CLOSE);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//10
System.out.println("读写完毕...");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//11
System.out.println("异常关闭...");
ctx.close();
}
}


Client:

package main.netty.project;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
* Created by CX on 2017/8/22.
*/
public class Server {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup boss = new NioEventLoopGroup();//1
EventLoopGroup work = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();//2
b.group(boss,work)
.channel(NioServerSocketChannel.class)//3
.option(ChannelOption.SO_BACKLOG,128)//4
.childOption(ChannelOption.SO_KEEPALIVE,true)//4
.childOption(ChannelOption.SO_RCVBUF,32*1024)
.childOption(ChannelOption.SO_SNDBUF,8*1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new StringEncoder(),new StringDecoder(),new ServerHandle());
}
});//5
ChannelFuture cf = b.bind(8765).sync();//6
cf.channel().closeFuture().sync();
work.shutdownGracefully();//7
boss.shutdownGracefully();
}
}

class ServerHandle extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {//8
System.out.println("通道激活...");
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//9
System.out.println("获取数据...");
String m = (String) msg;
ChannelFuture cf = ctx.channel().writeAndFlush(m);
cf.addListener(ChannelFutureListener.CLOSE);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//10
System.out.println("读写完毕...");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//11
System.out.println("异常关闭...");
ctx.close();
}
}


运行结果:

服务端:

通道激活...
获取数据...
客户端第1请求,收到请回答...
读写完毕...


客户端:

客户端第1请求,收到请回答...


这样,我们就可以直接操作对象了。

《未完待续》
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: