您的位置:首页 > 其它

Netty(1):主要部件介绍 & Hello World 实例

2018-03-05 22:08 441 查看

Netty 简介

※以下系列笔记总结自《Netty in Action》(Norman Maurer),和 Netty 4.x 官方技术文档;
Netty 是一个高性能、高拓展性、非阻塞的 Java 异步网络通信框架,Netty IO 传输部分是基于 Java NIO 的,同时对 NIO 复杂的 API 进行了进一步的封装,使得编写高性能的 Java 异步网络通信程序更将简单方便;
Netty 简化了 NIO 框架,主要的组成部分如下:Channel:通道,即 NIO 的基本结构,代表了一个用于连接到实体如硬件设备、文件、网络套接字或程序组件,能够执行一个或多个不同的 I/O 操作(例如读或写)的开放连接。

Callback:回调,Netty 内部使用回调处理事件时。一旦这样的回调被触发,事件可以由接口 ChannelHandler 的实现来处理;

Future:Netty 提供了比 java.util.concurrent 中更加方便的 Future 实现,用于实现对于异步执行结果的获取,即 Future 作为异步执行结果的占位符;

EventHandler:Netty 将各种网络触发事件(如入站、出站等)封装为 Event,并提供对于 Event 处理的 Handler 接口,对于业务逻辑通过这些 Handler 接口实现,由此对 NIO 的 Selector 模型进行封装;

Netty 项目主页:http://netty.io/index.html Netty 4.x 技术文档:http://netty.io/wiki/user-guide-for-4.x.html

Netty 架构模型组件

核心部件简介

Netty 架构模型的核心组件包括以下:
Bootstrap / ServerBootstrap:引导器;

Channel:通道;

ChannelHandler:通道处理器

ChannelPipeline:通道管道

EventLoop:事件执行环;

ChannelFuture:通道执行结果占位符;

Bootstrap Netty 应用程序通过设置 bootstrap(引导)类的开始,该类提供了一个 用于应用程序网络层配置的容器;Channel底层网络传输 API 必须提供给应用 I/O操作的接口(如读、写、连接、绑定等),这些操作往往需要和 socket 相互进;Netty 中的接口 Channel 定义了与 socket 丰富交互的操作集(bind, close, config, connect, isActive, isOpen, isWritable, read, write 等),提供大量的 Channel 实现来专门使用,包括 AbstractChannel,AbstractNioByteChannel,AbstractNioChannel,EmbeddedChannel, LocalServerChannel,NioSocketChannel 等;ChannelHandlerChannelHandler 支持很多协议,并且提供用于数据处理的容器, ChannelHandler 由特定事件触发,ChannelHandler 可专用于几乎所有的动作(如对象序列化、反序列化、异常抛出处理);常用的一个接口是 ChannelInboundHandler,这个类型接收到入站事件(包括接收到的数据)可以处理应用程序逻辑。当需要提供响应时,也可以从 ChannelInboundHandler 冲刷数据;业务逻辑经常存活于一个或者多个 ChannelInboundHandler;ChannelPipelineChannelPipeline 是 ChannelHandler 链的容器,提供了相应的 API 用于管理沿着链入站、出站事件的流动;每个 Channel 都有自己的ChannelPipeline,当 Channel 创建时自动创建的;当实现了ChannelHandler 的抽象 ChannelInitializer。ChannelInitializer子类 通过 ServerBootstrap 进行注册。当它的方法 initChannel() 被调用时,这个对象将安装自定义的 ChannelHandler 集到 pipeline。当这个操作完成时,ChannelInitializer 子类则 从 ChannelPipeline 自动删除自身;EventLoopEventLoop 用于处理 Channel 的 I/O 操作。一个单一的 EventLoop通常会处理多个 Channel 事件。一个 EventLoopGroup 可以含有多于一个的 EventLoop 和 提供了一种迭代用于检索清单中的下一个;ChannelFutureNetty 所有的 I/O 操作都是异步。因为一个操作可能无法立即返回,我们需要有一种方法在以后确定它的结果,Netty 提供了接口 ChannelFuture,它的 addListener 方法注册了一个 ChannelFutureListener ,当操作完成时,可以被通知(不管成功与否);

Channel、Event 与 I/O

Netty是一个异步事件驱动的NIO框架,Netty的所有IO操作都是异步非阻塞的,实际上 Netty 是使用 Threads(多线程)处理 I/O 事件,但是如果 Threads 之间为同步关系,会十分影响性能,Netty 的设计保证程序处理事件不会有同步;
如下实例解释了不需要再 Channel 之间共享 ChannelEvent 的原因:



如图所示,一个 EventLoopGroup 具有一个或多个 EventLoop,一个 EventLoop 为它所服务的 Channel 的生命周期期间的一个线程;当创建一个 Channel,Netty 通过 一个单独的 EventLoop 实例来注册该 Channel 的通道的使用寿命。这就是为什么你的应用程序不需要同步 Netty 的 I/O操作,所有 Channel 的 I/O 始终用相同的线程来执行。

Bootstrapping 的作用

Bootstrapping(引导) 出现在Netty 配置程序的过程中,Bootstrapping在给服务器绑定指定窗口或者要连接客户端的时候会使用到,有以下两种类型:Bootstrap:用于客户端的引导器;

ServerBootstrap:用于服务端的引导器;

在一个典型的 TCP 的 CS 程序中,Bootstrap 和 ServerBootstrap 之间差异如下:
Bootstarp:连接远程主机和端口,有 1 个 EventLoopGroup ;

ServerBootstarp:绑定本地端口有,有 2 个EventLoopGroup;

一个 ServerBootstrap 可以认为有 2 个 Channel 集合,第一个 Channel 集合包含一个单例 ServerChannel,代表持有一个绑定了本地端口的 socket;第二个 Channel 集合包含所有创建的 Channel,处理服务器所接收到的客户端进来的连接。如下所示:



与 ServerChannel 相关 EventLoopGroup 分配一个 EventLoop 是 负责创建 Channels 用于传入的连接请求。一旦连接接受,第二个EventLoopGroup 分配一个 EventLoop 给它的 Channel;

ChannelHanlder 和 ChannelPipeline 的关系

ChannelPipeline 是 ChannelHandler 链的容器;
Netty 中有两个方向的数据流,即入站和出站方向:
入站(inbound):数据时从远程主机到用户应用程序,使用 ChannelInboundHandler 处理器进行处理;

出站(outbound):数据是从用户应用程序到远程主机,使用 ChannelOutboundHandler 处理器进行处理;

为了使数据从一端到达另一端,一个或多个 ChannelHandler 将以某种方式操作数据。即这些 ChannelHandler 会在程序的 bootstrapoping(引导)阶段被添加ChannelPipeline中,并且被添加的顺序将决定处理数据的顺序;



当应用程序数据写出到网络,触发入站事件;数据将从 pipeline 头部开始,传递到第一个 ChannelInboundHandler 进行处理,在这之后 该数据将被传递到链中的下一个 ChannelInboundHandler,最后当数据 到达 pipeline 的尾部,此时所有入站处理结束;
当数据从网络写入到应用程序,触发出站事件;数据从尾部流过 ChannelOutboundHandlers 的链,直到它到达头部。超过这点,出站数据将到达的网络传输,在这里显示为一个 socket;

在当前的链(chain)中,事件可以通过 ChanneHandlerContext 传递给下一个 handler。Netty 为此提供了适配器 ChannelInboundHandlerAdapter ,用来处理入站事件,可以简单地通过调用 ChannelHandlerContext 上的相应方法将事件传递给下一个 handler;

实际上,在 Netty 发送消息可以采用两种方式:直接写消息给 Channel 或者写入 ChannelHandlerContext 对象;这两者主要的区别是, 第一种方法会导致消息从 ChannelPipeline 的尾部开始写入,第二种方法会导致消息从 ChannelPipeline 下一个 ChannelHandler 开始;

ChannelHanlder 的类型

ChannelHandler 为处理不同的功能提供不同的超类, 此外 Netty 提供了一系列的适配器类用于开发,pipeline 中每个的 ChannelHandler 主要负责转发事件到链中的下一个处理器,这些适配器类会自动实现这些功能,只需要覆盖特定的方法即可;

常用到的适配器如下:ChannelHandlerAdapter:通用通道处理适配器;

ChannelInboundHandlerAdapter:入站通道处理适配器;

ChannelOutboundHandlerAdapter:出站通道处理适配器;

ChannelDuplexHandlerAdapter:双工通道处理适配器;

以上这些适配器都已经提供了默认的编码器,解码器,用于对 POJO 进行序列化/反序列化;在一些简单的入站处理逻辑场景中,如接收到解码后的消息并应用一些业务逻辑到这些数据,此时可以使用实现类 SimpleChannelInboundHandler ,并实现 channelRead0(ChannelHandlerContext,T) 成员方法即可,但是注意在在实现方法中尽可能地不要阻塞 I/O 线程,这样不利于高性能;

Hello World 实例

以下通过一个基本的 Echo Client/Server 来说明 Netty 的基本使用实例,以下示例使用 Netty 4.1.22 版本;
首先需要在项目中导入 Netty 依赖 io.netty:netty-all ,在 gradle 构建环境中如下: 
compile 'io.netty:netty-all:4.1.22.Final'
以下示例的基本逻辑:先启动客户端,然后建立一个连接并发送一个或多个消息发送到服务器,服务器接收到消息会返回给客户端一个呼应消息;

服务端实现服务器端 ChannelHandler 处理器,EchoServerHandler 
package echoSample.server;
import io.netty.buffer.*;
import io.netty.channel.*;
import io.netty.util.CharsetUtil;
@ChannelHandler.Sharable //标识该类的实例在 channel 之间是可以共享的
public class EchoServerHandler extends ChannelInboundHandlerAdapter{
   //当每个信息如站都会被调用
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       ByteBuf in = (ByteBuf) msg;
       <
174fa
span class="cm-variable">System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
       ctx.write(in);   //将所接收的信息返回给发送者(此时并没有刷新缓冲区)
   }
   //当处理器处理最后的 channelRead() 时会被调用
   @Override
   public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
       //刷新所所有信息到远程节点
       ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
   }
   //读写操作时的异常捕获和处理
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       cause.printStackTrace();
       ctx.close();
   }
}

实现服务端 ServerBootstarp 引导器,EchoServer 
package echoSample.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.*;
import java.net.InetSocketAddress;
public class EchoServer {
   private final int port;
   public EchoServer(int port) {
       this.port = port;
   }
   public void start() throws InterruptedException {
       EventLoopGroup workerGroup = new NioEventLoopGroup();     //创建循环任务组
       try{
           ServerBootstrap bootstrap = new ServerBootstrap();               //创建服务引导器
           bootstrap.group(workerGroup)                                     //向引导器添加任务组
                   .channel(NioServerSocketChannel.class)                   //使用指定的 NIO 传输 Channel
                   .localAddress(new InetSocketAddress((port)))             //设置 socket 地址端口
                   .childHandler(new ChannelInitializer<SocketChannel>() {  //添加逻辑处理器
                       @Override
                       protected void initChannel(SocketChannel ch) throws Exception {
                           ch.pipeline().addLast(new EchoServerHandler());
                       }
                   });
           ChannelFuture future = bootstrap.bind().sync();   //引导器绑定服务器,sync等待服务器关闭;
           System.out.println(EchoServer.class.getName() + " started and listen on " + future.channel().localAddress());
           future.channel().closeFuture().sync();            //当 ChannelFuture 获取结果时,关闭管道
       }finally {
           workerGroup.shutdownGracefully().sync();         //关闭任务组,释放所有资源
       }
   }
   public static void main(String[] args) throws InterruptedException {
       new EchoServer(23333).start();
   }
}


客户端
实现客户端 ChannelHandler 处理器,EchoClientHandler 
package echoSample.client;
import io.netty.buffer.*;
import io.netty.channel.*;
import io.netty.util.CharsetUtil;
@ChannelHandler.Sharable  //标识该类的实例在 channel 之间是可以共享的
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
   //服务器建立连接后被调用
   @Override
   public void channelActive(ChannelHandlerContext ctx) throws Exception {
       //向接收者发送信息
       ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
   }
   //数据从服务器接收后被调用
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
       //接收并打印信息
       System.out.println("Client received: " + msg.toString(CharsetUtil.UTF_8));
   }
   //读写发生异常时调用
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       cause.printStackTrace();
       ctx.close();
   }
}

实现客户端 Bootstarp 引导器,EchoClient 
package echoSample.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.*;
import java.net.InetSocketAddress;
public class EchoClient {
   private final String host;
   private final int port;
   public EchoClient(String host, int port) {
       this.host = host;
       this.port = port;
   }
   public void start() throws InterruptedException {
       EventLoopGroup workerGroup = new NioEventLoopGroup();
       try{
           Bootstrap bootstrap = new Bootstrap();
           bootstrap.group(workerGroup)
                   .channel(NioSocketChannel.class)
                   .remoteAddress(new InetSocketAddress(host,port))
                   .handler(new ChannelInitializer<SocketChannel>() {
                       @Override
                       protected void initChannel(SocketChannel ch) throws Exception {
                           ch.pipeline().addLast(new EchoClientHandler());
                       }
                   });
           ChannelFuture future = bootstrap.connect().sync();
           future.channel().closeFuture().sync();
       }finally {
           workerGroup.shutdownGracefully();
       }
   }
   public static void main(String[] args) throws InterruptedException {
       new EchoClient("localhost",23333).start();
   }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: