您的位置:首页 > 编程语言 > Java开发

JavaSocket通信--BIO,NIO,AIO

2016-12-16 17:41 495 查看

Java Socket通信--BIO、NIO以及AIO

1. JavaSocket通信

对于网络通信而言,NIO、AIO并没有改变网络通信的基本步骤,即Socket建立连接仍然需要三次握手,只是在原来的基础上(ServerSocket,Socket)做了进一步封装和优化。

概括来说,一个IO操作可以分为两个部分:发出请求、结果完成。如果从发出请求到结果返回,一直Block,那就是Blocking IO(BIO);如果发出请求就返回,结果返回是Block在select上,则称为non-blocking IO(NIO);如果发出请求就返回,结果返回通过Call Back的方式被处理,就是AIO。

2. BIO

同步阻塞式IO,服务器端与客户端通过三次握手后建立连接,连接成功,双方通过I/O进行同步阻塞式通信。
弊端:1,读和写操作是同步阻塞的,任何一端出现网络性能问题,都会影响另一方。2,一个链路建立一个线程,无法满足高并发,高性能需求。其连接方式如下图所示:



Java实现代码:
ChartServer.java
package com.server;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Blocking-I/O (BIO) chat server.
 *
 * <p>One dedicated thread accepts client connections; every accepted connection is wrapped in a
 * {@link SocketTask} that runs on a fixed-size thread pool and relays messages between clients.
 * The message prefixes the server understands are the constants of {@code MsgFlag}.
 *
 * <p>Thread-safety: the task list is a copy-on-write list and the shutdown flag is volatile,
 * because both are shared between the accept thread, the pool workers and the caller of
 * {@link #stop()}.
 *
 * @author Administrator
 */
public class ChartServer {

    private ExecutorService executorService; // worker pool, one blocking task per connection
    private ServerSocket serverSocket; // listening socket
    private int port; // port to listen on

    // volatile: written by stop(), read by the accept thread and by the workers
    private volatile boolean quit = false;
    // every worker iterates this list while the accept thread adds and other workers remove
    private List<SocketTask> socketTasks = null;

    /**
     * Creates a server on port 0, i.e. a free port chosen by the operating system.
     * Delegates to {@link #ChartServer(int)} so that the pool and the task list always exist.
     */
    public ChartServer() {
        this(0);
    }

    /**
     * Creates a server that will listen on the given port once {@link #start()} is called.
     *
     * @param port the TCP port to listen on
     */
    public ChartServer(int port) {
        this.port = port;
        socketTasks = new java.util.concurrent.CopyOnWriteArrayList<SocketTask>();
        executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 20);
    }

    /**
     * Binds the listening socket and starts the accept thread.
     * If the bind fails the error is reported and no accept thread is started.
     */
    public void start() {
        try {
            serverSocket = new ServerSocket(port);
            System.out.println("等待客户端用户连接...");
        } catch (IOException e) {
            System.out.println(e.getMessage() + "5...");
            e.printStackTrace();
            return; // nothing to accept on
        }

        final ServerSocket listener = serverSocket;
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (!quit) {
                    try {
                        Socket client = listener.accept();
                        SocketTask task = new SocketTask(client);
                        // register before running, so the task already counts itself as online
                        socketTasks.add(task);
                        try {
                            executorService.execute(task);
                        } catch (java.util.concurrent.RejectedExecutionException e) {
                            // the pool was shut down by stop() while we were accepting
                            task.stopSocket();
                            break;
                        }
                    } catch (IOException e) {
                        if (quit) {
                            break; // accept() was interrupted by stop() closing the listener
                        }
                        e.printStackTrace();
                        System.out.println(e.getMessage());
                    }
                }
            }
        }).start();
    }

    /**
     * Shuts the server down: stops accepting, closes every client connection and
     * terminates the worker pool. The server cannot be restarted afterwards.
     */
    public void stop() {
        this.quit = true;
        try {
            if (serverSocket != null) {
                serverSocket.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println(e.getMessage() + "+1...");
        }
        // each task closes its own resources; one failing close no longer aborts the others
        for (SocketTask st : socketTasks) {
            st.stopSocket();
        }
        socketTasks.clear();
        executorService.shutdownNow();
    }

    /** Closes a resource, reporting (not propagating) a failure so sibling closes still run. */
    private static void close(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Server-side end of one client connection. Reads length-prefixed UTF messages from the
     * client and reacts according to their {@code MsgFlag} prefix.
     *
     * @author Administrator
     */
    private class SocketTask implements Runnable {

        private final Socket socket;
        private DataInputStream dis;
        private DataOutputStream dos;
        // volatile: set by this task's worker, read by the workers of other connections
        private volatile String userId;

        /**
         * Wraps an accepted socket and opens its data streams. If opening fails the error is
         * reported and the streams stay null, which makes {@link #run()} return immediately.
         *
         * @param socket the accepted client socket
         */
        public SocketTask(Socket socket) {
            this.socket = socket;
            try {
                socket.setKeepAlive(true);
                dos = new DataOutputStream(socket.getOutputStream());
                dis = new DataInputStream(socket.getInputStream());
            } catch (IOException e) {
                e.printStackTrace();
                System.out.println(e.getMessage());
            }
        }

        /**
         * Sends one message to this connection's client.
         * Synchronized because several workers may broadcast to the same client at once and
         * their frames must not interleave on the stream.
         *
         * @param msg the message to send; {@code null} is ignored
         */
        public synchronized void sendMessage(String msg) {
            try {
                if (msg != null && dos != null) {
                    dos.writeUTF(msg);
                }
            } catch (IOException e) {
                System.out.println(e.getMessage() + "3...");
                e.printStackTrace();
            }
        }

        /**
         * Closes the streams and the socket and removes this task from the server's list.
         * Safe to call more than once.
         */
        public void stopSocket() {
            try {
                close(dis);
                close(dos);
                close(socket);
            } finally {
                socketTasks.remove(this);
            }
        }

        /** Whether this connection can still be written to. */
        private boolean isOpen() {
            return socket != null && !socket.isClosed();
        }

        /** Tells the freshly logged-in client which other users are currently connected. */
        private void announceOnlineUsers() {
            int onlineNums = socketTasks.size() - 1;
            System.out.println("当前在线人数:" + onlineNums);
            if (onlineNums > 0) {
                for (SocketTask st : socketTasks) {
                    // skip peers that have connected but not yet sent their login message
                    if (st != this && st.isOpen() && st.userId != null) {
                        this.sendMessage(MsgFlag.MSG_Curr_ONLINE_USER + st.userId);
                    }
                }
            } else { // nobody else is online
                this.sendMessage(MsgFlag.MSG_MESSAGE + "只有自己在线...");
            }
        }

        /** Relays a chat message to every other connected client. */
        private void broadcast(String message) {
            int onlineNums = socketTasks.size() - 1;
            System.out.println("当前在线人数:" + onlineNums);
            if (onlineNums > 0) {
                for (SocketTask st : socketTasks) {
                    if (st != this && st.isOpen()) {
                        st.sendMessage(message);
                    }
                }
            } else { // nobody else is online
                this.sendMessage(MsgFlag.MSG_MESSAGE + "只有自己在线...");
            }
        }

        /**
         * Reads messages until the client logs off, the connection breaks or the server stops.
         * The connection is always closed on exit.
         */
        @Override
        public void run() {
            try {
                while (dis != null) {
                    String message = dis.readUTF();
                    System.out.println(message);
                    if (message.startsWith(MsgFlag.MSG_ONLINE)) { // login
                        userId = message.substring(MsgFlag.MSG_ONLINE.length());
                        announceOnlineUsers();
                    } else if (message.startsWith(MsgFlag.MSG_MESSAGE)) { // chat message
                        broadcast(message);
                    } else if (message.startsWith(MsgFlag.MSG_OFFLINE)) { // logout
                        return; // the finally block closes the connection
                    }
                }
            } catch (IOException e) {
                if (!quit) { // during shutdown the read is expected to fail
                    e.printStackTrace();
                    System.out.println(e.getMessage() + "+1。。。");
                }
            } finally {
                stopSocket();
            }
        }
    }

}
MsgFlag.java
package com.server;

/**
 * Message-type prefixes of the chat protocol. Every message exchanged between
 * {@code Client} and {@code ChartServer} starts with one of these strings.
 */
public class MsgFlag {

    /** Login: sent by a client, followed by its user id. */
    public static final String MSG_ONLINE = "_online";

    /** Logout: on receiving it the server closes the connection. */
    public static final String MSG_OFFLINE = "_offline";

    /** Chat message, relayed by the server to the other clients. */
    public static final String MSG_MESSAGE = "_MSG";

    /** Sent by the server to a client that just logged in, followed by another user's id. */
    public static final String MSG_Curr_ONLINE_USER = "_Current_User";
}
Client.java

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Scanner;

/**
 * Console chat client for {@code ChartServer}.
 *
 * <p>The constructor connects and logs in; {@link #run()} then alternates between a small
 * console menu and polling the socket for messages that arrived in the meantime.
 */
public class Client implements Runnable {

    private Socket socket;

    private String userId;

    private final static String ip = "10.60.0.48";
    private final static int port = 9000;

    private DataInputStream dis;
    private DataOutputStream dos;

    private Scanner sc;

    private boolean running;
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");

    /**
     * Connects to the server and sends the login message. If the connection cannot be
     * established the error is reported and {@link #run()} will return immediately.
     */
    public Client() {
        sc = new Scanner(System.in);
        try {
            SocketAddress socAddress = new InetSocketAddress(ip, port);
            socket = new Socket();
            socket.connect(socAddress);
            socket.setKeepAlive(true);

            if (socket.isConnected()) {
                System.out.println("连接成功..");
                userId = socket.getLocalAddress().getHostName() + sdf.format(new Date());
                dis = new DataInputStream(socket.getInputStream());
                dos = new DataOutputStream(socket.getOutputStream());
                dos.writeUTF(MsgFlag.MSG_ONLINE + userId);
                running = true; // only a fully established session is allowed to run
            }
        } catch (IOException e) { // includes UnknownHostException
            e.printStackTrace();
        }
    }

    /**
     * Logs off and closes the client socket. Safe to call more than once.
     */
    private void stop() {
        running = false; // first, so the loop ends even if a close below fails
        if (socket == null || socket.isClosed()) {
            return;
        }
        try {
            if (dos != null) {
                // tell the server we are leaving so it can drop the connection cleanly
                dos.writeUTF(MsgFlag.MSG_OFFLINE + userId);
                dos.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        close(dis);
        close(dos);
        close(socket);
    }

    /** Closes one resource; a failure is reported but does not stop the remaining closes. */
    private static void close(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Prompts for one line of text and sends it to the server as a chat message. */
    private void sendMessage() {
        System.out.println("请输入消息内容:");
        String msg = sc.hasNextLine() ? sc.nextLine() : null;

        try {
            if (msg != null && dos != null) {
                dos.writeUTF(
                        MsgFlag.MSG_MESSAGE + "userId:" + userId + "time:" + sdf.format(new Date()) + "Msg:" + msg);
            }
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("写数据异常。。。");
        }
    }

    /**
     * Prints every message that has already arrived, without waiting for new ones.
     *
     * @throws IOException if reading from the socket fails
     */
    private void printPendingMessages() throws IOException {
        // available() > 0 (not >= 0, which is always true) keeps the menu from blocking
        while (dis.available() > 0) {
            String message = dis.readUTF();
            System.out.println(message);
            if (message.startsWith(MsgFlag.MSG_Curr_ONLINE_USER)) { // list of users already online
                System.out.println(message.substring(MsgFlag.MSG_Curr_ONLINE_USER.length()));
            } else if (message.startsWith(MsgFlag.MSG_MESSAGE)) {
                System.out.println(message.substring(MsgFlag.MSG_MESSAGE.length()));
            }
        }
    }

    /**
     * Menu loop: "1" sends a message, "2" stops the client; after each choice the messages
     * received so far are printed. Ends when the user stops, standard input is exhausted or
     * the connection fails.
     */
    @Override
    public void run() {
        try {
            while (running) {
                System.out.println("选择操作:1.发送消息\t 2.停止");
                if (!sc.hasNextLine()) { // end of input: nothing more can be chosen
                    stop();
                    break;
                }
                String choice = sc.nextLine().trim();
                if ("1".equals(choice)) {
                    sendMessage();
                } else if ("2".equals(choice)) {
                    stop();
                    System.out.println("客户端userId:" + userId + "is closed");
                }

                if (running) {
                    printPendingMessages();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
            stop(); // do not leave a broken connection open
        }
    }
}


3. NIO

NIO类库是在JDK 1.4中引入的,它弥补了同步阻塞IO的不足,为Java提供了高速的、面向块的I/O。同步阻塞IO是以流的方式处理数据,而NIO是以块的方式处理数据。面向流的I/O通常比较慢,按块处理数据比按(流式的)字节处理数据要快得多。常用的基于NIO的通信框架有:Netty,Mina,Grizzly。

其通信方式是通过Selector(相当于管家),管理所有的IO事件(OP_CONNECT(连接),OP_ACCEPT(接受),OP_READ(读),OP_WRITE(写)), 当IO事件注册到选择器的时候,选择器会给他们分配一个key,当IO事件就绪的时候会通过key值来找到相应的SocketChannel进行发送和接收数据进行通信。具体如下图:



Buffer缓冲区:

ByteBuffer是NIO里用得最多的Buffer,它包含两个实现方式:HeapByteBuffer是基于Java堆的实现,而DirectByteBuffer则使用了unsafe的API进行了堆外的实现。

在Buffer中,有个byte[] hb,为缓存区,用于缓存数据。
ByteBuffer最核心的方法是put()和get(),分别是往ByteBuffer里写和读数据。

而且有以下四个标志量:

    private int mark = -1;  //为某一读过的位置做标记,便于某些时候回退到该位置。

    private int position = 0; //当前读写的位置。

    private int limit; //读写的上限,limit<=capacity。

    private int capacity;//缓存区容量

其中 mark <= position <= limit <= capacity

Put:
写模式下,往buffer里写一个字节,并把position移动一位。一般limit与capacity相等。

/**
 * Stores one byte at the buffer's current position and returns this buffer.
 *
 * @param x the byte to store
 * @return this buffer
 */
public ByteBuffer put(byte x) {

// nextPutIndex() yields the current position and advances it; ix(..) turns that index into
// an index of hb (ix is not shown in this excerpt -- presumably it adds the array offset)
hb[ix(nextPutIndex())] = x;
return this;

}
/**
 * Returns the current position and then advances it by one.
 *
 * @return the position before the increment
 * @throws BufferOverflowException if the position has already reached the limit
 */
final int nextPutIndex() {                          // package-private
if (position >= limit)
throw new BufferOverflowException();
return position++;
}






Flip: 需要读取数据时,先将limit设为当前position,再将position复位到0。
/**
 * Sets the limit to the current position, resets the position to 0 and discards the mark.
 *
 * @return this buffer
 */
public final Buffer flip() {
limit = position; // must happen before position is reset, otherwise the old position is lost
position = 0;
mark = -1; // -1 is the "no mark" value the field is initialised with
return this;
}







Get: 从buffer里读一个字节,并把position移动一位。上限是limit,即写入数据的最后位置。

/**
 * Returns the byte at the buffer's current position and advances the position by one
 * (the advance happens inside nextGetIndex()).
 *
 * @return the byte read from hb
 */
public byte get() {
return hb[ix(nextGetIndex())];
}

/**
 * Returns the current position and then advances it by one.
 *
 * @return the position before the increment
 * @throws BufferUnderflowException if the position has already reached the limit
 */
final int nextGetIndex() {                          // package-private
if (position >= limit)
throw new BufferUnderflowException();
return position++;
}







Clear: 标志量重置。
/**
 * Resets the three markers: position to 0, limit to capacity, mark to -1.
 * Only these fields are assigned here; the contents of the buffer are not touched.
 *
 * @return this buffer
 */
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}




NIO实例:

NIOServer.java
package com.nio2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

/**
 * Single-threaded NIO server: one {@link Selector} multiplexes the listening channel and all
 * client channels. For every client it alternates between reading (and printing) whatever
 * the client sent and answering with the fixed text {@code "woshiserver"}.
 */
public class NIOServer implements Runnable {

    private static final int BLOCK_SIZE = 1024;
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    private ByteBuffer readBuffer;
    private Charset cs = Charset.forName("utf-8");

    /**
     * Opens the listening channel in non-blocking mode and registers it for accept events.
     * A failure is reported; {@link #run()} then returns without serving.
     *
     * @param ip   local address to bind to
     * @param port local port to bind to
     */
    public NIOServer(String ip, int port) {
        try {
            readBuffer = ByteBuffer.allocateDirect(BLOCK_SIZE);
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(ip, port));
            serverSocketChannel.configureBlocking(false);
            selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("the server is started");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Event loop. A failure while serving one client closes only that client; a failure of
     * the selector itself ends the loop.
     */
    @Override
    public void run() {
        if (selector == null) { // the constructor failed; it has already reported why
            return;
        }
        while (selector.isOpen()) {
            try {
                if (selector.select() <= 0) {
                    continue;
                }
            } catch (IOException e) {
                e.printStackTrace();
                return; // the selector is unusable
            }

            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                iterator.remove(); // remove first so a failing handler cannot leave the key behind
                try {
                    handlerSK(sk);
                } catch (IOException e) {
                    e.printStackTrace();
                    if (sk.channel() != serverSocketChannel) { // never drop the listener itself
                        dropClient(sk);
                    }
                }
            }
        }
    }

    /** Deregisters a client key and closes its channel. */
    private void dropClient(SelectionKey sk) {
        sk.cancel();
        try {
            sk.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles one ready key: accepts a new client, reads from a client or answers a client.
     *
     * @param sk the selected key
     * @throws IOException if the channel operation fails
     */
    private void handlerSK(SelectionKey sk) throws IOException {
        if (!sk.isValid()) { // cancelled meanwhile; the isXxx() tests below would throw
            return;
        }
        if (sk.isAcceptable()) { // SelectionKey.OP_ACCEPT
            SocketChannel sChannel = serverSocketChannel.accept();
            if (sChannel == null) { // non-blocking accept found no pending connection
                return;
            }
            sChannel.configureBlocking(false);
            sChannel.register(selector, SelectionKey.OP_READ);

            if (sChannel.isConnected()) {
                // report the peer, not our own listening address
                System.out.println("host:" + sChannel.getRemoteAddress() + "is connected");
            }
        } else if (sk.isReadable()) { // SelectionKey.OP_READ
            SocketChannel sChannel = (SocketChannel) sk.channel();

            int len;
            readBuffer.clear();
            while ((len = sChannel.read(readBuffer)) > 0) {
                readBuffer.flip();
                // toString() yields exactly the decoded characters; array() would expose
                // unused trailing slots of the backing array as NUL characters
                System.out.println(cs.decode(readBuffer).toString());
                readBuffer.clear(); // make room for the next chunk
            }
            if (len < 0) { // peer closed the connection
                dropClient(sk);
                return;
            }
            sk.interestOps(SelectionKey.OP_WRITE);
        } else if (sk.isWritable()) { // SelectionKey.OP_WRITE
            SocketChannel sChannel = (SocketChannel) sk.channel();

            readBuffer.clear();
            readBuffer.put("woshiserver".getBytes(cs));
            readBuffer.flip();

            sChannel.write(readBuffer);

            sk.interestOps(SelectionKey.OP_READ);
        }
    }

}

NioClient.java
package com.nio;

import java.awt.SecondaryLoop;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;

/**
 * Single-threaded NIO client: connects in non-blocking mode and then alternates between
 * sending the fixed text {@code "woshizhangsan"} and printing whatever the server answers.
 */
public class NioClient implements Runnable {

    private Charset cs = Charset.forName("utf-8");
    private SocketChannel socketChannel;
    private Selector selector;
    private String ip;
    private int port;
    Scanner sc = null;

    /**
     * Opens the channel and starts a non-blocking connect; the connect is completed in
     * {@link #run()}. A failure is reported and makes {@link #run()} return immediately.
     *
     * @param ip   server address
     * @param port server port
     */
    public NioClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
        try {
            sc = new Scanner(System.in);
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            selector = Selector.open();
            socketChannel.register(selector, SelectionKey.OP_CONNECT);

            socketChannel.connect(new InetSocketAddress(ip, port));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Event loop. Runs until the connection is closed (by the server or after an error)
     * and then releases the selector.
     */
    @Override
    public void run() {
        if (selector == null || socketChannel == null) { // the constructor failed
            return;
        }
        try {
            while (selector.isOpen() && socketChannel.isOpen()) {
                int len = selector.select();
                if (len <= 0) {
                    continue;
                }
                System.out.println("len:" + len);
                Iterator<SelectionKey> itor = selector.selectedKeys().iterator();
                while (itor.hasNext()) {
                    SelectionKey sk = itor.next();
                    itor.remove(); // remove first so a failing handler cannot leave the key behind
                    process(sk);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Deregisters the key and closes the connection, which also ends the event loop. */
    private void disconnect(SelectionKey sk) {
        sk.cancel();
        try {
            sk.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles one ready key: finishes the connect, reads an answer or sends the greeting.
     * An I/O failure is reported and closes the connection.
     *
     * @param sk the selected key
     */
    private void process(SelectionKey sk) {
        if (!sk.isValid()) { // cancelled meanwhile; the isXxx() tests below would throw
            return;
        }
        SocketChannel channel = (SocketChannel) sk.channel();
        try {
            if (sk.isConnectable()) {
                if (channel.isConnectionPending() && !channel.finishConnect()) {
                    return; // not established yet, wait for the next connect event
                }
                if (channel.isConnected()) {
                    System.out.println(channel.getRemoteAddress().toString());
                }
                sk.interestOps(SelectionKey.OP_WRITE);
            } else if (sk.isReadable()) {
                ByteBuffer bb = ByteBuffer.allocate(1024);
                int len;
                System.out.println("读取中,..");
                while ((len = channel.read(bb)) > 0) {
                    bb.flip();
                    // toString() yields exactly the decoded characters; array() would expose
                    // unused trailing slots of the backing array as NUL characters
                    System.out.println(cs.decode(bb).toString());
                    bb.clear(); // make room for the next chunk
                }
                if (len < 0) { // server closed the connection
                    disconnect(sk);
                    return;
                }
                sk.interestOps(SelectionKey.OP_WRITE);
            } else if (sk.isWritable()) {
                ByteBuffer bb = ByteBuffer.wrap("woshizhangsan".getBytes(cs));
                channel.write(bb);

                sk.interestOps(SelectionKey.OP_READ);
            }
        } catch (IOException e) {
            e.printStackTrace();
            disconnect(sk);
        }
    }

}


4. AIO

Java的AIO是在JDK1.7出现的,就像当年发布NIO特性支持时,基本上所有的Java服务器都重写了自己的网络框架以通过NIO来提高服务器的性能。现在很多的网络框架(如Mina),大型软件(如Oracle DB)都宣布自己已经在新版本中支持了AIO的特性以提高性能。下面就来看一下aio的基本原理,以及如何使用JDK7的AIO特性。 
    所谓AIO,异步IO,其主要是针对进程在调用IO获取外部数据时,是否阻塞调用进程而言的。一个进程的IO调用步骤大致如下: 
    1、进程向操作系统请求数据 
    2、操作系统把外部数据加载到内核的缓冲区中, 
    3、操作系统把内核的缓冲区拷贝到进程的缓冲区 
    4、进程获得数据完成自己的功能 

Java代码实现:

AIOServer.java

package com.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Asynchronous (AIO) echo server: every accepted connection is read once, the received
 * bytes are logged and written back, and the connection is closed.
 */
public class AIOServer {

    public final static int PORT = 9001;
    public final static String IP = "127.0.0.1";

    private AsynchronousServerSocketChannel server = null;

    /** Opens and binds the asynchronous listening channel; a failure is reported. */
    public AIOServer() {
        try {
            server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(IP, PORT));

            System.out.println("Server is started");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Registers the accept handler and returns immediately; connections are served on the
     * channel group's threads. Does nothing if the constructor could not bind.
     */
    public void start() {
        if (server == null) {
            return;
        }

        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {

            @Override
            public void completed(AsynchronousSocketChannel result, Object attachment) {
                // re-arm the accept right away so the next client is not kept waiting
                server.accept(null, this);

                System.out.println(Thread.currentThread().getName());
                // one buffer per connection: completions of different clients may overlap
                ByteBuffer buffer = ByteBuffer.allocate(1024);

                try {
                    int received = result.read(buffer).get(1000, TimeUnit.SECONDS);
                    if (received > 0) {
                        buffer.flip();
                        // log only the bytes actually received, not the whole backing array
                        System.out.println(result.getRemoteAddress().toString() + ": "
                                + new String(buffer.array(), 0, buffer.limit(),
                                        java.nio.charset.StandardCharsets.UTF_8));
                        while (buffer.hasRemaining()) { // a single write may be partial
                            result.write(buffer).get();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to the pool
                    e.printStackTrace();
                } catch (IOException | ExecutionException | TimeoutException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        result.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("failed:" + exc);
            }

        });
    }

    /**
     * Starts the server and keeps the main thread alive until it is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new AIOServer().start();
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // an interrupt is a request to shut down
            }
        }
    }

}
AIOClient.java

package com.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

/**
 * Asynchronous (AIO) client: connects to 127.0.0.1:9001, sends {@code "Hello"}, prints the
 * answer and closes the connection. The three steps are chained through completion handlers.
 */
public class AIOClient {

    /** Closes the channel (reporting a failure) and releases the waiting main thread. */
    private static void finish(AsynchronousSocketChannel client,
            java.util.concurrent.CountDownLatch done) {
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            done.countDown();
        }
    }

    /**
     * Runs one request/response exchange. Returns as soon as the exchange has finished or
     * failed, and after one second at the latest.
     *
     * @param args unused
     * @throws IOException if the channel cannot be opened
     */
    public static void main(String[] args) throws IOException {

        final AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
        final java.util.concurrent.CountDownLatch done = new java.util.concurrent.CountDownLatch(1);

        InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 9001);

        // step 3: print the answer
        final CompletionHandler<Integer, ByteBuffer> onRead = new CompletionHandler<Integer, ByteBuffer>() {

            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (result > 0) {
                    attachment.flip();
                    // print only the bytes actually received, not the whole backing array
                    System.out.println(new String(attachment.array(), 0, attachment.limit(),
                            java.nio.charset.StandardCharsets.UTF_8));
                }
                finish(client, done);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.out.println("read failed:" + exc);
                finish(client, done);
            }

        };

        // step 2: after the request is written, wait for the answer
        final CompletionHandler<Integer, Object> onWritten = new CompletionHandler<Integer, Object>() {

            @Override
            public void completed(Integer result, Object attachment) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                client.read(buffer, buffer, onRead);
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("write failed:" + exc);
                finish(client, done);
            }

        };

        // step 1: after the connect, send the request
        CompletionHandler<Void, Object> handler = new CompletionHandler<Void, Object>() {

            @Override
            public void completed(Void result, Object attachment) {
                client.write(ByteBuffer.wrap("Hello".getBytes(java.nio.charset.StandardCharsets.UTF_8)),
                        null, onWritten);
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("connect failed:" + exc);
                finish(client, done);
            }

        };

        client.connect(serverAddress, null, handler);
        try {
            // same upper bound as the former fixed sleep, but returns early when finished
            done.await(1000, java.util.concurrent.TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  socket 通信 nio BIO AIO