Hadoop源码分析14: IPC流程(9) SelectionKey
2014-05-28 08:47
381 查看
1. SelectionKey.OP_ACCEPT
publicclassServerListenerextendsThread{
publicServerListener(Serverserver)
throwsIOException{
this.server=server;
address=newInetSocketAddress(server.bindAddress,server.port);
// Create anew server socket and set to non blocking
mode
acceptChannel=
ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// Bind theserver socket to the local host and port
Server.bind(acceptChannel.socket(),address,
backlogLength);
server.port=acceptChannel.socket().getLocalPort();
// create aselector;
selector =Selector.open();
readers=newServerListenerReader[server.readThreads];
readPool=Executors.newFixedThreadPool(server.readThreads);
for(inti
= 0;i <server.readThreads;i++)
{
Selector readSelector = Selector.open();
ServerListenerReader reader =
newServerListenerReader(
readSelector,
this);
readers[i]=
reader;
readPool.execute(reader);
}
acceptChannel.register(selector,SelectionKey.OP_ACCEPT);
this.setName("IPCServer
listener on " +server.port);
this.setDaemon(true);
}
}
2. SelectionKey.OP_READ
public
classServerListenerextends
Thread{
voiddoAccept(SelectionKeykey)
throwsIOException,OutOfMemoryError {
ServerConnection c =
null;
ServerSocketChannel
serverSocketChannel=(ServerSocketChannel)key.channel();
SocketChannel channel;
while((channel=serverSocketChannel.accept())!=
null){
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(this.server.tcpNoDelay);
ServerListenerReader reader =
readers[(currentReader+
1) %readers.length];
try{
reader.startAdd();
SelectionKey
readKey =
channel.register(reader.readSelector,SelectionKey.OP_READ);
c =
newServerConnection(readKey,channel,
System.currentTimeMillis(),
this.server);
readKey.attach(c);
synchronized(this.server.connectionList){
this.server.connectionList.add(this.server.numConnections,
c);
this.server.numConnections++;
}
}
finally{
reader.finishAdd();
}
}
}
}
3. SelectionKey.OP_WRITE
publicclassServerResponderextendsThread{
privatebooleanprocessResponse(LinkedList<Call>responseQueue,
booleaninHandler)throwsIOException{
booleanerror=
true;
booleandone =false;
intnumElements =0;
Call call =
null;
try{
synchronized(responseQueue){
numElements = responseQueue.size();
if(numElements==
0) {
error =
false;
returntrue; //
no moredata for this channel.
}
call = responseQueue.removeFirst();
SocketChannelchannel = call.connection.channel;
intnumBytes
=channelWrite(channel, call.response);
if(numBytes<0){
returntrue;
}
if(!call.response.hasRemaining()){
call.connection.decRpcCount();
if(numElements==
1){ // last callfully processes.
done =
true; //
no moredata for this channel.
}
else{
done =
false; //
morecalls pending to be sent.
}
}
else{
call.connection.responseQueue.addFirst(call);
if(inHandler){
// set theserve time when the response has to be sent
later
call.timestamp=System.currentTimeMillis();
incPending();
try{
writeSelector.wakeup();
channel.register(writeSelector,
SelectionKey.OP_WRITE,call);
}
catch(ClosedChannelExceptione) {
//Its ok.channel might be closed else where.
done =
true;
}
finally{
decPending();
}
}
}
error =
false; //everything
went off well
}
}
finally{
if(error&& call
!= null){
done =
true; //
error. nomore data for this channel.
closeConnection(call.connection);
}
}
returndone;
}
}
publicclassServerListenerextendsThread{
publicServerListener(Serverserver)
throwsIOException{
this.server=server;
address=newInetSocketAddress(server.bindAddress,server.port);
// Create anew server socket and set to non blocking
mode
acceptChannel=
ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// Bind theserver socket to the local host and port
Server.bind(acceptChannel.socket(),address,
backlogLength);
server.port=acceptChannel.socket().getLocalPort();
// create aselector;
selector =Selector.open();
readers=newServerListenerReader[server.readThreads];
readPool=Executors.newFixedThreadPool(server.readThreads);
for(inti
= 0;i <server.readThreads;i++)
{
Selector readSelector = Selector.open();
ServerListenerReader reader =
newServerListenerReader(
readSelector,
this);
readers[i]=
reader;
readPool.execute(reader);
}
acceptChannel.register(selector,SelectionKey.OP_ACCEPT);
this.setName("IPCServer
listener on " +server.port);
this.setDaemon(true);
}
}
2. SelectionKey.OP_READ
public
classServerListenerextends
Thread{
voiddoAccept(SelectionKeykey)
throwsIOException,OutOfMemoryError {
ServerConnection c =
null;
ServerSocketChannel
serverSocketChannel=(ServerSocketChannel)key.channel();
SocketChannel channel;
while((channel=serverSocketChannel.accept())!=
null){
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(this.server.tcpNoDelay);
ServerListenerReader reader =
readers[(currentReader+
1) %readers.length];
try{
reader.startAdd();
SelectionKey
readKey =
channel.register(reader.readSelector,SelectionKey.OP_READ);
c =
newServerConnection(readKey,channel,
System.currentTimeMillis(),
this.server);
readKey.attach(c);
synchronized(this.server.connectionList){
this.server.connectionList.add(this.server.numConnections,
c);
this.server.numConnections++;
}
}
finally{
reader.finishAdd();
}
}
}
}
3. SelectionKey.OP_WRITE
publicclassServerResponderextendsThread{
privatebooleanprocessResponse(LinkedList<Call>responseQueue,
booleaninHandler)throwsIOException{
booleanerror=
true;
booleandone =false;
intnumElements =0;
Call call =
null;
try{
synchronized(responseQueue){
numElements = responseQueue.size();
if(numElements==
0) {
error =
false;
returntrue; //
no moredata for this channel.
}
call = responseQueue.removeFirst();
SocketChannelchannel = call.connection.channel;
intnumBytes
=channelWrite(channel, call.response);
if(numBytes<0){
returntrue;
}
if(!call.response.hasRemaining()){
call.connection.decRpcCount();
if(numElements==
1){ // last callfully processes.
done =
true; //
no moredata for this channel.
}
else{
done =
false; //
morecalls pending to be sent.
}
}
else{
call.connection.responseQueue.addFirst(call);
if(inHandler){
// set theserve time when the response has to be sent
later
call.timestamp=System.currentTimeMillis();
incPending();
try{
writeSelector.wakeup();
channel.register(writeSelector,
SelectionKey.OP_WRITE,call);
}
catch(ClosedChannelExceptione) {
//Its ok.channel might be closed else where.
done =
true;
}
finally{
decPending();
}
}
}
error =
false; //everything
went off well
}
}
finally{
if(error&& call
!= null){
done =
true; //
error. nomore data for this channel.
closeConnection(call.connection);
}
}
returndone;
}
}
相关文章推荐
- Hadoop源码分析9:IPC流程(4) Client 的 wait() 和 notify()
- Hadoop源码分析10: IPC流程(5) Atomic
- Hadoop源码分析13: IPC流程(8) Server的wait、notify
- Hadoop源码分析7: IPC流程(2) 流程
- Hadoop源码分析12: IPC流程(7)容器
- Hadoop源码分析8: IPC流程(3)客户端的clients、connections、calls复用
- Hadoop源码分析11: IPC流程(6)volatile
- Hadoop源码分析7: IPC流程(1) 主要类
- Hadoop源码分析16: IPC流程(11) 整体流程
- Hadoop源码分析之IPC服务端连接建立与方法调用
- Hadoop0.21.0源码流程分析(3)-Task节点管理启动任务
- Hadoop 中 IPC 的源码分析
- hadoop源码 - HDFS write流程分析(Hadoop2.0)
- Hadoop 中 IPC 的源码分析
- Hadoop源码分析24 JobTracker启动和心跳处理流程
- Hadoop源码流程分析4-Task节点执行任务
- Hadoop源码分析之IPC中Server端的初始化与启动
- Hadoop源码分析笔记(八):HDFS主要流程
- Hadoop0.21.0源码流程分析(1)-客户端提交作业
- [hadoop源码阅读][6]-org.apache.hadoop.ipc-protocol和心跳分析