您的位置:首页 > 运维架构 > 网站架构

【JAVA开发之架构专题】10.NIO通信架构

2017-10-10 17:00 274 查看

1. NIO概念

jdk 1.4出的
非阻塞
(nonblocking) IO

基于
通道
缓冲区
操作

NIO模式图



2. NIO核心组成部分

Channel:高速通道

ServerSocketChannel
: 静态工厂方法
open
创建实例,
ServerSocketChannel
封装的
ServerSocket
还是
blocking IO
模式。
configureBlocking(false)
时,多路注册器
Selector
可用。

Selector:多路注册器。自身静态工厂方法
open
创建实例。通过
SelectorKey.OP_ACCEPT
注册
Selector
。(线程在
Selector
轮询拿四大事件的
SelectorKey
集合
set< SelectorKey >
)。

SelectorKey
里面包含哪些信息呢?有
Channel
Selector


3. 代码体验

NIOServer.java和NIOClient.java

1. NIOServer

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/**
* NIO服务端
* @author whbing
*
*/
public class NIOServer {

//缓冲区,对应Channel的通信方式
private ServerSocketChannel server;
int port = 8080;//默认值,后边的构造器可以改值
//多路注册复用器,注册SelectKey_OP各种操作事件
private Selector selector;
//接收缓冲池和发送缓冲池
ByteBuffer recBuffer  = ByteBuffer.allocate(1024);
ByteBuffer sendBuffer  = ByteBuffer.allocate(1024);
//缓存机制
Map<SelectionKey, String> essiomMsg = new HashMap<SelectionKey,String>();
//对客户端编号
private static int client_no = 19056;

//构造器
public NIOServer(int port) throws IOException{
this.port = port;//如果没有传port,就用上边默认的port
server= ServerSocketChannel.open();
//底层就是一个ServerSocket
server.socket().bind(new InetSocketAddress(this.port));
server.configureBlocking(false);
//再将blocking设为false后,开启selector
selector = Selector.open();
server.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("NIO消息服务器初始化完成,可以随时接收客户端链接,监听端口为"+this.port);

}
//我们需要用一个线程去监听selector,看上边是否有满足我们需要的事件类型SelectionKey
private void listener() throws IOException{
while(true){
int evenCount = selector.select();//没有找到selector则为0
if(evenCount == 0){
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
//遍历并处理监听到selector中的事件
final Iterator<SelectionKey> iteratorKeys = keys.iterator();
while (iteratorKeys.hasNext()) {
process(iteratorKeys.next());
iteratorKeys.remove();

}
}
}
//这里就是用来处理每一个SelectionKey:包含通道Channel信息 和 selector信息
private void process(SelectionKey key) {
SocketChannel client = null;
try {
if(key.isValid() && key.isAcceptable()){
client=server.accept();
++client_no;
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
}else if(key.isValid() && key.isReadable()){
//服务器从SocketChannel读取客户端发送过来的信息
recBuffer.clear();
client= (SocketChannel)key.channel();
int len = client.read(recBuffer);
if(len>0){
String msg=new String(recBuffer.array(), 0, len);
essiomMsg.put(key, msg);
System.out.println("当前维护的线程ID:"+ Thread.currentThread().getId()
+"对客户端写信息,客户端编号为:"+client_no+"信息为:"+msg);

//改变状态,又会被监听器监听到
client.register(selector, SelectionKey.OP_WRITE);
}
}else if(key.isValid() && key.isWritable()){
if(!essiomMsg.containsKey(key)){
return;
}
client = (SocketChannel)key.channel();
//position=0
sendBuffer.clear();
//如position=500
sendBuffer.put((essiomMsg.get(key)+"你好,已经处理完请求!").getBytes());
//limit=500 position=0 0-->limt
sendBuffer.flip();
client.write(sendBuffer);
System.out.println("当前维护的线程ID:"+ Thread.currentThread().getId()
+"对客户端写信息,客户端编号为:"+client_no);
//改变状态,又会被监听器监听到
client.register(selector, SelectionKey.OP_READ);
}
} catch (IOException e) {
//防止客户端非法下线
key.cancel();
try {
client.socket().close();
client.close();
System.out.println("【系统提示】"+new SimpleDateFormat().format(new Date())+
essiomMsg.get(key)+"已下线");
} catch (IOException e1) {
e1.printStackTrace();
}
}
}

public static void main(String[] args) {
try {
new NIOServer(8080).listener();
} catch (IOException e) {
e.printStackTrace();
}
}
}


NIOclient

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

/**
* NIO客户端
* @author whbing
*
*/
public class NIOClient {

private SocketChannel client;

InetSocketAddress serverAddress = new InetSocketAddress("localhost", 8080);

private Selector selector;

//接收缓冲池和发送缓冲池
ByteBuffer recBuffer  = ByteBuffer.allocate(1024);
ByteBuffer sendBuffer  = ByteBuffer.allocate(1024);

//构造器
public NIOClient() throws IOException{
//构造client实例
client= SocketChannel.open();

client.configureBlocking(false);
client.connect(serverAddress);

//构造selector实例
selector = Selector.open();

//注册连接事件
client.register(selector, SelectionKey.OP_CONNECT);
//Netty Reactor线程池组 Tomcat bootstrap
}

private void session() throws IOException{

if(client.isConnectionPending()){
client.finishConnect();

client.register(selector, SelectionKey.OP_WRITE);

System.out.println("已经连接到服务器,可以在控制台登记了");

}

Scanner sc = new Scanner(System.in);

while(sc.hasNextLine()){
String msg = sc.nextLine();
if("".equals(msg)){
continue;
}
if("exit".equals(msg)){
System.exit(0);
}

process(msg);

}
}

private void process(String name) {
boolean waitHelp = true;
Iterator<SelectionKey> iteratorKeys = null;
Set<SelectionKey> keys =null;
while (waitHelp) {
try {

int readys = selector.select();

//如果没有客人,继续轮询
if(readys == 0){
continue;
}

keys = selector.selectedKeys();
iteratorKeys = keys.iterator();

//一个个迭代keys
while(iteratorKeys.hasNext()){
SelectionKey key = iteratorKeys.next();

if(key.isValid() && key.isWritable()){

//可写就是客户端要对服务器发送信息
sendBuffer.clear();
sendBuffer.put(name.getBytes());
sendBuffer.flip();
client.write(sendBuffer);
client.register(selector, SelectionKey.OP_READ);
}else if(key.isValid() && key.isReadable()){
//服务器发送信息回来,给客户端读
recBuffer.clear();
int len = client.read(recBuffer);
if(len>0){
recBuffer.flip();
System.out.println("服务器返回的消息是: 当前维护的线程ID:"+ Thread.currentThread().getId()
+"对客户端写信息"+ new String(recBuffer.array(), 0, len) );

//改变状态,又会被监听器监听到
client.register(selector, SelectionKey.OP_WRITE);

waitHelp = false;
}
}
//检查完之后,打发客户走
iteratorKeys.remove();
}

} catch (IOException e) {
//防止客户端非法下线
((SelectionKey)keys).cancel();
try {
client.socket().close();
client.close();
return;
} catch (IOException e1) {
e1.printStackTrace();
}
}

}

}

public static void main(String[] args) {
try {
new NIOClient().session();
} catch (IOException e) {
e.printStackTrace();
}
}
}


结果:

//client
已经连接到服务器,可以在控制台登记了
whbing上线了!
服务器返回的消息是: 当前维护的线程ID:1对客户端写信息whbing上线了!你好,已经处理完请求!
second!
服务器返回的消息是: 当前维护的线程ID:1对客户端写信息second!你好,已经处理完请求!

//server
NIO消息服务器初始化完成,可以随时接收客户端链接,监听端口为8080
当前维护的线程ID:1对客户端写信息,客户端编号为:19057信息为:whbing上线了!
当前维护的线程ID:1对客户端写信息,客户端编号为:19057
当前维护的线程ID:1对客户端写信息,客户端编号为:19057信息为:second!
当前维护的线程ID:1对客户端写信息,客户端编号为:19057
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java nio-socket