您的位置:首页 > 编程语言 > Java开发

使用Java NIO编写高性能的服务器

2013-09-11 23:47 453 查看
从 JDK 1.4 开始,Java 标准库中就包含了 NIO,即所谓的“New IO”。其中最重要的功能是提供了“非阻塞”IO,Socket 也包括在内。非阻塞 IO 是对 select(Unix 平台)以及 WaitForMultipleObjects(Windows 平台)等系统调用的封装,可用于构建高性能、易伸缩的服务架构。

Server:

/**
*
*/
package nio.file;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;

/**
 * Selector-driven file server: every client that sends a request message is
 * streamed the contents of {@link #filename} and then disconnected. The event
 * loop runs on whichever thread calls {@link #listen()}.
 *
 * @author ztz
 */
public class NIOServer {

    /** Size in bytes of the request buffer and of each file block sent to a client. */
    static int BLOCK = 4096;

    /** Handles the interaction with one client: reads the served file block by block. */
    public class HandleClient {
        protected FileChannel channel;
        protected ByteBuffer buffer;
        /** Block most recently handed out by {@link #nextBlock()}; may still hold unsent bytes. */
        private ByteBuffer pending;

        /**
         * Opens the file named by the enclosing server's {@code filename}.
         *
         * @throws IOException if the file cannot be opened
         */
        public HandleClient() throws IOException {
            this.channel = new FileInputStream(filename).getChannel();
            this.buffer = ByteBuffer.allocate(BLOCK);
        }

        /**
         * Reads the next block of the file into the internal buffer.
         *
         * @return the buffer, flipped and ready to be written, or {@code null}
         *         at end of file or when the read fails
         */
        public ByteBuffer readBlock() {
            try {
                buffer.clear();
                int count = channel.read(buffer);
                buffer.flip();
                if (count <= 0) {
                    return null;
                }
                return buffer;
            } catch (IOException e) {
                e.printStackTrace();
                // Never hand out a half-prepared buffer: ending the transfer is
                // safer than sending stale bytes to the client.
                return null;
            }
        }

        /**
         * Returns the block that should be written next. A block that was only
         * partially written by a non-blocking socket is returned again so its
         * remaining bytes are not lost; otherwise a fresh block is read.
         *
         * @return the block to write, or {@code null} when the file is exhausted
         */
        ByteBuffer nextBlock() {
            if (pending == null || !pending.hasRemaining()) {
                pending = readBlock();
            }
            return pending;
        }

        /** Closes the underlying file channel, logging any failure. */
        public void close() {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    protected Selector selector;
    protected String filename = "XXX";
    protected ByteBuffer clientBuffer = ByteBuffer.allocate(BLOCK);
    protected CharsetDecoder decoder;

    /**
     * Creates a server listening on the given port.
     *
     * @param port TCP port to bind
     * @throws IOException if the listening socket or the selector cannot be set up
     */
    public NIOServer(int port) throws IOException {
        selector = this.getSelector(port);
        Charset charset = Charset.forName("GB2312");
        decoder = charset.newDecoder();
    }

    /**
     * Opens a non-blocking server socket on {@code port} and registers it for
     * accept events with a new selector.
     *
     * @param port TCP port to bind
     * @return the selector the listening channel is registered with
     * @throws IOException if opening, binding or registering fails
     */
    protected Selector getSelector(int port) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        Selector sel = Selector.open();
        server.socket().bind(new InetSocketAddress(port));
        server.configureBlocking(false);
        server.register(sel, SelectionKey.OP_ACCEPT);
        return sel;
    }

    /**
     * Runs the event loop: waits for ready keys and dispatches each one to
     * {@link #handleKey(SelectionKey)}. A failure on a single client only drops
     * that client; a failure of the selector itself is logged and ends the call.
     */
    public void listen() {
        try {
            for (;;) {
                selector.select();
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    try {
                        handleKey(key);
                    } catch (IOException e) {
                        e.printStackTrace();
                        abandon(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Releases everything tied to a failed client key: its file handler (if one
     * is attached), its selector registration and its socket. The listening
     * socket is deliberately left open so one failed accept does not stop the server.
     */
    private void abandon(SelectionKey key) {
        if (!(key.channel() instanceof SocketChannel)) {
            return;
        }
        Object attachment = key.attachment();
        if (attachment instanceof HandleClient) {
            ((HandleClient) attachment).close();
        }
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles one ready key: accepts a new connection, reads a client request
     * and switches that client to writing, or writes the next file block.
     *
     * @param key the ready selection key
     * @throws IOException if the channel operation for this key fails
     */
    protected void handleKey(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            // Accept the incoming connection.
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel channel = server.accept();
            if (channel == null) {
                // Non-blocking accept found no pending connection.
                return;
            }
            try {
                channel.configureBlocking(false);
                channel.register(selector, SelectionKey.OP_READ);
            } catch (IOException e) {
                channel.close();
                throw e;
            }
        } else if (key.isReadable()) {
            // Read the client's request.
            SocketChannel channel = (SocketChannel) key.channel();
            // The buffer is shared by all clients; reset it so bytes consumed
            // for a previous client do not leave it without free space.
            clientBuffer.clear();
            int count = channel.read(clientBuffer);
            if (count > 0) {
                clientBuffer.flip();
                CharBuffer charBuffer = decoder.decode(clientBuffer);
                System.out.println("Client>>" + charBuffer.toString());
                channel.register(selector, SelectionKey.OP_WRITE, new HandleClient());
            } else if (count < 0) {
                // End of stream: the client closed the connection.
                channel.close();
            }
        } else if (key.isWritable()) {
            // Send the next part of the file.
            SocketChannel channel = (SocketChannel) key.channel();
            HandleClient handle = (HandleClient) key.attachment();
            ByteBuffer block = handle.nextBlock();
            if (block != null) {
                // May write only part of the block; the rest stays in the
                // buffer and is retried on the next writable event.
                channel.write(block);
            } else {
                handle.close();
                channel.close();
            }
        }
    }

    /**
     * Starts the server on port 12345 and serves forever.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            int port = 12345;
            NIOServer server = new NIOServer(port);
            System.out.println("Listening on " + port);
            while (true) {
                server.listen();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

Client:

/**
*
*/
package nio.file;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * File download client: starts {@link #SIZE} concurrent downloads against the
 * server at {@link #ip} and reports how many bytes each one received.
 *
 * @author ztz
 */
public class NIOClient {

    /** Number of concurrent downloads started by {@link #main(String[])}. */
    static int SIZE = 100;
    /** Address of the server to download from. */
    static InetSocketAddress ip = new InetSocketAddress("localhost", 12345);
    /**
     * Encoder describing the request charset. Encoders are not safe for
     * concurrent use, so each download derives its own from this one's charset.
     */
    static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder();

    /** One download: connects, sends a greeting, then counts bytes until the server closes. */
    static class DownLoad implements Runnable {

        int index;

        /**
         * @param index number identifying this download in the greeting and the report
         */
        public DownLoad(int index) {
            this.index = index;
        }

        /**
         * Performs the download and prints a single summary line once the
         * server has closed the connection. I/O failures are logged and end
         * the download without a summary.
         */
        @Override
        public void run() {
            SocketChannel client = null;
            Selector selector = null;
            try {
                long start = System.currentTimeMillis();
                client = SocketChannel.open();
                client.configureBlocking(false);
                selector = Selector.open();
                client.register(selector, SelectionKey.OP_CONNECT);
                client.connect(ip);
                ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
                // Private encoder: the shared static one must not be used by
                // several download threads at once.
                CharsetEncoder localEncoder = encoder.charset().newEncoder();
                long total = 0;
                boolean done = false;
                while (!done) {
                    selector.select();
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (!done && iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isConnectable()) {
                            SocketChannel channel = (SocketChannel) key.channel();
                            if (channel.isConnectionPending() && channel.finishConnect()) {
                                ByteBuffer hello =
                                        localEncoder.encode(CharBuffer.wrap("Hello from " + index));
                                // A non-blocking write may be partial; keep going
                                // until the whole greeting is out.
                                while (hello.hasRemaining()) {
                                    channel.write(hello);
                                }
                                channel.register(selector, SelectionKey.OP_READ);
                            }
                        } else if (key.isReadable()) {
                            SocketChannel channel = (SocketChannel) key.channel();
                            int count = channel.read(buffer);
                            if (count > 0) {
                                total += count;
                                // Content is only counted, so discard it.
                                buffer.clear();
                            } else if (count < 0) {
                                // End of stream: the server finished sending.
                                done = true;
                            }
                        }
                    }
                }

                double last = (System.currentTimeMillis() - start) * 1.0 / 1000;
                System.out.println("Thread" + index + " download " + total + " bytes in " + last + "s.");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (selector != null) {
                    try {
                        selector.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (client != null) {
                    try {
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

    }

    /**
     * Runs {@link #SIZE} downloads on a fixed thread pool of the same size.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(SIZE);
        for (int index = 0; index < SIZE; index++) {
            exec.execute(new DownLoad(index));
        }
        exec.shutdown();
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: