您的位置:首页 > 编程语言 > Java开发

java I/O学习笔记3 ---NIO

2016-01-26 20:53 453 查看
NIO中有了几个新的概念:

通道:Channel,就像也是一种数据传输方式的抽象概念,与Stream不同之处在于,通道是双向的,stream是单向的(输入流,输出流),但是我们通常不直接向通道写数据而是通过缓冲区

缓冲区:buffer,实质上就是一个可变的数组对象,但是它提供了维护数据位置信息的方式(这个buffer里面的数据读到哪儿了?还剩多少数据没有读?等等..)

selector:负责轮询channel,就是把channel注册到selector上面,并选择你对这个channel变化感兴趣的方面(例如:channel是不是有新数据可以读了等等.),selector会去轮询注册在上满的channel,如果发现你注册的这个channel的感兴趣的方面发生变化的时候,他就会放到selected-key set 之中。(通过这个selected-key set可以确定哪些channel可以操作了)

SelectionKey:用于在Channel在注册selector之时,返回一个的token,这个token是一个对象,存储着这个被注册的Channel在selector上面的状态信息。

思路:

server端:

创建一个selector实例;

创建ServerSocketChannel实例,注册监听

轮询selector中的key

根据key的类型判断是新的连接请求还是某个已经有的Channel有新的可操作的内容

判断是否请求连接,如果是则在selector中注册新的Socket,并且设置selector轮询的内容(对这个socket的channel变化感兴趣的方面)

然后根据这个请求来判断是否需要其他处理(例如读写操作等。)

所以需要有一个专门管理socket的类,处理请求连接的方法,处理请求内容的类

client端:

新建一个SocketChannel实例,并且指向server

建立一个selector实例,

注册这个SocketChannel实例到这个Selector实例,并选择注册对这个Channel感兴趣的方面(interest set)

等待消息

server端代码:NIOServer.java 和NIOServerHandler.java

NIOServer.java

package learningNote.IO;

public class NIOServer {

public static void main(String[] args) {
int port=8090;

new Thread(new NIOServerHandler(port)).start();
}

}


NIOServerHandler.java

