您的位置:首页 > 运维架构

Hadoop源码分析14: IPC流程(9) SelectionKey

2014-05-28 08:47 381 查看
1. SelectionKey.OP_ACCEPT

publicclassServerListenerextendsThread{

publicServerListener(Serverserver)
throwsIOException{

this.server=server;

address=newInetSocketAddress(server.bindAddress,server.port);

// Create anew server socket and set to non blocking
mode

acceptChannel=
ServerSocketChannel.open();

acceptChannel.configureBlocking(false);

// Bind theserver socket to the local host and port

Server.bind(acceptChannel.socket(),address,
backlogLength);

server.port=acceptChannel.socket().getLocalPort();

// create aselector;

selector =Selector.open();

readers=newServerListenerReader[server.readThreads];

readPool=Executors.newFixedThreadPool(server.readThreads);

for(inti
= 0;i <server.readThreads;i++)
{

Selector readSelector = Selector.open();

ServerListenerReader reader =
newServerListenerReader(

readSelector,
this);

readers[i]=
reader;

readPool.execute(reader);

}

acceptChannel.register(selector,SelectionKey.OP_ACCEPT);

this.setName("IPCServer
listener on " +server.port);

this.setDaemon(true);
}

}

2. SelectionKey.OP_READ


public

class
ServerListenerextends
Thread{

voiddoAccept(SelectionKeykey)
throwsIOException,OutOfMemoryError {

ServerConnection c =
null;

ServerSocketChannel
serverSocketChannel=(ServerSocketChannel)key.channel();

SocketChannel channel;

while((channel=serverSocketChannel.accept())!=
null){

channel.configureBlocking(false);

channel.socket().setTcpNoDelay(this.server.tcpNoDelay);

ServerListenerReader reader =
readers[(currentReader+
1) %readers.length];

try{

reader.startAdd();

SelectionKey
readKey =
channel.register(reader.readSelector,SelectionKey.OP_READ);

c =
newServerConnection(readKey,channel,

System.currentTimeMillis(),
this.server);

readKey.attach(c);

synchronized(this.server.connectionList){

this.server.connectionList.add(this.server.numConnections,

c);

this.server.numConnections++;

}

}
finally{

reader.finishAdd();

}

}

}

}

3. SelectionKey.OP_WRITE

publicclassServerResponderextendsThread{

privatebooleanprocessResponse(LinkedList<Call>responseQueue,

booleaninHandler)throwsIOException{

booleanerror=
true;

booleandone =false;

intnumElements =0;

Call call =
null;

try{

synchronized(responseQueue){

numElements = responseQueue.size();

if(numElements==
0) {

error =
false;

returntrue; //
no moredata for this channel.

}

call = responseQueue.removeFirst();

SocketChannelchannel = call.connection.channel;

intnumBytes
=channelWrite(channel, call.response);

if(numBytes<0){

returntrue;

}

if(!call.response.hasRemaining()){

call.connection.decRpcCount();

if(numElements==
1){ // last callfully processes.

done =
true; //
no moredata for this channel.

}
else{

done =
false; //
morecalls pending to be sent.

}

}
else{

call.connection.responseQueue.addFirst(call);

if(inHandler){

// set theserve time when the response has to be sent
later

call.timestamp=System.currentTimeMillis();

incPending();

try{

writeSelector.wakeup();

channel.register(writeSelector,
SelectionKey.OP_WRITE,call);

}
catch(ClosedChannelExceptione) {

//Its ok.channel might be closed else where.

done =
true;

}
finally{

decPending();

}

}

}

error =
false; //everything
went off well

}

}
finally{

if(error&& call
!= null){

done =
true; //
error. nomore data for this channel.

closeConnection(call.connection);

}

}

returndone;
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: