【JAVA开发之架构专题】10.NIO通信架构
2017-10-10 17:00
274 查看
1. NIO概念
jdk 1.4出的非阻塞(nonblocking) IO
基于
通道和
缓冲区操作
NIO模式图
2. NIO核心组成部分
Channel:高速通道ServerSocketChannel: 静态工厂方法
open创建实例,
ServerSocketChannel封装的
ServerSocket还是
blocking IO模式。
configureBlocking(false)时,多路注册器
Selector可用。
Selector:多路注册器。自身静态工厂方法
open创建实例。通过
SelectorKey.OP_ACCEPT注册
Selector。(线程在
Selector轮询拿四大事件的
SelectorKey集合
set< SelectorKey >)。
SelectorKey里面包含哪些信息呢?有
Channel和
Selector
3. 代码体验
NIOServer.java和NIOClient.java1. NIOServer
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; /** * NIO服务端 * @author whbing * */ public class NIOServer { //缓冲区,对应Channel的通信方式 private ServerSocketChannel server; int port = 8080;//默认值,后边的构造器可以改值 //多路注册复用器,注册SelectKey_OP各种操作事件 private Selector selector; //接收缓冲池和发送缓冲池 ByteBuffer recBuffer = ByteBuffer.allocate(1024); ByteBuffer sendBuffer = ByteBuffer.allocate(1024); //缓存机制 Map<SelectionKey, String> essiomMsg = new HashMap<SelectionKey,String>(); //对客户端编号 private static int client_no = 19056; //构造器 public NIOServer(int port) throws IOException{ this.port = port;//如果没有传port,就用上边默认的port server= ServerSocketChannel.open(); //底层就是一个ServerSocket server.socket().bind(new InetSocketAddress(this.port)); server.configureBlocking(false); //再将blocking设为false后,开启selector selector = Selector.open(); server.register(selector, SelectionKey.OP_ACCEPT); System.out.println("NIO消息服务器初始化完成,可以随时接收客户端链接,监听端口为"+this.port); } //我们需要用一个线程去监听selector,看上边是否有满足我们需要的事件类型SelectionKey private void listener() throws IOException{ while(true){ int evenCount = selector.select();//没有找到selector则为0 if(evenCount == 0){ continue; } Set<SelectionKey> keys = selector.selectedKeys(); //遍历并处理监听到selector中的事件 final Iterator<SelectionKey> iteratorKeys = keys.iterator(); while (iteratorKeys.hasNext()) { process(iteratorKeys.next()); iteratorKeys.remove(); } } } //这里就是用来处理每一个SelectionKey:包含通道Channel信息 和 selector信息 private void process(SelectionKey key) { SocketChannel client = null; try { if(key.isValid() && key.isAcceptable()){ client=server.accept(); ++client_no; client.configureBlocking(false); client.register(selector, SelectionKey.OP_READ); }else if(key.isValid() && key.isReadable()){ //服务器从SocketChannel读取客户端发送过来的信息 recBuffer.clear(); client= (SocketChannel)key.channel(); int len = client.read(recBuffer); if(len>0){ String msg=new String(recBuffer.array(), 0, len); essiomMsg.put(key, msg); System.out.println("当前维护的线程ID:"+ Thread.currentThread().getId() +"对客户端写信息,客户端编号为:"+client_no+"信息为:"+msg); //改变状态,又会被监听器监听到 client.register(selector, SelectionKey.OP_WRITE); } }else if(key.isValid() && key.isWritable()){ if(!essiomMsg.containsKey(key)){ return; } client = (SocketChannel)key.channel(); //position=0 sendBuffer.clear(); //如position=500 sendBuffer.put((essiomMsg.get(key)+"你好,已经处理完请求!").getBytes()); //limit=500 position=0 0-->limt sendBuffer.flip(); client.write(sendBuffer); System.out.println("当前维护的线程ID:"+ Thread.currentThread().getId() +"对客户端写信息,客户端编号为:"+client_no); //改变状态,又会被监听器监听到 client.register(selector, SelectionKey.OP_READ); } } catch (IOException e) { //防止客户端非法下线 key.cancel(); try { client.socket().close(); client.close(); System.out.println("【系统提示】"+new SimpleDateFormat().format(new Date())+ essiomMsg.get(key)+"已下线"); } catch (IOException e1) { e1.printStackTrace(); } } } public static void main(String[] args) { try { new NIOServer(8080).listener(); } catch (IOException e) { e.printStackTrace(); } } }
NIOclient
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; import java.util.Set; /** * NIO客户端 * @author whbing * */ public class NIOClient { private SocketChannel client; InetSocketAddress serverAddress = new InetSocketAddress("localhost", 8080); private Selector selector; //接收缓冲池和发送缓冲池 ByteBuffer recBuffer = ByteBuffer.allocate(1024); ByteBuffer sendBuffer = ByteBuffer.allocate(1024); //构造器 public NIOClient() throws IOException{ //构造client实例 client= SocketChannel.open(); client.configureBlocking(false); client.connect(serverAddress); //构造selector实例 selector = Selector.open(); //注册连接事件 client.register(selector, SelectionKey.OP_CONNECT); //Netty Reactor线程池组 Tomcat bootstrap } private void session() throws IOException{ if(client.isConnectionPending()){ client.finishConnect(); client.register(selector, SelectionKey.OP_WRITE); System.out.println("已经连接到服务器,可以在控制台登记了"); } Scanner sc = new Scanner(System.in); while(sc.hasNextLine()){ String msg = sc.nextLine(); if("".equals(msg)){ continue; } if("exit".equals(msg)){ System.exit(0); } process(msg); } } private void process(String name) { boolean waitHelp = true; Iterator<SelectionKey> iteratorKeys = null; Set<SelectionKey> keys =null; while (waitHelp) { try { int readys = selector.select(); //如果没有客人,继续轮询 if(readys == 0){ continue; } keys = selector.selectedKeys(); iteratorKeys = keys.iterator(); //一个个迭代keys while(iteratorKeys.hasNext()){ SelectionKey key = iteratorKeys.next(); if(key.isValid() && key.isWritable()){ //可写就是客户端要对服务器发送信息 sendBuffer.clear(); sendBuffer.put(name.getBytes()); sendBuffer.flip(); client.write(sendBuffer); client.register(selector, SelectionKey.OP_READ); }else if(key.isValid() && key.isReadable()){ //服务器发送信息回来,给客户端读 recBuffer.clear(); int len = client.read(recBuffer); if(len>0){ recBuffer.flip(); System.out.println("服务器返回的消息是: 当前维护的线程ID:"+ Thread.currentThread().getId() +"对客户端写信息"+ new String(recBuffer.array(), 0, len) ); //改变状态,又会被监听器监听到 client.register(selector, SelectionKey.OP_WRITE); waitHelp = false; } } //检查完之后,打发客户走 iteratorKeys.remove(); } } catch (IOException e) { //防止客户端非法下线 ((SelectionKey)keys).cancel(); try { client.socket().close(); client.close(); return; } catch (IOException e1) { e1.printStackTrace(); } } } } public static void main(String[] args) { try { new NIOClient().session(); } catch (IOException e) { e.printStackTrace(); } } }
结果:
//client 已经连接到服务器,可以在控制台登记了 whbing上线了! 服务器返回的消息是: 当前维护的线程ID:1对客户端写信息whbing上线了!你好,已经处理完请求! second! 服务器返回的消息是: 当前维护的线程ID:1对客户端写信息second!你好,已经处理完请求! //server NIO消息服务器初始化完成,可以随时接收客户端链接,监听端口为8080 当前维护的线程ID:1对客户端写信息,客户端编号为:19057信息为:whbing上线了! 当前维护的线程ID:1对客户端写信息,客户端编号为:19057 当前维护的线程ID:1对客户端写信息,客户端编号为:19057信息为:second! 当前维护的线程ID:1对客户端写信息,客户端编号为:19057
相关文章推荐
- 【JAVA开发之架构专题】11. 线程池原理
- 现代Java Web开发架构分析
- 基于java技术的软件开发架构总结
- 基于java技术的软件开发架构总结
- 基于java技术的软件开发架构总结
- 基于java技术的软件开发架构总结
- [书目20080215]企业级Java开发与架构(专业程序员在实战中的蜕变)
- 基于java技术的软件开发架构总结
- 企业级应用中的Applet和Servlet的通信-Java基础-Java-编程开发
- 基于java技术的软件开发架构总结
- JAVA专题技术综述之线程篇-Java基础-Java-编程开发
- Java RPC通信机制之XML-RPC:Apache XML-RPC 3.0开发简介
- Java RPC通信机制之SOAP:应用Apache Axis进行Web Service开发
- 基于java技术的软件开发架构总结
- 基于java技术的软件开发架构总结
- 基于java技术的软件开发架构总结
- 表现层框架Struts/Tapestry/JSF架构比较-Java基础-Java-编程开发
- 基于java技术的软件开发架构总结
- 基于java技术的软件开发架构总结
- 职位:JAVA高级开发,系统架构工程师