您的位置:首页 > Web前端 > React

nio的基础和Reactor的理解

2017-01-17 13:56 99 查看
nio的一些基础,还是感觉有些模糊,特别reactor

下面是我学习的一些代码,方便没事回顾一下:

public class MyTry {
public static void main(String[] args) {
try {
RandomAccessFile afile = new RandomAccessFile("aa.text", "rw");
FileChannel inchannel = afile.getChannel();

ByteBuffer buffer = ByteBuffer.allocate(48);
int read = inchannel.read(buffer);
while (read != -1) {
/* buffer.mark();
buffer.reset();*/
buffer.flip();
while (buffer.hasRemaining()) {
System.out.println((char) buffer.get());
}
buffer.clear();
read = inchannel.read(buffer);

}

//scatter
/*ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);

ByteBuffer[] bufferArray = { header, body };

inchannel.read(bufferArray);*/

//gather
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
//write data into buffers
ByteBuffer[] bufferArray = {header, body};
inchannel.write(bufferArray);

//transform
RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel fromChannel = fromFile.getChannel();

RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChannel toChannel = toFile.getChannel();

long position = 0;
long count = fromChannel.size();

toChannel.transferFrom(fromChannel, position, count);
//toChannel.transferFrom(position, count, fromChannel);
afile.close();

//selector

Selector selector = Selector.open();
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key =(SelectionKey) keyIterator.next();
if (key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
}

/**
* block
* 通过 ServerSocketChannel.accept() 方法监听新进来的连接。
* 当 accept()方法返回的时候,它返回一个包含新进来的连接的 SocketChannel。
* 因此,accept()方法会一直阻塞到有新连接到达。
通常不会仅仅只监听一个连接,在while循环中调用 accept()方法. 如下面的例子:
*/
//socket

/*ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
*/
/*while(true){
SocketChannel socketChannel =
serverSocketChannel.accept();

//do something with socketChannel...
}*/

/**
* ServerSocketChannel可以设置成非阻塞模式。在非阻塞模式下,
* accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。
* 因此,需要检查返回的SocketChannel是否是null。
*/

/*serverSocketChannel.configureBlocking(false);
while(true){
SocketChannel socketChannel =
serverSocketChannel.accept();

if(socketChannel != null){
//do something with socketChannel...
}
}*/

/**
* Datagram 通道
* Java NIO中的DatagramChannel是一个能收发UDP包的通道。因为UDP是无连接的网络协议,
* 所以不能像其它通道那样读取和写入。它发送和接收的是数据包。
*/
/*

DatagramChannel channel=DatagramChannel.open();
channel.socket().bind(new InetSocketAddress(9999));

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
channel.receive(buf);

//发送数据
//通过send()方法从DatagramChannel发送数据
String newData = "New String to write to file..." + System.currentTimeMillis();

ByteBuffer bf = ByteBuffer.allocate(48);
bf.clear();
bf.put(newData.getBytes());
//这个地方用flip的理解,首先我们写入之后,应该就是读取才会flip,为什么send的时候也要flip
//因为从buffer中要读入到channel中所以要flip,相对而言nenty的buf就不用
bf.flip();

int bytesSent = channel.send(buf, new InetSocketAddress("jenkov.com", 80));

//cha
ab5d
nnel读写
/**
* 这个例子发送一串字符到”jenkov.com”服务器的UDP端口80。 因为服务端并没有监控这个端口,
* 所以什么也不会发生。也不会通知你发出的数据包是否已收到,因为UDP在数据传送方面没有任何保证。
* 连接到特定的地址
*
*
可以将DatagramChannel“连接”到网络中的特定地址的。由于UDP是无连接的,
连接到特定地址并不会像TCP通道那样创建一个真正的连接。而是锁住DatagramChannel ,
让其只能从特定地址收发数据。
*/

//channel.connect(new InetSocketAddress("jenkov.com", 80));
//当连接后,也可以使用read()和write()方法,就像在用传统的通道一样。只是在数据传送方面没有任何保证。这里有几个例子

/*int bytesRead = channel.read(buf);
int bytesWritten = channel.write(but);*//*

*/

//向管道写数据
//要向管道写数据,需要访问sink通道。像这样:
Pipe pipe = Pipe.open();
Pipe.SinkChannel sinkChannel = pipe.sink();

String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()) {
sinkChannel.write(buf);
}

//从管道读取数据 从读取管道的数据,需要访问source通道,像这样
Pipe.SourceChannel sourceChannel = pipe.source();
ByteBuffer buff = ByteBuffer.allocate(48);
//read()方法返回的int值会告诉我们多少字节被读进了缓冲区。
int bytesRead = sourceChannel.read(buff);

/**
* 神总结
* 全文比较长,想打个比方归纳一下。
原文中说了最重要的3个概念,

Channel 通道

Buffer 缓冲区

Selector 选择器

其中Channel对应以前的流,Buffer不是什么新东西,Selector是因为nio可以使用异步的非堵塞模式才加入的东西。
以前的流总是堵塞的,一个线程只要对它进行操作,其它操作就会被堵塞,也就相当于水管没有阀门,你伸手接水的时候,
不管水到了没有,你就都只能耗在接水(流)上。
nio的Channel的加入,相当于增加了水龙头(有阀门),虽然一个时刻也只能接一个水管的水,但依赖轮换策略,在水量不大的时候,
各个水管里流出来的水,都可以得到妥善接纳,这个关键之处就是增加了一个接水工,也就是Selector,
他负责协调,也就是看哪根水管有水了的话,在当前水管的水接到一定程度的时候,就切换一下:临时关上当前水龙头,试
着打开另一个水龙头(看看有没有水)。
当其他人需要用水的时候,不是直接去接水,而是事前提了一个水桶给接水工,这个水桶就是Buffer。
也就是,其他人虽然也可能要等,但不会在现场等,而是回家等,可以做其它事去,水接满了,接水工会通知他们。
这其实也是非常接近当前社会分工细化的现实,也是统分利用现有资源达到并发效果的一种很经济的手段,
而不是动不动就来个并行处理,虽然那样是最简单的,但也是最浪费资源的方式
*/

}catch (IOException e){
e.printStackTrace();
}
//基本的channel实例
}
}

Reactor模式



就是将消息放到了一个队列中,通过异步线程池对其进行消费!

Reactor:Reactor是IO事件的派发者。
Acceptor:Acceptor接受client连接,建立对应client的Handler,并向Reactor注册此Handler。
Handler:和一个client通讯的实体,按这样的过程实现业务的处理。一般在基本的Handler基础上还会有更进一步的层次划分, 用来抽象诸如decode,process和encoder这些过程。比如对Web Server而言,decode通常是HTTP请求的解析, process的过程会进一步涉及到Listener和Servlet的调用。业务逻辑的处理在Reactor模式里被分散的IO事件所打破,
所以Handler需要有适当的机制在所需的信息还不全(读到一半)的时候保存上下文,并在下一次IO事件到来的时候(另一半可读了)能继续中断的处理。为了简化设计,Handler通常被设计成状态机,按GoF的state pattern来实现。

个人理解:原来的selector,nio服务端是c/s模式,所有的接受和注册都是selector管理,不过后面的io或者业务处理,只用一个线程。

reactor,是让每个channel接受过来后,让对应的handler处理,并且每一个io操作都是放入线程池中处理,注意其中的attench方法,将对应的handler与channel绑定

对于信息的上下文保存:结合了解netty的handler来理解觉得是这样的:

第一:handler中处理的都是工作线程,如果如要io操作的就放入线程池去处理,线程池处理是异步的,所以要有全局的上下文,来保存异步的结果,比如ctx的attribute或者channel的上下文,channel的作用范围要大一些,作用整个请求开始到结束,异步结果nenty中的是channelfuture,对于gof的实现不了解,之后再理解一下。

参考:
http://www.cnblogs.com/ivaneye/p/5731432.html http://www.iteye.com/magazines/132-Java-NIO
学习代码:

/**
* Created by yoyohunter on 17/1/16.
*/
public class Reactor implements Runnable {
final Selector selector;

final ServerSocketChannel serverSocket;

Reactor(int port)throws IOException {
selector=Selector.open();
serverSocket= ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port);
serverSocket.socket().bind(address);
serverSocket.configureBlocking(false);
//想selector注册 这个channel
SelectionKey sk=serverSocket.register(selector, SelectionKey.OP_ACCEPT);

System.out.println("start serversocket register");

//利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor
sk.attach(new Acceptor());

System.out.println("new acceptor");

}

public void run() {
try{
while (!Thread.interrupted()){
selector.select();
Set selected=selector.selectedKeys();
Iterator it=selected.iterator();
//Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
while (it.hasNext())
//来一个事件 第一次触发一个accepter线程
//以后触发SocketReadHandler
dispatch((SelectionKey)(it.next()));
selected.clear();
}

}catch (IOException e){
e.printStackTrace();
}
}
//运行Acceptor或SocketReadHandler
void dispatch(SelectionKey k) {
Runnable r = (Runnable)(k.attachment());
if (r != null){
// r.run();

}
}

class Acceptor implements Runnable{
public void run() {
try{
SocketChannel  c=serverSocket.accept();
if(c !=null){
//调用hanndler处理channel
new SocketReadHandler(selector,c);
}

}catch (IOException e){
e.printStackTrace();
}
}
}
}







                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: