您的位置:首页 > 编程语言 > Java开发

Java NIO 实践经验总结

2007-10-08 18:28 501 查看
总的来讲,Java NIO 就是一个调用select的无限循环。如果有消息接收或者发送,或者连接关闭或者打开,select都会返回相应的事件。但是,在写selector循环时,要注意以下几件事情:

1.发送事件是因为一个packet过大,一次没有发送出去。需要分两次发时,会激活这个事件
2.如果所有的channel都从selector注销了,那么select函数会阻塞。需要重新注册一个channel,并且调用selector.wakeup()重新激活。
3.注册channel的线程必须是select循环所在的线程。
4.一个selector支持1000多个channel。如果连接数很多,最好用round robin的方式,采用多个selector。
5.SelectionKey的attachment可以存放自定义的Object
6.如果发现了一个读事件,最好把channel从selector里注销,然后再读取。读取完毕,再注册回selector
7.当从channel中读取字节后,可以把不完整的消息放回buffer中或者attachment中
8.Windows和Linux在server socket注册不太一样,一个要先注册后启动循环,一个是先启动循环后注册。

例子程序,基本涵盖了文章所有的关键点

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Demo NIO server built around one or more selector loops.
 *
 * <p>The listening channel is registered with the first {@link DispatchThread}.
 * Accepted connections are handed to a dispatcher through
 * {@link #register(SelectableChannel, int)}; when a connection becomes readable
 * the dispatcher suspends interest in it and passes the key to a worker pool.
 */
public class SocketChannelDemo {

    /** Listening port used when none is given on the command line. */
    public static int PORT_NUMBER = 8888;

    /** Channel count above which {@link #register} moves on to the next dispatcher. */
    public static int MAX_SOCKETCOUNT = 1000;

    /** Upper bound on the dispatcher index before {@link #register} wraps back to 0. */
    public static int MAX_SELECTOR = 200;

    /** Counter printed for every read event handed to the worker pool. */
    public static int i = 0;

    private ServerSocketChannel serverChannel;

    private ServerSocket serverSocket;

    /** Index into {@link #dispatcherList} of the dispatcher currently receiving new channels. */
    private int dispatcherPointer = 0;

    /** Worker pool that runs the per-read {@code SleepTime} tasks. */
    private ExecutorService exec = Executors.newFixedThreadPool(20);

    /** container of dispatcher */
    private List<DispatchThread> dispatcherList =
            new Vector<DispatchThread>(SocketChannelDemo.MAX_SELECTOR);

    /** Last session id handed out; read and replaced by the worker tasks. */
    public static Long sessionId = Long.valueOf(0);

    /**
     * Starts the demo server.
     *
     * @param args optional; {@code args[0]} is the port to listen on
     * @throws Exception if the server cannot be started
     */
    public static void main(String[] args) throws Exception {
        SocketChannelDemo server = new SocketChannelDemo();
        server.init(args);
    }

    /**
     * Binds the server socket and starts the first dispatcher, which also
     * handles accepts.
     *
     * @param argv optional; {@code argv[0]} overrides {@link #PORT_NUMBER}
     * @throws Exception if the port is not a number, or the socket cannot be
     *         opened, bound or registered
     */
    public void init(String[] argv) throws Exception {
        int port = PORT_NUMBER;

        if (argv != null && argv.length > 0) {
            port = Integer.parseInt(argv[0]);
        }

        System.out.println("Listening on port " + port);

        serverChannel = ServerSocketChannel.open();
        serverSocket = serverChannel.socket();
        serverSocket.bind(new InetSocketAddress(port));
        serverChannel.configureBlocking(false);

        DispatchThread dt = new DispatchThread();
        serverChannel.register(dt.selector, SelectionKey.OP_ACCEPT);
        // Publish the dispatcher before starting it: once running it may accept a
        // connection right away and look itself up in dispatcherList via register().
        dispatcherList.add(dt);
        dt.start();

        dt.selector.wakeup();
    }

    /**
     * Hands a channel to a dispatcher. The current dispatcher is used until its
     * {@code socketCount} exceeds {@link #MAX_SOCKETCOUNT}; then the pointer
     * advances (wrapping at {@link #MAX_SELECTOR}) and a new dispatcher is
     * created if none exists at that index yet.
     *
     * @param channel the channel to register; {@code null} is ignored
     * @param ops     requested interest set, forwarded to the dispatcher
     * @throws Exception if a new dispatcher cannot be created
     */
    public void register(SelectableChannel channel, int ops) throws Exception {
        if (channel == null) {
            return;
        }

        int pointer = dispatcherPointer;
        DispatchThread st = dispatcherList.get(pointer);

        if (st.socketCount <= SocketChannelDemo.MAX_SOCKETCOUNT) {
            st.toRegister(channel, ops);
            return;
        }

        // Current dispatcher is full: move on to the next slot.
        pointer++;
        System.out.println("move pointer:" + pointer);
        if (pointer >= SocketChannelDemo.MAX_SELECTOR) {
            pointer = 0;
        }

        if (pointer >= dispatcherList.size()) {
            System.out.println("new Dispath thread");
            st = new DispatchThread();
            dispatcherList.add(st);
            st.start();
        } else {
            st = dispatcherList.get(pointer);
        }
        dispatcherPointer = pointer;
        st.toRegister(channel, ops);
    }

    /**
     * One selector plus the thread that runs its select loop. Other threads do
     * not touch the selector's registrations directly; they queue channels with
     * {@link #toRegister} and wake the selector, and the loop performs the
     * actual registration in {@link #addChannel()}.
     */
    class DispatchThread extends Thread {
        private final Selector selector;

        /**
         * Number of channels assigned to this dispatcher. Incremented in
         * {@link #toRegister} and decremented by worker tasks when a connection
         * closes. Updates are not atomic, so treat the value as approximate.
         */
        public volatile int socketCount = 0;

        /** Channels waiting to be (re-)registered by the selector loop. */
        private final List<SelectableChannel> registryPool =
                Collections.synchronizedList(new LinkedList<SelectableChannel>());

        /**
         * Opens the selector for this dispatcher.
         *
         * @throws IllegalStateException if the selector cannot be opened; failing
         *         here avoids a dispatcher with a null selector
         */
        public DispatchThread() {
            try {
                selector = Selector.open();
            } catch (IOException e) {
                throw new IllegalStateException("Unable to open selector for dispatcher", e);
            }
        }

        /**
         * Registers every queued channel with this selector for
         * {@link SelectionKey#OP_READ}. Closed channels are skipped; a failure
         * on one channel is logged and does not affect the others.
         */
        public void addChannel() {
            if (registryPool.isEmpty()) {
                return;
            }

            // Take a snapshot under the list's lock so producers are blocked only briefly.
            List<SelectableChannel> pending;
            synchronized (registryPool) {
                pending = new LinkedList<SelectableChannel>(registryPool);
                registryPool.clear();
            }

            for (SelectableChannel ch : pending) {
                try {
                    if (ch.isOpen()) {
                        ch.register(selector, SelectionKey.OP_READ);
                    }
                } catch (Exception exp) {
                    // Broad catch on purpose: one bad channel must not stop the loop.
                    exp.printStackTrace();
                }
            }
        }

        /**
         * Select loop: registers queued channels, waits for events, accepts new
         * connections and hands readable keys to the worker pool. Returns only
         * if the selector has been closed.
         */
        @Override
        public void run() {
            while (true) {
                addChannel();

                try {
                    selector.select();
                } catch (IOException e1) {
                    e1.printStackTrace();
                    if (!selector.isOpen()) {
                        // A closed selector can never recover; stop instead of spinning.
                        return;
                    }
                    continue;
                }

                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    try {
                        if (!key.isValid()) {
                            // Cancelled since selection; the isXxx() calls would throw.
                            continue;
                        }

                        if (key.isAcceptable()) {
                            ServerSocketChannel server = (ServerSocketChannel) key.channel();
                            SocketChannel channel = server.accept();
                            if (channel != null) {
                                try {
                                    register(channel, SelectionKey.OP_READ);
                                } catch (Exception registerFailure) {
                                    // Do not leak the accepted socket if no dispatcher took it.
                                    channel.close();
                                    throw registerFailure;
                                }
                            }
                        }
                        if (key.isReadable()) {
                            // Suspend interest while a worker owns the channel; the worker
                            // re-queues it through toRegister() when it is done reading.
                            key.interestOps(0);
                            processData(key, selector);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

                addChannel();
            }
        }

        /**
         * Queues a channel for registration with this selector and wakes the
         * select loop. A channel that is not yet registered anywhere is switched
         * to non-blocking mode and counted in {@link #socketCount}.
         *
         * @param channel the channel to queue; {@code null} is ignored
         * @param ops     requested interest set (currently unused: the loop
         *                always registers for {@link SelectionKey#OP_READ})
         */
        protected void toRegister(SelectableChannel channel, int ops) {
            if (channel == null) {
                return;
            }

            try {
                if (!channel.isRegistered()) {
                    // Configure first so a failure does not leave a phantom count behind.
                    channel.configureBlocking(false);
                    this.socketCount++;
                }
                registryPool.add(channel);
                selector.wakeup();
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    // The channel was never queued; close it rather than leak it.
                    channel.close();
                } catch (IOException ignored) {
                    // Nothing more can be done for a channel that fails to close.
                }
            }
        }

        /**
         * Hands a readable key to the worker pool.
         *
         * @param key       the key whose channel has data to read
         * @param selector2 the selector that reported the key (unused)
         * @throws Exception if the task cannot be submitted
         */
        protected void processData(SelectionKey key, Selector selector2) throws Exception {
            System.out.println("-------------" + i++);
            SleepTime command = new SleepTime(key, this);
            exec.execute(command);
        }

    }

}

// Runnable run = new Runnable() {
/**
 * Worker task for one read event. It drains the channel behind a selection
 * key, answers each chunk that contains {@code SETUP} or {@code Session:}
 * with a canned RTSP-style response, and then either hands the channel back
 * to its dispatcher or closes it.
 *
 * <p>Each chunk returned by a single {@code read} is treated as a complete
 * request; requests split across reads are not reassembled.
 */
class SleepTime extends Thread {
    /** Charset used for decoding requests and encoding responses. */
    private static final Charset WIRE_CHARSET = Charset.forName("UTF-8");

    /** Line terminator used in responses and searched for in requests. */
    private static final String CRLF = "\r\n";

    private static final String setupResponse1 = "RTSP/1.0 200 OK\r\nCSeq:220\r\n";

    private static final String setupResponse2 = "\r\nVersion:1.9\r\n\r\n";

    // Heap buffer: one is allocated per task, and direct buffers are typically
    // more expensive to allocate and release.
    private ByteBuffer buffer = ByteBuffer.allocate(1024);

    private SelectionKey key;

    private SocketChannelDemo.DispatchThread disp;

    /**
     * @param key1  selection key whose channel has data to read
     * @param disp1 dispatcher that owns the channel and receives it back after reading
     */
    public SleepTime(SelectionKey key1, SocketChannelDemo.DispatchThread disp1) {
        key = key1;
        disp = disp1;
    }

    /**
     * Reads until the channel has no more data, replying to each chunk. When
     * reading ends, the channel is re-queued with the dispatcher if it is still
     * open, or closed if the peer closed it or an I/O error occurred.
     */
    @Override
    public void run() {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        try {
            int count;
            buffer.clear();
            while ((count = socketChannel.read(buffer)) > 0) {
                buffer.flip();
                byte[] data = new byte[buffer.remaining()];
                buffer.get(data);
                buffer.clear();

                String response = buildResponse(new String(data, WIRE_CHARSET));
                // Wrap the encoded bytes instead of copying them into the fixed-size
                // read buffer, which a long echoed Session line could overflow.
                writeFully(socketChannel, ByteBuffer.wrap(response.getBytes(WIRE_CHARSET)));
            }

            if (count < 0) {
                // Peer closed the connection.
                closeChannel(socketChannel);
            } else {
                // Nothing more to read for now. Hand the channel back exactly once,
                // and only after this task has finished with it, so the dispatcher
                // cannot start a second reader while this one is still running.
                disp.toRegister(socketChannel, SelectionKey.OP_READ);
            }
        } catch (IOException e) {
            // Read or write failed (for example a connection reset). Not logged, as in
            // the original, but the channel is released instead of being left open.
            closeChannel(socketChannel);
        }
    }

    /**
     * Builds the reply for one request chunk.
     *
     * @param request decoded request text
     * @return the response text, or an empty string when the request contains
     *         neither {@code SETUP} nor {@code Session:}
     */
    private static String buildResponse(String request) {
        StringBuilder sb = new StringBuilder();
        if (request.indexOf("SETUP") > -1) {
            sb.append(setupResponse1);
            sb.append("Session:").append(nextSessionId());
            sb.append(setupResponse2);
        } else {
            int begin = request.indexOf("Session:");
            if (begin > -1) {
                // Echo the client's Session line up to, but excluding, its terminator.
                int end = request.indexOf(CRLF, begin);
                if (end < 0) {
                    // No terminator in this chunk: echo to the end rather than throw.
                    end = request.length();
                }
                sb.append(setupResponse1);
                sb.append(request, begin, end);
                sb.append(setupResponse2);
            }
        }
        return sb.toString();
    }

    /**
     * Increments {@code SocketChannelDemo.sessionId} and returns the new value.
     * The lock is the {@code SocketChannelDemo} class object, because the
     * {@code sessionId} reference itself is replaced on every update and so
     * cannot serve as a stable monitor.
     */
    private static long nextSessionId() {
        synchronized (SocketChannelDemo.class) {
            long next = SocketChannelDemo.sessionId.longValue() + 1;
            SocketChannelDemo.sessionId = Long.valueOf(next);
            return next;
        }
    }

    /**
     * Writes the whole buffer to the channel, retrying partial writes.
     * NOTE(review): this spins (with a yield) while the socket send buffer is
     * full; an OP_WRITE-driven approach would be preferable if the dispatcher
     * supported it.
     */
    private static void writeFully(SocketChannel channel, ByteBuffer out) throws IOException {
        while (out.hasRemaining()) {
            if (channel.write(out) == 0) {
                Thread.yield();
            }
        }
    }

    /**
     * Releases the channel's slot in the dispatcher's count and closes it.
     * NOTE(review): {@code socketCount} is a plain int shared across threads,
     * so this decrement is not atomic.
     */
    private void closeChannel(SocketChannel socketChannel) {
        disp.socketCount--;
        try {
            socketChannel.close();
        } catch (IOException ignored) {
            // Nothing more can be done for a channel that fails to close.
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: