Java之流
2016-07-07 22:25
501 查看
IO
流的概念:流是一个很形象的概念,当程序需要读取数据的时候,就会开启一个通向数据源的流,这个数据源可以使文件、内存、或是网络连接。
类似的,当程序需要写入数据的时候,就会开启一个通向目的地的流。
这个时候你就可以想象数据好像在这其中“流”动一样。
分类
按数据方向分:输入流和输出流输入流:
InputStream/Reader
输出流:
OutputStream/Writer
按数据类型分:字节流和字符流
字节流:
InputStream/OutputStream
字符流:
Reader/Writer
NIO
概述:
NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector。传统IO基于字节流和字符流进行操作,而NIO基于Channel和Buffer(缓冲区)进行操作,
数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。
Selector(选择区)用于监听多个通道的事件(比如:连接打开,数据到达)。
因此,单个线程可以监听多个数据通道。
IO是面向流的,NIO是面向缓冲区的。
IO面向流意味着每次从流中读一个或多个字节,
直至读取所有字节,它们没有被缓存在任何地方。
此外,它不能前后移动流中的数据。
如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。
NIO的缓冲导向方法略有不同。
数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。
这就增加了处理过程中的灵活性。
但是,还需要检查是否该缓冲区中包含所有您需要处理的数据。
而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。
IO的各种流是阻塞的。
这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,
直到有一些数据被读取,或数据完全写入。
该线程在此期间不能再干任何事情了。
NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,
但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。
而不是保持线程阻塞,所以直至数据变的可以读取之前,
该线程可以继续做其他的事情。
非阻塞写也是如此。
一个线程请求写入一些数据到某通道,但不需要等待它完全写入,
这个线程同时可以去做别的事情。
线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,
所以一个单独的线程现在可以管理多个输入和输出通道(channel)。
Channel
Channel和IO中的Stream(流)是差不多一个等级的。只不过Stream是单向的,譬如:InputStream, OutputStream.
而Channel是双向的,既可以用来进行读操作,又可以用来进行写操作。
NIO中的Channel的主要实现有:
FileChannel
DatagramChannel
SocketChannel
ServerSocketChannel
分别可以对应文件IO、UDP和TCP(Server和Client)。
Buffer
NIO中的关键Buffer实现有:ByteBuffer, ShortBuffer,IntBuffer, LongBuffer,
DoubleBuffer, FloatBuffer, CharBuffer
分别对应基本数据类型:
byte, short, int, long, double, float, char
Selector
Selector运行单线程处理多个Channel,如果你的应用打开了多个通道,但每个连接的流量都很低,使用Selector就会很方便。
例如在一个聊天服务器中。要使用Selector, 得向Selector注册Channel,
然后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。
一旦这个方法返回,线程就可以处理这些事件,
事件的例子有如新的连接进来、数据接收等。
例子
NIOServer.java
package com.lee.jdk.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; /** * NIO服务端 * @author Lee * */ public class NIOServer { // 通道管理器 private Selector selector; /** * 获得一个ServerSocket通道,并对该通道初始化 * * @param port * @throws IOException */ public void initServer(int port) throws IOException { // 获得一个ServerSocket通道 ServerSocketChannel serverChannel = ServerSocketChannel.open(); // 设置通道为非阻塞 serverChannel.configureBlocking(false); // 将该通道对应的ServerSocket绑定到port端口 serverChannel.socket().bind(new InetSocketAddress(port)); // 获得一个通道管理器 this.selector = Selector.open(); /* * 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后, * 当该事件到达时,selector.selector()会返回,如果该事件没到达selector.select()会一直阻塞 */ serverChannel.register(selector, SelectionKey.OP_ACCEPT); } /** * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理 * * @throws IOException */ public void listen() throws IOException { System.out.println("服务器启动成功!"); // 轮询访问selector while (true) { // 当注册的事件到达时,方法返回;否则,该方法会一直阻塞 selector.select(); // 获得selector中选中项的迭代器,选中的项为注册事件 Iterator<?> iter = this.selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = (SelectionKey) iter.next(); // 删除已选的key,以防重复处理 iter.remove(); if (key.isAcceptable()) { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); // 获得和客户端连接的通道 SocketChannel channel = serverChannel.accept(); // 设置为非阻塞 channel.configureBlocking(false); // 给客户端发送信息 channel.write(ByteBuffer.wrap(new String("我是服务器~").getBytes())); // 在和客户端连接成功之后,为了可以收到客户端的信息,需要给通道设置读的权限。 channel.register(this.selector, SelectionKey.OP_READ); // 获得了可读的事件 } else if (key.isReadable()) { read(key); } } } } /** * 处理读取客户端发来的信息事件 * @param key * @throws IOException */ private void read(SelectionKey key) throws IOException { // 服务器可读取消息:得到事件发生的Socket通道 SocketChannel channel = (SocketChannel) key.channel(); // 创建读取的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(10); channel.read(buffer); byte[] data = buffer.array(); String msg = new String(data).trim(); System.out.println("服务端收到信息:" + msg); ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes()); channel.write(outBuffer); } /** * 启动服务端测试 * @param args * @throws IOException */ public static void main(String[] args) throws IOException { NIOServer server = new NIOServer(); server.initServer(8000); server.listen(); } }
NIOClient.java
package com.lee.jdk.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; /** * NIO客户端 * * @author Lee * */ public class NIOClient { // 通道管理器 private Selector selector; /** * 获得一个Socket通道,并对该通道进行初始化 * * @param ip * @param port * @throws IOException */ public void initClient(String ip, int port) throws IOException { // 获得一个Socket通道 SocketChannel channel = SocketChannel.open(); // 设置通道为非阻塞 channel.configureBlocking(false); // 获得一个通道管理器 this.selector = Selector.open(); /* * 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中 * 调用channel.finishConnect();才能完成连接 */ channel.connect(new InetSocketAddress(ip, port)); // 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件 channel.register(selector, SelectionKey.OP_CONNECT); } public void listen() throws IOException { // 轮询访问selector while (true) { selector.select(); // 获得selector中选中的项的迭代器 Iterator<?> ite = this.selector.selectedKeys().iterator(); while (ite.hasNext()) { SelectionKey key = (SelectionKey) ite.next(); // 删除已选的key,以防重复处理 ite.remove(); // 连接事件发生 if (key.isConnectable()) { SocketChannel channel = (SocketChannel) key.channel(); // 如果正在连接,则完成连接 if (channel.isConnectionPending()) { channel.finishConnect(); } // 设置成非阻塞 channel.configureBlocking(false); // 在这里可以给服务端发送信息哦 channel.write(ByteBuffer.wrap(new String("向服务端发送了一条信息").getBytes())); // 在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。 channel.register(this.selector, SelectionKey.OP_READ); // 获得了可读的事件 } else if (key.isReadable()) { read(key); } } } } /** * 处理读取服务端发来的信息事件 * * @param key * @throws IOException */ private void read(SelectionKey key) throws IOException { // 客户端可读取消息:得到事件发生的Socket通道 SocketChannel channel = (SocketChannel) key.channel(); // 创建读取的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(10); channel.read(buffer); byte[] data = buffer.array(); String msg = new String(data).trim(); System.out.println("客户端收到信息:" + msg); ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes()); channel.write(outBuffer); } /** * 启动客户端测试 * @param args * @throws IOException */ public static void main(String[] args) throws IOException { NIOClient client = new NIOClient(); client.initClient("127.0.0.1", 8000); client.listen(); } }
相关文章推荐
- HBase学习之四: mapreduce处理数据后存储到hbase及错误java.lang.NoClassDefFoundError的解决办法
- 【项目管理和构建】十分钟教程,eclipse配置maven + 创建maven项目(二)
- Java事件总线
- Java设计模式(学习整理)---工厂模式
- java保留小数点后两位的方法
- Java 中的流
- JavaMail authenticating 535 error
- 深入理解Java虚拟机--java内存区域
- java.lang.NoClassDefFoundError: org/activiti/bpmn/model/StartEvent
- struts2.5动态方法绑定问题
- java文件的复制粘贴,Fileinput,FileOuput流
- java快捷键
- Spring整合Hibernate配置文件
- java---集合类
- Java面向对象之五指棋
- struts2--Action的四种配置方式
- java 泛型
- Exception in thread "main" java.lang.NoClassDefFoundError: scala/Predef
- java web(三)
- Ant path 匹配原则 spring