您的位置:首页 > 编程语言 > Java开发

使用Java NIO编写高性能的服务器

2011-03-07 15:25 405 查看

使用Java NIO编写高性能的服务器


JDK 1.4
开始,Java
的标准库中就包含了NIO


即所谓的“New
IO”。其中最重要的功能就是提供了“非阻塞”的IO,当然包括了Socket。NonBlocking的IO就是对select(Unix平台下)以及
WaitForMultipleObjects(Windows平台)的封装,提供了高性能、易伸缩的服务架构。

话说回来,传统的Server/Client实现是基于Thread per request,即服务器为每个客户端请求建立一个线程处理,单独负责处理一个客户的请求。比如像Tomcat
(新版本也会提供NIO
方案)、Resin等Web服务器就是这样实现的。当然为了减少瞬间峰值问题,服务器一般都使用线程池,规定了同时并发的最大数量,避免了线程的无限增长。

但这样有一个问题:如果线程池的大小为100,当有100个用户同时通过HTTP现在一个大文件时,服务器的线程池会用完,因为所有的线程都在传输大文件了,即使第101个请求者仅仅请求一个只有10字节的页面,服务器也无法响应了,只有等到线程池中有空闲的线程出现。

另外,线程的开销也是很大的,特别是达到了一个临界值后,性能会显著下降,这也限制了传统的Socket方案无法应对并发量大的场合,而“非阻塞”的IO就能轻松解决这个问题。

下面只是一个简单的例子:服务器提供了下载大型文件的功能,客户端连接上服务器的12345端口后,就可以读取服务器发送的文件内容信息了。注意这里的服务器只有一个主线程,没有其他任何派生线程,让我们看看NIO
是如何用一个线程处理N个请求的。

NIO
服务器最核心的一点就是
反应器模式:当有感兴趣的事件发生的,就通知对应的事件处理器去处理这个事件,如果没有,则不处理。所以使用一个线程做轮询就可以了。当然这里这是个例
子,如果要获得更高性能,可以使用少量的线程,一个负责接收请求,其他的负责处理请求,特别是对于多CPU时效率会更高。

关于使用NIO
过程中出现的问
题,最为普遍的就是为什么没有请求时CPU的占用率为100%?出现这种问题的主要原因是注册了不感兴趣的事件,比如如果没有数据要发到客户端,而又注册
了写事件(OP_WRITE),则在
Selector.select()上就会始终有事件出现,CPU就一直处理了,而此时select()应该是阻塞的。

另外一个值得注意的问题是:由于只使用了一个线程(多个线程也如此)处理用户请求,所以要避免线程被阻塞,解决方法是事件的处理者必须要即刻返回,不能陷入循环中,否则会影响其他用户的请求速度。

具体到本例子中,由于文件比较大,如果一次性发送整个文件(这里的一次性不是指send整个
文件内容,而是通过while循环不间断的发送分组包),则主线程就会阻塞,其他用户就不能响应了。这里的解决方法是当有WRITE事件时,仅仅是发送一
个块(比如4K字节)。发完后,继续等待WRITE事件出现,依次处理,直到整个文件发送完毕,这样就不会阻塞其他用户了。

服务器的例子:

package 

nio
.file;

import 

java
.io.FileInputStream;

import 

java
.io.IOException;

import 

java
.net.InetSocketAddress;

import 

java
.nio
.ByteBuffer;

import 

java
.nio
.CharBuffer;

import 

java
.nio
.channels.FileChannel;

import 

java
.nio
.channels.SelectionKey;

import 

java
.nio
.channels.Selector;

import 

java
.nio
.channels.ServerSocketChannel;

import 

java
.nio
.channels.SocketChannel;

import 

java
.nio
.charset.Charset;

import 

java
.nio
.charset.CharsetDecoder;

import 

java
.util.Iterator;

/**

* 测试文件下载的NIOServer

*

*
@author
tenyears.cn

*/

public class 

NIOServer
{

static 

int 

BLOCK =
4096
;

// 处理与客户端的交互

public class 

HandleClient
{

protected 

FileChannel channel;

protected 

ByteBuffer buffer;

public 

HandleClient
()
throws 

IOException
{

this

.channel =
new 

FileInputStream
(
filename
)
.getChannel
()
;

this

.buffer = ByteBuffer.allocate
(
BLOCK
)
;

}

public 

ByteBuffer readBlock
() {

try 

{

buffer.clear
()
;

int 

count = channel.read
(
buffer
)
;

buffer.flip
()
;

if 

(
count <=
0
)

return null

;

}
catch 

(
IOException e
) {

e.printStackTrace
()
;

}

return 

buffer;

}

public 

void 

close
() {

try 

{

channel.close
()
;

}
catch 

(
IOException e
) {

e.printStackTrace
()
;

}

}

}

protected 

Selector selector;

protected 

String filename =
"d://bigfile.dat"
;
// a big file

protected 

ByteBuffer clientBuffer = ByteBuffer.allocate
(
BLOCK
)
;

protected 

CharsetDecoder decoder;

public 

NIOServer
(
int 

port
)
throws 

IOException
{

selector =
this

.getSelector
(
port
)
;

Charset charset = Charset.forName
(
"GB2312"
)
;

decoder = charset.newDecoder
()
;

}

// 获取Selector

protected 

Selector getSelector
(
int 

port
)
throws 

IOException
{

ServerSocketChannel server = ServerSocketChannel.open
()
;

Selector sel = Selector.open
()
;

server.socket
()
.bind
(
new 

InetSocketAddress
(
port
))
;

server.configureBlocking
(
false

)
;

server.register
(
sel, SelectionKey.OP_ACCEPT
)
;

return 

sel;

}

// 监听端口

public 

void 

listen
() {

try 

{

for 

(
;;
) {

selector.select
()
;

Iterator<selectionkey></selectionkey> iter = selector.selectedKeys
()

.iterator
()
;

while 

(
iter.hasNext
()) {

SelectionKey key = iter.next
()
;

iter.remove
()
;

handleKey
(
key
)
;

}

}

}
catch 

(
IOException e
) {

e.printStackTrace
()
;

}

}

// 处理事件

protected 

void 

handleKey
(
SelectionKey key
)
throws 

IOException
{

if 

(
key.isAcceptable
()) {
// 接收请求

ServerSocketChannel server =
(
ServerSocketChannel
)
key.channel
()
;

SocketChannel channel = server.accept
()
;

channel.configureBlocking
(
false

)
;

channel.register
(
selector, SelectionKey.OP_READ
)
;

}
else if 

(
key.isReadable
()) {
// 读信息

SocketChannel channel =
(
SocketChannel
)
key.channel
()
;

int 

count = channel.read
(
clientBuffer
)
;

if 

(
count >
0
) {

clientBuffer.flip
()
;

CharBuffer charBuffer = decoder.decode
(
clientBuffer
)
;

System.out.println
(
"Client >>"
+ charBuffer.toString
())
;

SelectionKey wKey = channel.register
(
selector,

SelectionKey.OP_WRITE
)
;

wKey.attach
(
new 

HandleClient
())
;

}
else

channel.close
()
;

clientBuffer.clear
()
;

}
else if 

(
key.isWritable
()) {
// 写事件

SocketChannel channel =
(
SocketChannel
)
key.channel
()
;

HandleClient handle =
(
HandleClient
)
key.attachment
()
;

ByteBuffer block = handle.readBlock
()
;

if 

(
block !=
null

)

channel.write
(
block
)
;

else 

{

handle.close
()
;

channel.close
()
;

}

}

}

public static 

void 

main
(
String
[]
args
) {

int 

port =
12345
;

try 

{

NIOServer server =
new 

NIOServer
(
port
)
;

System.out.println
(
"Listernint on "
+ port
)
;

while 

(
true

) {

server.listen
()
;

}

}
catch 

(
IOException e
) {

e.printStackTrace
()
;

}

}

}

该代码中,通过一个HandleClient来获取文件的一块数据,每一个客户都会分配一个HandleClient的实例。

下面是客户端请求的代码,也比较简单,模拟100个用户同时下载文件。

package 

nio
.file;

import 

java
.io.IOException;

import 

java
.net.InetSocketAddress;

import 

java
.nio
.ByteBuffer;

import 

java
.nio
.CharBuffer;

import 

java
.nio
.channels.SelectionKey;

import 

java
.nio
.channels.Selector;

import 

java
.nio
.channels.SocketChannel;

import 

java
.nio
.charset.Charset;

import 

java
.nio
.charset.CharsetEncoder;

import 

java
.util.Iterator;

import 

java
.util.concurrent.ExecutorService;

import 

java
.util.concurrent.Executors;

/**

* 文件下载客户端

*
@author
tenyears.cn

*/

public class 

NIOClient
{

static 

int 

SIZE =
100
;

static 

InetSocketAddress ip =
new 

InetSocketAddress
(
"localhost"
,
12345
)
;

static 

CharsetEncoder encoder = Charset.forName
(
"GB2312"
)
.newEncoder
()
;

static class 

Download
implements 

Runnable
{

protected 

int 

index;

public 

Download
(
int 

index
) {

this

.index = index;

}

public 

void 

run
() {

try 

{

long 

start = System.currentTimeMillis
()
;

SocketChannel client = SocketChannel.open
()
;

client.configureBlocking
(
false

)
;

Selector selector = Selector.open
()
;

client.register
(
selector, SelectionKey.OP_CONNECT
)
;

client.connect
(
ip
)
;

ByteBuffer buffer = ByteBuffer.allocate
(
8
*
1024
)
;

int 

total =
0
;

FOR:
for 

(
;;
) {

selector.select
()
;

Iterator<selectionkey></selectionkey> iter = selector.selectedKeys
()

.iterator
()
;

while 

(
iter.hasNext
()) {

SelectionKey key = iter.next
()
;

iter.remove
()
;

if 

(
key.isConnectable
()) {

SocketChannel channel =
(
SocketChannel
)
key

.channel
()
;

if 

(
channel.isConnectionPending
())

channel.finishConnect
()
;

channel.write
(
encoder.encode
(
CharBuffer

.wrap
(
"Hello from "
+ index
)))
;

channel.register
(
selector, SelectionKey.OP_READ
)
;

}
else if 

(
key.isReadable
()) {

SocketChannel channel =
(
SocketChannel
)
key

.channel
()
;

int 

count = channel.read
(
buffer
)
;

if 

(
count >
0
) {

total += count;

buffer.clear
()
;

}
else 

{

client.close
()
;

break 

FOR;

}

}

}

}

double 

last =
(
System.currentTimeMillis
()
- start
)
*
1.0
/
1000
;

System.out.println
(
"Thread "
+ index +
" downloaded "
+ total

+
"bytes in "
+ last +
"s."
)
;

}
catch 

(
IOException e
) {

e.printStackTrace
()
;

}

}

}

public static 

void 

main
(
String
[]
args
)
throws 

IOException
{

ExecutorService exec = Executors.newFixedThreadPool
(
SIZE
)
;

for 

(
int 

index =
0
; index < SIZE; index++
) {

exec.execute
(
new 

Download
(
index
))
;

}

exec.shutdown
()
;

}

}

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: