您的位置:首页 > 其它

Netty学习之NIO入门

2018-06-14 23:42 429 查看

前言

最近在公司里碰到了诸多服务端的问题,随着网站规模的不断扩大,系统并发访问量也越来越高,传统基于Tomcat等Web容器的垂直架构已经无法满足需求,需要拆分应用进行服务化,以提高开发和维护效率。从组网情况看,垂直的架构拆分后,系统采用分布式部署,各个节点之间需要远程服务调用,高性能的RPC框架不可少,Netty作为异步高性能的通信框架,往往作为基础通信组件被这些RPC框架使用。

Netty是什么

Netty是由JBOSS提供的一个开源框架。Netty是一个异步、事件驱动的网络应用框架,使用它可以快速开发出可维护的、高性能网络协议服务器。它大幅简化了网络编程,比如TCP和UDP套接字服务器开发。而且在保证快速和易用性的同事,使用Netty开发的应用程序还保持可维护性和性能。

本质:Jboss做的一个Jar包

目的:快速开发高性能、高可靠性的网络服务器和客户端程序

优点:提供异步的、事件驱动的网络应用程序框架和工具

Netty和Tomcat的区别

Netty和Tomcat最大的区别就在于通信协议。Tomcat是基于Http协议的,它的实质是一个基于http协议的web容器。Netty能通过编程自定义各种协议,因为Netty能够通过code自己来编码/解码字节流,完成类似Redis访问的功能,这就是Netty和Tomcat最大的不同。
有人说Netty的性能高于Tomcat,这个说法并不正确。Tomcat从6.x开始就支持了nio模式,并且还有arp(通过jni调用Apache网路库)模式,相比于旧的bio模式,并发性得到了很大的提高。

Netty 基础

说了这么多,对Netty大概有了一个简单的认识,Netty是基于NIO的网络通信框架。所以在学习Netty之前,必须要对NIO有一个清晰的认识

NIO入门

对于BIO、伪异步IO毋庸赘言。NIO是JDK1.4引入的,它在标准的Java代码中提供了高速的、面向块的IO。通过定义包含数据的类,以及通过以块的形式处理这些数据,NIO不用使用本地代码就可以利用低级优化。下面对NIO的一些概念简单描述一下

缓冲区Buffer。Buffer是一个对象,它包含了一些要写入或者要读出的数据。在NIO库中,所有数据都是用缓冲区处理。缓冲区实质上是一个数组。通常它是一个字节数组,也可以使用其他种类的数组。但是缓冲区不仅仅是一个数组,缓冲区还提供了对数据的结构化访问以及维护读写位置等信息

通道Channel。网络数据通过Channel读取和写入。通道与流的不同之处在于通道是双向的,而流是单向的。实际上在Java中,Channel可以分为2类,用于网络读写的SelectableChannel和用于文件操作的FileChannel。

多路复用器Selector。它是Java NIO的基础。熟练掌握Selector对于NIO编程至关重要。简单来讲,Selector会不断地轮询注册在其上的Channel,如果某个Channel发生了读/写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel集合,进行后续的I/O操作。

NIO服务端时序简图如下:



代码解析该简图

package com.example.nio.server;

/**
* @version 1.0
* @Desription:
* @Author:Hui
* @CreateDate:2018/6/12 23:01
*/
public class TimeServer {
public static void main(String[] args) {
int port = 8080;
if(null != args && args.length > 0) {
port = Integer.valueOf(args[0]);
}

MultipleTimeServer timeServer = new MultipleTimeServer(port);

new Thread(timeServer,"NIO-MultiplexeTimerServer-001").start();

}
}

package com.example.nio.server;

import org.apache.catalina.Server;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

/**
* @version 1.0
* @Desription:
* @Author:Hui
* @CreateDate:2018/6/12 23:03
*/
public class MultipleTimeServer implements Runnable {
private Selector selector;
private ServerSocketChannel serverChannel;
private volatile boolean stop;

public MultipleTimeServer(int port) {
try {
//1、打开ServerSocketChannel,用于监听客户端连接
serverChannel = ServerSocketChannel.open();
//2、绑定端口监听、设置连接为非阻塞模式
serverChannel.bind(new InetSocketAddress(InetAddress.getByName("IP"),port));
serverChannel.configureBlocking(false);
//3、创建Reactor线程,创建多路复用器并启动线程
selector = Selector.open();
//4、将ServerSocketChannel注册到selector上,监听ACCEPT事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The Time Server is start in port : "+ port);

} catch (Exception e) {
e.printStackTrace();
}

}

public void stop(){
this.stop = true;
}

@Override
public void run() {
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
while(it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if(key != null) {
key.cancel();
if(key.channel() != null)
key.channel().close();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
try {
if(selector != null) {
selector.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}

private void handleInput(SelectionKey key) throws IOException {
if(key.isValid()) {
//处理新的请求消息
if(key.isAcceptable()) {
// Accept new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector,SelectionKey.OP_READ);
}
if(key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if(readBytes > 0 ) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes,"UTF-8");
System.out.println("The Time server receive order : "+ body);

String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
doWrite(sc,currentTime);
}else if(readBytes < 0) {
key.cancel();
sc.close();
}else
;
}
}
}

private void doWrite(SocketChannel channel, String response) throws IOException {
if(StringUtils.hasText(response)) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
}

PS:这个类稍显复杂了一些

NIO客户端代码

package com.example.nio.client;

/**
* @version 1.0
* @Desription:
* @Author:Hui
* @CreateDate:2018/6/14 22:54
*/
public class TimeClient {

public static void main(String[] args) {
int port = 8080;
if(null != args && args.length > 0){
port = Integer.valueOf(args[0]);
}

new Thread(new TimeClientHandle("127.0.0.1",port),"TimeClient-001").start();
}
}

package com.example.nio.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
* @version 1.0
* @Desription:
* @Author:Hui
* @CreateDate:2018/6/14 22:55
*/
public class TimeClientHandle implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile  boolean stop;

public TimeClientHandle(String host, int port) {
this.host = host == null ? "127.0.0.1":host;
this.port = port;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void run() {
try {
doConnect();
} catch (Exception e) {
e.printStackTrace();
}
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if(key != null) {
key.cancel();
if(key.channel() != null)
key.channel().close();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
if(selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

private void handleInput(SelectionKey key) throws IOException {
if(key.isValid()) {
SocketChannel sc = (SocketChannel) key.channel();
if(sc.finishConnect()) {
sc.register(selector,SelectionKey.OP_READ);
doWrite(sc);
}else
System.exit(1);
if(key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if(readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes,"UTF-8");
System.out.println("Now is :" + body);
this.stop = true;
} else if(readBytes < 0 ){
key.cancel();
sc.close();
}
}else
;
}
}

private void doConnect() throws IOException {
if(socketChannel.connect(new InetSocketAddress(host,port))) {
socketChannel.register(selector,SelectionKey.OP_READ);
doWrite(socketChannel);
}else
socketChannel.register(selector,SelectionKey.OP_CONNECT);

}

private void doWrite(SocketChannel socketChannel) throws IOException {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
socketChannel.write(writeBuffer);
if(!writeBuffer.hasRemaining()) {
System.out.println("Send order 2 server successed");
}
}
}

通过写代码发现NIO代码太复杂了。但是NIO编程的优点如下:

客户端发起连接操作是异步的,可以通过在多路复用器注册OP_CONNECT等待后续结果,不需要像BIO客户端那样被同步阻塞。

SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,这样直接返回。

县城模型的优化:由于JDK的selector在Linux等主流操作系统上是通过epoll实现,它没有连接句柄数的限制,这意味着一个Selector线程可以接入成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降。因此,它非常适合做高性能、高负载的网络服务器。

4中IO的对比

对比项同步阻塞IO(BIO)伪异步IO非阻塞IO异步IO(AIO)
客户端线程个数1:1M:NM:1(1个I/O线程处理多个客户端连接)M:0(不需要启动额外的IO线程,被动回调)
I/O类型(阻塞)阻塞I/O阻塞IO非阻塞IO非阻塞IO
I/O类型(同步)同步I/O同步I/O同步I/O(I/O多路复用)异步I/O
API使用难度简单简单非常复杂复杂
调试难度简单简单复杂复杂
可靠性非常差
吞吐量

选择Netty的理由

不直接使用NIO的原因

刚才通过编写NIO的代码也发现了,直接使用JDK原生的API很繁琐。所以不建议直接使用。

NIO的类库和API繁杂,使用麻烦,你需要数量掌握selector、ServerSocketChannel、socketChannel、ByteBuffer等

需要具备其他的额外技能作铺垫,例如Java多线程编程。这是因为NIO涉及到Reactor模式

可靠性能力补齐,工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存等问题,NIO的功能特点是功能开发相对容易,但是可靠性补齐的工作量太大

JDK NIO的BUG。例如epoll bug,它会导致Selector空轮询,最终导致CPU 100%。

为什么选择Netty

Netty是业界最流行的NIO框架之一,它的健壮性、功能、性能、可定制性和可扩展性在同类框架中都是首屈一指的,例如Hadoop的RPC框架Avor就是用了Netty作为底层的通信框架。通过对Netty的分析,我们将它的优点总结如下:

API使用简单,开发门槛低

功能强大,预置乐乐多种编解码功能,支持多种主流协议

定制能力强,可以通过ChannelHandler对通信框架进行灵活扩展

性能高

成熟、稳定

社区活跃,版本迭代周期短

经历了大规模的商业应用考研,质量得到验证。

总结

现在总算是对NIO有了一个大概的了解,name下一章将正式进入netty的世界,共同学习。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: