nio的基础和Reactor的理解
2017-01-17 13:56
99 查看
nio的一些基础,还是感觉有些模糊,特别reactor
下面是我学习的一些代码,方便没事回顾一下:
Reactor模式
就是将消息放到了一个队列中,通过异步线程池对其进行消费!
Reactor:Reactor是IO事件的派发者。
Acceptor:Acceptor接受client连接,建立对应client的Handler,并向Reactor注册此Handler。
Handler:和一个client通讯的实体,按这样的过程实现业务的处理。一般在基本的Handler基础上还会有更进一步的层次划分, 用来抽象诸如decode,process和encoder这些过程。比如对Web Server而言,decode通常是HTTP请求的解析, process的过程会进一步涉及到Listener和Servlet的调用。业务逻辑的处理在Reactor模式里被分散的IO事件所打破,
所以Handler需要有适当的机制在所需的信息还不全(读到一半)的时候保存上下文,并在下一次IO事件到来的时候(另一半可读了)能继续中断的处理。为了简化设计,Handler通常被设计成状态机,按GoF的state pattern来实现。
个人理解:原来的selector,nio服务端是c/s模式,所有的接受和注册都是selector管理,不过后面的io或者业务处理,只用一个线程。
reactor,是让每个channel接受过来后,让对应的handler处理,并且每一个io操作都是放入线程池中处理,注意其中的attench方法,将对应的handler与channel绑定
对于信息的上下文保存:结合了解netty的handler来理解觉得是这样的:
第一:handler中处理的都是工作线程,如果如要io操作的就放入线程池去处理,线程池处理是异步的,所以要有全局的上下文,来保存异步的结果,比如ctx的attribute或者channel的上下文,channel的作用范围要大一些,作用整个请求开始到结束,异步结果nenty中的是channelfuture,对于gof的实现不了解,之后再理解一下。
参考:
http://www.cnblogs.com/ivaneye/p/5731432.html http://www.iteye.com/magazines/132-Java-NIO
学习代码:
下面是我学习的一些代码,方便没事回顾一下:
public class MyTry { public static void main(String[] args) { try { RandomAccessFile afile = new RandomAccessFile("aa.text", "rw"); FileChannel inchannel = afile.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(48); int read = inchannel.read(buffer); while (read != -1) { /* buffer.mark(); buffer.reset();*/ buffer.flip(); while (buffer.hasRemaining()) { System.out.println((char) buffer.get()); } buffer.clear(); read = inchannel.read(buffer); } //scatter /*ByteBuffer header = ByteBuffer.allocate(128); ByteBuffer body = ByteBuffer.allocate(1024); ByteBuffer[] bufferArray = { header, body }; inchannel.read(bufferArray);*/ //gather ByteBuffer header = ByteBuffer.allocate(128); ByteBuffer body = ByteBuffer.allocate(1024); //write data into buffers ByteBuffer[] bufferArray = {header, body}; inchannel.write(bufferArray); //transform RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw"); FileChannel fromChannel = fromFile.getChannel(); RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw"); FileChannel toChannel = toFile.getChannel(); long position = 0; long count = fromChannel.size(); toChannel.transferFrom(fromChannel, position, count); //toChannel.transferFrom(position, count, fromChannel); afile.close(); //selector Selector selector = Selector.open(); Set selectedKeys = selector.selectedKeys(); Iterator keyIterator = selectedKeys.iterator(); while (keyIterator.hasNext()) { SelectionKey key =(SelectionKey) keyIterator.next(); if (key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } } /** * block * 通过 ServerSocketChannel.accept() 方法监听新进来的连接。 * 当 accept()方法返回的时候,它返回一个包含新进来的连接的 SocketChannel。 * 因此,accept()方法会一直阻塞到有新连接到达。 通常不会仅仅只监听一个连接,在while循环中调用 accept()方法. 如下面的例子: */ //socket /*ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(9999)); */ /*while(true){ SocketChannel socketChannel = serverSocketChannel.accept(); //do something with socketChannel... }*/ /** * ServerSocketChannel可以设置成非阻塞模式。在非阻塞模式下, * accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。 * 因此,需要检查返回的SocketChannel是否是null。 */ /*serverSocketChannel.configureBlocking(false); while(true){ SocketChannel socketChannel = serverSocketChannel.accept(); if(socketChannel != null){ //do something with socketChannel... } }*/ /** * Datagram 通道 * Java NIO中的DatagramChannel是一个能收发UDP包的通道。因为UDP是无连接的网络协议, * 所以不能像其它通道那样读取和写入。它发送和接收的是数据包。 */ /* DatagramChannel channel=DatagramChannel.open(); channel.socket().bind(new InetSocketAddress(9999)); ByteBuffer buf = ByteBuffer.allocate(48); buf.clear(); channel.receive(buf); //发送数据 //通过send()方法从DatagramChannel发送数据 String newData = "New String to write to file..." + System.currentTimeMillis(); ByteBuffer bf = ByteBuffer.allocate(48); bf.clear(); bf.put(newData.getBytes()); //这个地方用flip的理解,首先我们写入之后,应该就是读取才会flip,为什么send的时候也要flip //因为从buffer中要读入到channel中所以要flip,相对而言nenty的buf就不用 bf.flip(); int bytesSent = channel.send(buf, new InetSocketAddress("jenkov.com", 80)); //cha ab5d nnel读写 /** * 这个例子发送一串字符到”jenkov.com”服务器的UDP端口80。 因为服务端并没有监控这个端口, * 所以什么也不会发生。也不会通知你发出的数据包是否已收到,因为UDP在数据传送方面没有任何保证。 * 连接到特定的地址 * * 可以将DatagramChannel“连接”到网络中的特定地址的。由于UDP是无连接的, 连接到特定地址并不会像TCP通道那样创建一个真正的连接。而是锁住DatagramChannel , 让其只能从特定地址收发数据。 */ //channel.connect(new InetSocketAddress("jenkov.com", 80)); //当连接后,也可以使用read()和write()方法,就像在用传统的通道一样。只是在数据传送方面没有任何保证。这里有几个例子 /*int bytesRead = channel.read(buf); int bytesWritten = channel.write(but);*//* */ //向管道写数据 //要向管道写数据,需要访问sink通道。像这样: Pipe pipe = Pipe.open(); Pipe.SinkChannel sinkChannel = pipe.sink(); String newData = "New String to write to file..." + System.currentTimeMillis(); ByteBuffer buf = ByteBuffer.allocate(48); buf.clear(); buf.put(newData.getBytes()); buf.flip(); while(buf.hasRemaining()) { sinkChannel.write(buf); } //从管道读取数据 从读取管道的数据,需要访问source通道,像这样 Pipe.SourceChannel sourceChannel = pipe.source(); ByteBuffer buff = ByteBuffer.allocate(48); //read()方法返回的int值会告诉我们多少字节被读进了缓冲区。 int bytesRead = sourceChannel.read(buff); /** * 神总结 * 全文比较长,想打个比方归纳一下。 原文中说了最重要的3个概念, Channel 通道 Buffer 缓冲区 Selector 选择器 其中Channel对应以前的流,Buffer不是什么新东西,Selector是因为nio可以使用异步的非堵塞模式才加入的东西。 以前的流总是堵塞的,一个线程只要对它进行操作,其它操作就会被堵塞,也就相当于水管没有阀门,你伸手接水的时候, 不管水到了没有,你就都只能耗在接水(流)上。 nio的Channel的加入,相当于增加了水龙头(有阀门),虽然一个时刻也只能接一个水管的水,但依赖轮换策略,在水量不大的时候, 各个水管里流出来的水,都可以得到妥善接纳,这个关键之处就是增加了一个接水工,也就是Selector, 他负责协调,也就是看哪根水管有水了的话,在当前水管的水接到一定程度的时候,就切换一下:临时关上当前水龙头,试 着打开另一个水龙头(看看有没有水)。 当其他人需要用水的时候,不是直接去接水,而是事前提了一个水桶给接水工,这个水桶就是Buffer。 也就是,其他人虽然也可能要等,但不会在现场等,而是回家等,可以做其它事去,水接满了,接水工会通知他们。 这其实也是非常接近当前社会分工细化的现实,也是统分利用现有资源达到并发效果的一种很经济的手段, 而不是动不动就来个并行处理,虽然那样是最简单的,但也是最浪费资源的方式 */ }catch (IOException e){ e.printStackTrace(); } //基本的channel实例 } }
Reactor模式
就是将消息放到了一个队列中,通过异步线程池对其进行消费!
Reactor:Reactor是IO事件的派发者。
Acceptor:Acceptor接受client连接,建立对应client的Handler,并向Reactor注册此Handler。
Handler:和一个client通讯的实体,按这样的过程实现业务的处理。一般在基本的Handler基础上还会有更进一步的层次划分, 用来抽象诸如decode,process和encoder这些过程。比如对Web Server而言,decode通常是HTTP请求的解析, process的过程会进一步涉及到Listener和Servlet的调用。业务逻辑的处理在Reactor模式里被分散的IO事件所打破,
所以Handler需要有适当的机制在所需的信息还不全(读到一半)的时候保存上下文,并在下一次IO事件到来的时候(另一半可读了)能继续中断的处理。为了简化设计,Handler通常被设计成状态机,按GoF的state pattern来实现。
个人理解:原来的selector,nio服务端是c/s模式,所有的接受和注册都是selector管理,不过后面的io或者业务处理,只用一个线程。
reactor,是让每个channel接受过来后,让对应的handler处理,并且每一个io操作都是放入线程池中处理,注意其中的attench方法,将对应的handler与channel绑定
对于信息的上下文保存:结合了解netty的handler来理解觉得是这样的:
第一:handler中处理的都是工作线程,如果如要io操作的就放入线程池去处理,线程池处理是异步的,所以要有全局的上下文,来保存异步的结果,比如ctx的attribute或者channel的上下文,channel的作用范围要大一些,作用整个请求开始到结束,异步结果nenty中的是channelfuture,对于gof的实现不了解,之后再理解一下。
参考:
http://www.cnblogs.com/ivaneye/p/5731432.html http://www.iteye.com/magazines/132-Java-NIO
学习代码:
/** * Created by yoyohunter on 17/1/16. */ public class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocket; Reactor(int port)throws IOException { selector=Selector.open(); serverSocket= ServerSocketChannel.open(); InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port); serverSocket.socket().bind(address); serverSocket.configureBlocking(false); //想selector注册 这个channel SelectionKey sk=serverSocket.register(selector, SelectionKey.OP_ACCEPT); System.out.println("start serversocket register"); //利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor sk.attach(new Acceptor()); System.out.println("new acceptor"); } public void run() { try{ while (!Thread.interrupted()){ selector.select(); Set selected=selector.selectedKeys(); Iterator it=selected.iterator(); //Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。 while (it.hasNext()) //来一个事件 第一次触发一个accepter线程 //以后触发SocketReadHandler dispatch((SelectionKey)(it.next())); selected.clear(); } }catch (IOException e){ e.printStackTrace(); } } //运行Acceptor或SocketReadHandler void dispatch(SelectionKey k) { Runnable r = (Runnable)(k.attachment()); if (r != null){ // r.run(); } } class Acceptor implements Runnable{ public void run() { try{ SocketChannel c=serverSocket.accept(); if(c !=null){ //调用hanndler处理channel new SocketReadHandler(selector,c); } }catch (IOException e){ e.printStackTrace(); } } } }
相关文章推荐
- 我所理解的IO基础:IO、NIO、AIO的一些基本概念
- java nio&netty系列之二reactor模型基础
- NIO(一)基础理解
- [C#基础]理解方法的参数传递
- 网络基础知识讲座之三:理解OSI网络分层
- 网络基础知识讲座之十:TCP协议理解进阶
- 网络基础知识讲座之八:初步理解IP协议
- 网络基础知识讲座之二:理解子网和CIDR
- 巧妙理解电气基础理论
- 基础:理解 数组与集合(C#,泛型 )
- java 基础试题及答案(自己的理解)
- Netty2: 事件驱动的NIO框架(基础知识)
- JAVA基础知识: 对synchronized(this)的一些理解
- 理解S3C2440里面*.S文件的一些基础
- 网络基础知识讲座之四理解数据链路层
- 理解 Zend 框架 基础篇
- 理性应建立在理解别人的感性基础上
- 网络基础知识讲座之一:理解IPv4地址的含义
- Reactor模式和NIO
- SQL Server 索引基础知识(5)----理解newid()和newsequentialid()