package learningNote.IO;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOServerHandler implements Runnable {

private Selector selector = null;
private ServerSocketChannel serverSocketChannnel = null;

public NIOServerHandler(int port) {
// TODO Auto-generated constructor stub
try {
selector = Selector.open();// 新建一个selector
serverSocketChannnel = ServerSocketChannel.open();
serverSocketChannnel.socket().bind(new InetSocketAddress(port), 1024);
serverSocketChannnel.configureBlocking(false);
// register the selector
serverSocketChannnel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("server start in port " + port);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

@Override
public void run() {
// TODO Auto-generated method stub
while (true) {
try {
selector.select(6000);
System.out.println("开始select");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Set<SelectionKey> selectedkey = selector.selectedKeys();
Iterator<SelectionKey> it = selectedkey.iterator();

while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
try {
selectedKeyHandler(key);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

private void addSocketChannel(SelectionKey key) throws IOException {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
// 将这个新的SocketChannel设置异步,并注册到selector
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);// 对读取新消息感兴趣
}

private void selectedKeyHandler(SelectionKey key) throws IOException {
// 判断key是否合法
if (key.isValid()) {
// 判断是否请求接入
if (key.isAcceptable()) {
/*
* Tests whether this key's channel is ready to accept a new
* socket connection.
* 根据这个方法说明,如果返回为true那么这个key对应的就是ServerSocketChannel
*/
System.out.println("new SocketChannel");
addSocketChannel(key);
}

if (key.isReadable()) {
/*
* Tests whether this key's channel is ready for reading.
* 根据这个方法的说明,如果返回true,则表示channel的数据已经准备好了,可以读取 了
*/
// 获取这个key的socketChannel,之后创建一个缓冲区,将这个channel的数据缓冲到这个缓冲区,并且打印出来
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int dataSize = sc.read(readBuffer);// 因为socket是异步的,所以他的read方法也是异步的,read不会造成阻塞
if (dataSize > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String request = new String(bytes, "UTF-8");
System.out.println("自client 输入的数据");
System.out.println(request);
writeData(sc, request);
}

}
}
}

private void writeData(SocketChannel s, String response) throws IOException {
byte[] bytes = (new java.util.Date().toString()+"\n" + response).getBytes(
bf4f
"UTF-8");
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
System.out.println("开始向Client输出数据");
System.out.println(new String(bytes,"UTF-8"));
s.write(writeBuffer);
}
}


Client端:NIOClient.java 和NIOClientHandler.java

NIOClient.java

package learningNote.IO;

public class NIOClient {

public static void main(String[] args) {
// TODO Auto-generated method stub
String host="127.0.0.1";
int port = 8090;
new Thread(new NIOClientHandler(host,port)).start();
}

}


NIOClientHandler.java

/**
*
*/
package learningNote.IO;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
* @author jaon
*
*/
public class NIOClientHandler implements Runnable{

/**
*
*/
private SocketChannel sc;
private String host;
int port;
private Selector selector;
public NIOClientHandler(String host,int port) {
// TODO Auto-generated constructor stub
this.host=host;
this.port=port;
try {
sc=SocketChannel.open();
sc.configureBlocking(false);
selector =Selector.open();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

@Override
public void run() {
// TODO Auto-generated method stub
try {
socketChannelConnect();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

while(true){
try {
selector.select(6000);
//获取被选择key,并且以此检查并操作每个key;
Set<SelectionKey> selectedKey=selector.selectedKeys();
Iterator<SelectionKey> it=selectedKey.iterator();
while(it.hasNext()){
SelectionKey key=it.next();
it.remove();
handlerKey(key);
}
} catch (IOException e) {

// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

private void handlerKey(SelectionKey key) throws IOException{
if(key.isValid()){
SocketChannel socket=(SocketChannel) key.channel();
//判断socket是否处于连接成功,如果成功则需要注册
if(key.isConnectable()){

if(socket.finishConnect()){//表明连接已经建立
/*
* Finishes the process of connecting a socket channel.
* true if, and only if, this channel's socket is now connected
*/

socket.register(selector, SelectionKey.OP_READ);//注册这个channel,并且说明感兴趣的方面
}else{
System.exit(1);
}
}

//判断是否可读

if(key.isReadable()){
doRead(key);
doWrite(socket);
}
}
}

private void doRead(SelectionKey key) throws IOException{
System.out.println("从server输入的数据");
ByteBuffer readBuffer=ByteBuffer.allocate(1024);
SocketChannel sc=(SocketChannel) key.channel();
int readSize=sc.read(readBuffer);
readBuffer.flip();
if(readSize>0){
byte[] bytes=new byte[readBuffer.remaining()];
readBuffer.get(bytes);
System.out.println(new String(bytes,"UTF-8"));
}
}

private void socketChannelConnect() throws IOException{
//将这个sc连接到server,并且注册到监听器
sc.connect(new InetSocketAddress(host,port));
if(sc.finishConnect()){
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
}
else
sc.register(selector, SelectionKey.OP_CONNECT);
}

private void doWrite(SocketChannel socket) throws IOException{
//byte[] bytes="this is a test".getBytes("UTF-8");

System.out.println("向server输出数据:回车后ok结束");
BufferedReader consoleIn=new BufferedReader(new InputStreamReader(System.in));
String sendMessage="";
while(true){
String temp=consoleIn.readLine();
if(temp.equals("ok"))break;
sendMessage+=temp+"\n";
}
byte[] sendBytes=sendMessage.getBytes("UTF-8");
ByteBuffer writeBuffer=ByteBuffer.allocate(sendBytes.length);
writeBuffer.put(sendBytes);
writeBuffer.flip();
socket.write(writeBuffer);
}

}


很关键的一个操作是在selector的注册上面

容易出错的操作是对key实例函数调用的理解
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java io