JavaSocket通信--BIO,NIO,AIO
2016-12-16 17:41
495 查看
JavaSocket通信--BIO、NIO以及AIO
1. JavaSocket通信
对于网络通信而言,NIO、AIO并没有改变网络通信的基本步骤,即通过Socket建立连接仍然需要三次握手,只是在原来的基础上(ServerSocket,Socket)做了进一步的封装和优化。
概括来说,一个IO操作可以分为两个部分:发出请求、结果完成。如果从发出请求到结果返回一直阻塞(Block),那就是Blocking IO(BIO);如果发出请求后立即返回,而等待结果时阻塞(Block)在select上,则称为non-blocking IO(NIO);如果发出请求后立即返回,结果通过回调(Call Back)的方式被处理,就是AIO。
2. BIO
同步阻塞式IO,服务器端与客户端通过三次握手后建立连接,连接成功,双方通过I/O进行同步阻塞式通信。弊端:1,读和写操作是同步阻塞的,任何一端出现网络性能问题,都会影响另一方。2,一个链路建立一个线程,无法满足高并发,高性能需求。其连接方式如下图所示:
Java实现代码:
ChartServer.java
package com.server;

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

/**
 * Blocking-I/O chat server. It accepts client sockets on a dedicated thread and
 * serves every connected client with one {@link SocketTask} running on a thread
 * pool. Messages are routed by the prefixes declared in {@code MsgFlag}.
 *
 * @author Administrator
 */
public class ChartServer {
    /** Pool that runs one {@link SocketTask} per connected client. */
    private final ExecutorService executorService;
    /** Listening socket; {@code null} until {@link #start()} has bound it. */
    private ServerSocket serverSocket;
    /** Port to listen on. */
    private final int port;
    /** Set by {@link #stop()}; volatile because the accept thread and the tasks read it. */
    private volatile boolean quit = false;
    /**
     * Tasks of the currently connected clients. The list is read and modified by
     * the accept thread, by every task and by {@link #stop()}, so a
     * copy-on-write list is used to make iteration safe against concurrent removal.
     */
    private final List<SocketTask> socketTasks = new CopyOnWriteArrayList<SocketTask>();

    /** Creates a server that listens on a port chosen by the operating system. */
    public ChartServer() {
        this(0);
    }

    /**
     * Creates a server for the given port.
     *
     * @param port port to listen on
     */
    public ChartServer(int port) {
        this.port = port;
        executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 20);
    }

    /**
     * Binds the listening socket and starts the accept thread. If binding fails
     * the error is reported and no accept thread is started.
     */
    public void start() {
        try {
            serverSocket = new ServerSocket(port);
            System.out.println("等待客户端用户连接...");
        } catch (IOException e) {
            System.out.println(e.getMessage() + "5...");
            e.printStackTrace();
            return; // nothing to accept on
        }
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (!quit) {
                    try {
                        Socket client = serverSocket.accept();
                        SocketTask task = new SocketTask(client);
                        // Register before executing so the task counts itself
                        // when it computes the number of online users.
                        socketTasks.add(task);
                        try {
                            executorService.execute(task);
                        } catch (RejectedExecutionException e) {
                            // The pool was shut down by stop() in the meantime.
                            task.stopSocket();
                        }
                    } catch (IOException e) {
                        if (quit) {
                            break; // accept() was interrupted by stop() closing the socket
                        }
                        e.printStackTrace();
                        System.out.println(e.getMessage());
                    }
                }
            }
        }).start();
    }

    /**
     * Shuts the server down: stops accepting, closes every client connection
     * and shuts the thread pool down.
     */
    public void stop() {
        this.quit = true;
        try {
            if (serverSocket != null) {
                serverSocket.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println(e.getMessage() + "+1...");
        }
        for (SocketTask st : socketTasks) {
            st.stopSocket();
        }
        socketTasks.clear();
        executorService.shutdown();
    }

    /** Closes {@code c} if it is not {@code null}; a failure is only reported. */
    private static void closeQuietly(Closeable c) {
        if (c == null) {
            return;
        }
        try {
            c.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Server-side endpoint of one client connection. It reads the client's
     * messages and relays them to the other connected clients.
     *
     * @author Administrator
     */
    private class SocketTask implements Runnable {
        private final Socket socket;
        private DataInputStream dis;
        private DataOutputStream dos;
        /** Id announced by the client's login message; read by other tasks. */
        private volatile String userId;

        public SocketTask(Socket socket) {
            this.socket = socket;
            try {
                socket.setKeepAlive(true);
                dos = new DataOutputStream(socket.getOutputStream());
                dis = new DataInputStream(socket.getInputStream());
            } catch (IOException e) {
                e.printStackTrace();
                System.out.println(e.getMessage());
            }
        }

        /**
         * Sends a message to this task's client. Synchronized because several
         * tasks may broadcast to the same client at the same time.
         *
         * @param msg message to send; {@code null} is ignored
         */
        public synchronized void sendMessage(String msg) {
            try {
                if (msg != null && dos != null) {
                    dos.writeUTF(msg);
                }
            } catch (IOException e) {
                System.out.println(e.getMessage() + "3...");
                e.printStackTrace();
            }
        }

        /** Closes the connection and removes this task from the online list. */
        public void stopSocket() {
            closeQuietly(dis);
            closeQuietly(dos);
            closeQuietly(socket);
            socketTasks.remove(this);
        }

        /** Returns whether the underlying socket has not been closed yet. */
        private boolean isOpen() {
            return !socket.isClosed();
        }

        /** Sends the ids of the other logged-in users to the client that just logged in. */
        private void handleOnline() {
            int onlineNums = socketTasks.size() - 1;
            System.out.println("当前在线人数:" + onlineNums);
            if (onlineNums > 0) {
                for (SocketTask st : socketTasks) {
                    // Skip peers that are closed or have not sent their login message yet.
                    if (st != this && st.isOpen() && st.userId != null) {
                        this.sendMessage(MsgFlag.MSG_Curr_ONLINE_USER + st.userId);
                    }
                }
            } else {
                this.sendMessage(MsgFlag.MSG_MESSAGE + "只有自己在线...");
            }
        }

        /** Relays a chat message to every other connected client. */
        private void broadcast(String message) {
            int onlineNums = socketTasks.size() - 1;
            System.out.println("当前在先人数:" + onlineNums);
            if (onlineNums > 0) {
                for (SocketTask st : socketTasks) {
                    if (st != this && st.isOpen()) {
                        st.sendMessage(message);
                    }
                }
            } else {
                this.sendMessage(MsgFlag.MSG_MESSAGE + "只有自己在线...");
            }
        }

        /** Reads and dispatches client messages until the client logs off or the connection breaks. */
        @Override
        public void run() {
            try {
                if (dis == null) {
                    return;
                }
                while (true) {
                    String message = dis.readUTF();
                    System.out.println(message);
                    if (message.startsWith(MsgFlag.MSG_ONLINE)) {
                        userId = message.substring(MsgFlag.MSG_ONLINE.length());
                        handleOnline();
                    } else if (message.startsWith(MsgFlag.MSG_MESSAGE)) {
                        broadcast(message);
                    } else if (message.startsWith(MsgFlag.MSG_OFFLINE)) {
                        return; // the finally block closes the connection
                    }
                }
            } catch (IOException e) {
                if (!quit) { // a closed socket is expected while the server shuts down
                    e.printStackTrace();
                    System.out.println(e.getMessage() + "+1。。。");
                }
            } finally {
                stopSocket();
            }
        }
    }
}
MsgFlag.java
package com.server;

/**
 * Message-type prefixes of the chat protocol. Every message starts with one of
 * these prefixes; the text after the prefix is the payload.
 */
public class MsgFlag {
    /** Prefix of a client's login message; the payload is the user id. */
    public static final String MSG_ONLINE = "_online";
    /** Prefix of a chat message. */
    public static final String MSG_MESSAGE = "_MSG";
    /** Prefix of a client's log-off message. */
    public static final String MSG_OFFLINE = "_offline";
    /** Prefix of a server message announcing an already connected user; the payload is that user's id. */
    public static final String MSG_Curr_ONLINE_USER = "_Current_User";
}
Client.java
import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; import java.net.UnknownHostException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Scanner; public class Client implements Runnable { private Socket socket; private String userId; private final static String ip = "10.60.0.48"; private final static int port = 9000; private DataInputStream dis; private DataOutputStream dos; private String message; private Scanner sc; private String choice; private boolean running; private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); public Client() { try { running = true; sc = new Scanner(System.in); SocketAddress socAddress = new InetSocketAddress(ip, port); socket = new Socket(); socket.connect(socAddress); socket.setKeepAlive(true); if (socket.isConnected()) { System.out.println("连接成功.."); userId = socket.getLocalAddress().getHostName() + sdf.format(new Date()); dis = new DataInputStream(socket.getInputStream()); dos = new DataOutputStream(socket.getOutputStream()); dos.writeUTF(MsgFlag.MSG_ONLINE + userId); } } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } /** * 关闭客户端socket */ private void stop() { try { if (dis != null) dis.close(); if (dos != null) dos.close(); if (socket != null) socket.close(); if(running ==true)running = false; } catch (IOException e) { e.printStackTrace(); } } private void sendMessage() { String msg = null; System.out.println("请输入消息内容:"); if (sc.hasNext()) { msg = sc.nextLine(); } try { if (msg != null && dos != null) { dos.writeUTF( MsgFlag.MSG_MESSAGE + "userId:" + userId + "time:" + sdf.format(new Date()) + "Msg:" + msg); } } catch (IOException e) { e.printStackTrace(); System.out.println("写数据异常。。。"); } } @Override public void run() { // 轮询读缓冲区数据 try { while (running) { System.out.println("选择操作:1.发送消息\t 2.停止"); if 
(sc.hasNext()) { choice = sc.nextLine(); } if (choice.equals("1")) sendMessage(); else if (choice.equals("2")) { stop(); System.out.println("客户端userId:" + userId + "is closed"); } // 读消息 if(running){ if (dis.available() >= 0) { message = dis.readUTF(); System.out.println(message); if (message.startsWith(MsgFlag.MSG_Curr_ONLINE_USER)) { // client接收server发送的当前用户信息 System.out.println(message.substring(MsgFlag.MSG_Curr_ONLINE_USER.length())); } else if (message.startsWith(MsgFlag.MSG_MESSAGE)) { System.out.println(message.substring(MsgFlag.MSG_MESSAGE.length())); } } } } } catch (IOException e) { e.printStackTrace(); // System.out.println("度数据异常....,自动关闭客户端"); } } }
3. NIO
nio类库是jdk1.4中引入的,它弥补了同步阻塞IO的不足,为Java提供了高速的、面向块的I/O。同步阻塞IO是以流的方式处理数据,而NIO是以块的方式处理数据。面向流的I/O通常比较慢,按块处理数据比按(流式的)字节处理数据要快得多。常用的基于NIO的通信框架有:Netty,Mina,Grizzly。其通信方式是通过Selector(相当于管家)管理所有的IO事件(OP_CONNECT(连接),OP_ACCEPT(接受),OP_READ(读),OP_WRITE(写))。当IO事件注册到选择器的时候,选择器会给它们分配一个key;当IO事件就绪的时候,会通过key值找到相应的SocketChannel来发送和接收数据,完成通信。具体如下图:
Buffer缓冲区:
ByteBuffer是NIO里用得最多的Buffer,它包含两个实现方式:
HeapByteBuffer是基于Java堆的实现,而
DirectByteBuffer则使用了
unsafe的API进行了堆外的实现。
在ByteBuffer中,有个byte[] hb,作为缓冲区,用于缓存数据。
ByteBuffer最核心的方法是
put()和
get()。分别是往ByteBuffer里写和读数据。
而且有
private int
mark = -1; // Marks a previously visited position so that the buffer can later go back to it.
private int position = 0; // Current read/write position.
private int limit; // Upper bound for reads and writes; limit <= capacity.
private int capacity; // Capacity of the buffer.
四个标志量,其中mark <= position <= limit <= capacity。
Put:
写模式下,往buffer里写一个字节,并把position向后移动一位。一般limit与capacity相等。
/**
 * Stores {@code x} in {@code hb} at the index returned by {@code nextPutIndex()}
 * (passed through {@code ix}) and returns this buffer.
 * NOTE(review): ix(...) presumably maps the logical index to an offset in hb; it is not shown here.
 */
public ByteBuffer put(byte x) { hb[ix(nextPutIndex())] = x; return this; }
final int nextPutIndex() { // package-private if (position >= limit) throw new BufferOverflowException(); return position++; }
Flip: 需要读取数据时,先将limit设为当前position,再将position复位到0。
/**
 * Sets the limit to the current position, resets the position to 0,
 * discards the mark (mark = -1) and returns this buffer.
 */
public final Buffer flip() { limit = position; position = 0; mark = -1; return this; }
Get: 从buffer里读一个字节,并把position向后移动一位。上限是limit,即写入数据的最后位置。
/**
 * Returns the byte of {@code hb} at the index obtained from {@code nextGetIndex()}
 * (passed through {@code ix}).
 * NOTE(review): ix(...) presumably maps the logical index to an offset in hb; it is not shown here.
 */
public byte get() { return hb[ix(nextGetIndex())]; }
final int nextGetIndex() { // package-private if (position >= limit) throw new BufferUnderflowException(); return position++; }
Clear: 标志量重置。
/**
 * Resets the indices: position becomes 0, limit becomes the capacity and the
 * mark is discarded (mark = -1). Only these three fields are assigned here.
 * Returns this buffer.
 */
public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; }
NIO实例:
NIOServer.java
package com.nio2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

/**
 * Single-threaded NIO server. One selector multiplexes the listening channel
 * and all client channels. For every client the server alternates between
 * reading (printing what it received) and writing a fixed reply.
 */
public class NIOServer implements Runnable {
    private final static int BLOCK_SIZE = 1024;

    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    /** Scratch buffer shared by all channels; safe because only the selector thread uses it. */
    private ByteBuffer readBuffer;
    private Charset cs = Charset.forName("utf-8");

    /**
     * Opens the listening channel in non-blocking mode and registers it for
     * accept events. A failure is printed; {@link #run()} then returns at once.
     *
     * @param ip   local address to bind to
     * @param port local port to bind to
     */
    public NIOServer(String ip, int port) {
        try {
            readBuffer = ByteBuffer.allocateDirect(BLOCK_SIZE);
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(ip, port));
            serverSocketChannel.configureBlocking(false);
            selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("the server is started");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Event loop: waits for ready keys and dispatches each of them to {@link #handlerSK}. */
    @Override
    public void run() {
        if (selector == null) {
            return; // the constructor failed to set the server up
        }
        while (selector.isOpen()) {
            try {
                if (selector.select() <= 0) {
                    continue;
                }
            } catch (IOException e) {
                e.printStackTrace();
                return;
            }
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                // Remove first: a key left in the selected set after a failure
                // would be reported again on every pass.
                iterator.remove();
                if (!sk.isValid()) {
                    continue;
                }
                try {
                    handlerSK(sk);
                } catch (IOException e) {
                    e.printStackTrace();
                    // A broken client connection is dropped; the listening channel is kept.
                    if (sk.channel() != serverSocketChannel) {
                        closeKey(sk);
                    }
                }
            }
        }
    }

    /** Cancels the key and closes its channel; a close failure is only reported. */
    private void closeKey(SelectionKey sk) {
        sk.cancel();
        try {
            sk.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles one ready key: accepts a new client, reads and prints pending
     * data, or writes the reply.
     *
     * @param sk the ready key
     * @throws IOException if the channel operation fails
     */
    private void handlerSK(SelectionKey sk) throws IOException {
        if (sk.isAcceptable()) {
            SocketChannel sChannel = serverSocketChannel.accept();
            if (sChannel == null) {
                return; // non-blocking accept found no pending connection
            }
            sChannel.configureBlocking(false);
            sChannel.register(selector, SelectionKey.OP_READ);
            if (sChannel.isConnected()) {
                System.out.println("host:" + sChannel.getRemoteAddress() + "is connected");
            }
        } else if (sk.isReadable()) {
            SocketChannel sChannel = (SocketChannel) sk.channel();
            int len;
            readBuffer.clear();
            while ((len = sChannel.read(readBuffer)) > 0) {
                readBuffer.flip();
                // toString() yields exactly the decoded characters; array() would
                // expose the whole backing array of the CharBuffer.
                System.out.println(cs.decode(readBuffer).toString());
                readBuffer.clear();
            }
            if (len < 0) {
                closeKey(sk); // the peer closed the connection
                return;
            }
            sk.interestOps(SelectionKey.OP_WRITE);
        } else if (sk.isWritable()) {
            SocketChannel sChannel = (SocketChannel) sk.channel();
            readBuffer.clear();
            readBuffer.put("woshiserver".getBytes(cs));
            readBuffer.flip();
            sChannel.write(readBuffer);
            sk.interestOps(SelectionKey.OP_READ);
        }
    }
}
NioClient.java
package com.nio;

import java.awt.SecondaryLoop;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;

/**
 * NIO client. It connects in non-blocking mode and then alternates between
 * writing a fixed message and printing whatever the server sends back.
 */
public class NioClient implements Runnable {
    private Charset cs = Charset.forName("utf-8");
    private SocketChannel socketChannel;
    private Selector selector;
    private String ip;
    private int port;
    Scanner sc = null;

    /**
     * Opens the channel, registers it for the connect event and starts the
     * non-blocking connect. A failure is printed; {@link #run()} then returns at once.
     *
     * @param ip   server address
     * @param port server port
     */
    public NioClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
        try {
            sc = new Scanner(System.in);
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            selector = Selector.open();
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            socketChannel.connect(new InetSocketAddress(ip, port));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Event loop; it ends once the channel has been closed. */
    @Override
    public void run() {
        if (selector == null || socketChannel == null) {
            return; // the constructor failed
        }
        try {
            while (socketChannel.isOpen()) {
                int len = selector.select();
                if (len <= 0) {
                    continue;
                }
                Iterator<SelectionKey> itor = selector.selectedKeys().iterator();
                System.out.println("len:" + len);
                while (itor.hasNext()) {
                    SelectionKey sk = itor.next();
                    itor.remove();
                    process(sk);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Cancels the key and closes its channel; a close failure is only reported. */
    private void closeKey(SelectionKey sk) {
        sk.cancel();
        try {
            sk.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles one ready key: finishes the connect, prints received data, or
     * sends the fixed message. On an I/O error the channel is closed.
     */
    private void process(SelectionKey sk) {
        if (!sk.isValid()) {
            return;
        }
        SocketChannel channel = (SocketChannel) sk.channel();
        try {
            if (sk.isConnectable()) {
                if (channel.isConnectionPending()) {
                    channel.finishConnect();
                }
                if (channel.isConnected()) {
                    System.out.println(channel.getRemoteAddress().toString());
                }
                sk.interestOps(SelectionKey.OP_WRITE);
            } else if (sk.isReadable()) {
                ByteBuffer bb = ByteBuffer.allocate(1024);
                int len;
                System.out.println("读取中,..");
                while ((len = channel.read(bb)) > 0) {
                    bb.flip();
                    // toString() yields exactly the decoded characters; array() would
                    // expose the whole backing array of the CharBuffer.
                    System.out.println(cs.decode(bb).toString());
                    bb.clear();
                }
                if (len < 0) {
                    closeKey(sk); // the server closed the connection
                    return;
                }
                sk.interestOps(SelectionKey.OP_WRITE);
            } else if (sk.isWritable()) {
                ByteBuffer bb = ByteBuffer.allocate(1024);
                bb.put("woshizhangsan".getBytes(cs));
                bb.flip();
                channel.write(bb);
                sk.interestOps(SelectionKey.OP_READ);
            }
        } catch (IOException e) {
            e.printStackTrace();
            closeKey(sk);
        }
    }
}
4. AIO
Java的AIO是在JDK1.7出现的,就像当年发布NIO特性支持时,基本上所有的Java服务器都重写了自己的网络框架以通过NIO来提高服务器的性能。现在很多的网络框架(如Mina),大型软件(如Oracle DB)都宣布自己已经在新版本中支持了AIO的特性以提高性能。下面就来看一下aio的基本原理,以及如何使用JDK7的AIO特性。所谓AIO,异步IO,其主要是针对进程在调用IO获取外部数据时,是否阻塞调用进程而言的。一个进程的IO调用步骤大致如下:
1、进程向操作系统请求数据
2、操作系统把外部数据加载到内核的缓冲区中,
3、操作系统把内核的缓冲区拷贝到进程的缓冲区
4、进程获得数据完成自己的功能
Java代码实现:
AIOServer.java
package com.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Asynchronous-I/O echo server: for every accepted connection it reads one
 * message, prints it and writes the same bytes back before closing the connection.
 */
public class AIOServer {
    public final static int PORT = 9001;
    public final static String IP = "127.0.0.1";

    private AsynchronousServerSocketChannel server = null;

    /** Opens the asynchronous server channel and binds it to {@link #IP}:{@link #PORT}. */
    public AIOServer() {
        try {
            // Assign the field; a local variable of the same name would leave it null.
            server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(IP, PORT));
            System.out.println("Server is started");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Registers the accept handler that serves the connections.
     *
     * @throws IllegalStateException if the server channel could not be opened in the constructor
     */
    public void start() {
        if (server == null) {
            throw new IllegalStateException("server channel was not opened");
        }
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel result, Object attachment) {
                // Re-arm the accept first so further clients are not kept waiting
                // while this connection is being served.
                server.accept(null, this);
                System.out.println(Thread.currentThread().getName());
                // One buffer per connection: handlers of different connections may run concurrently.
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                try {
                    int read = result.read(buffer).get(1000, TimeUnit.SECONDS);
                    if (read > 0) {
                        buffer.flip();
                        // Decode only the bytes that were received, not the whole backing array.
                        System.out.println(result.getRemoteAddress().toString() + ": "
                                + new String(buffer.array(), 0, buffer.limit()));
                        while (buffer.hasRemaining()) {
                            result.write(buffer).get();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (IOException | ExecutionException | TimeoutException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        result.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("failed:" + exc);
            }
        });
    }

    /** Starts the server and keeps the main thread alive until it is interrupted. */
    public static void main(String[] args) {
        new AIOServer().start();
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            }
        }
    }
}
AIOClient.java
package com.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Asynchronous-I/O client: connects, sends "Hello", prints the reply and closes
 * the channel. Every step is driven by a completion handler.
 */
public class AIOClient {
    /** Longest time the main thread waits for the exchange to finish. */
    private static final long WAIT_SECONDS = 5;

    /** Closes the channel (a failure is only reported) and releases the waiting main thread. */
    private static void finish(AsynchronousSocketChannel client, CountDownLatch done) {
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            done.countDown();
        }
    }

    /** Reports a failed step and ends the exchange. */
    private static void fail(String step, Throwable exc, AsynchronousSocketChannel client, CountDownLatch done) {
        System.out.println(step + " failed:" + exc);
        finish(client, done);
    }

    public static void main(String[] args) throws IOException {
        final AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
        // Counted down when the exchange has finished, successfully or not.
        final CountDownLatch done = new CountDownLatch(1);
        InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 9001);
        CompletionHandler<Void, ? super Object> handler = new CompletionHandler<Void, Object>() {
            @Override
            public void completed(Void result, Object attachment) {
                client.write(ByteBuffer.wrap("Hello".getBytes()), null, new CompletionHandler<Integer, Object>() {
                    @Override
                    public void completed(Integer result, Object attachment) {
                        final ByteBuffer buffer = ByteBuffer.allocate(1024);
                        client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                            @Override
                            public void completed(Integer result, ByteBuffer attachment) {
                                attachment.flip();
                                // Decode only the bytes that were received, not the whole backing array.
                                System.out.println(new String(attachment.array(), 0, attachment.limit()));
                                finish(client, done);
                            }

                            @Override
                            public void failed(Throwable exc, ByteBuffer attachment) {
                                fail("read", exc, client, done);
                            }
                        });
                    }

                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        fail("write", exc, client, done);
                    }
                });
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                fail("connect", exc, client, done);
            }
        };
        client.connect(serverAddress, null, handler);
        try {
            // Wait for the handlers instead of sleeping for a fixed time, so the JVM
            // does not exit before the reply has been printed.
            if (!done.await(WAIT_SECONDS, TimeUnit.SECONDS)) {
                System.out.println("timed out waiting for the server");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            if (client.isOpen()) {
                client.close();
            }
        }
    }
}
相关文章推荐
- 基于消息实现系统间的通信(BIO,NIO,AIO)
- 互联网架构(7):Socket网络通信编程--BIO/NIO/AIO
- 网络通信简单实例BIO,NIO,AIO
- 系统间的通信(BIO,NIO,AIO)
- 基于消息实现系统间的通信(BIO,NIO,AIO)学习。
- 基于消息实现系统间的通信(BIO,NIO,AIO)
- 一、BIO、NIO、AIO通信机制理解
- Socket通信之BIO(同步阻塞IO)、PAIO(伪异步阻塞IO)、NIO(异步非阻塞IO)、AIO(异步非阻塞IO)、netty5之IO
- (5)Socket的三种通信模型--BIO,NIO和AIO
- BIO、NIO、AIO系列一:NIO
- 也谈BIO | NIO | AIO (Java版--转)
- Java面试基础篇——第九篇:BIO,NIO,AIO的区别
- BIO,NIO,AIO
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- BIO与NIO、AIO
- BIO NIO AIO
- NIO,AIO,BIO
- dubbo相关知识(四)-- Java中的BIO、NIO、AIO
- JAVA中IO技术:BIO、NIO、AIO
- 【转】Java BIO、NIO、AIO 认知