您的位置:首页 > 运维架构

Hadoop源码分析12: IPC流程(7)容器

2014-05-28 08:47 453 查看
1.Server的 callQueue

典型的生产者-消费者模式

public abstractclass Server {
BlockingQueue<ServerCall> callQueue;

}

publicclass ServerConnection {

privatevoid processData(byte[]buf)
throws IOException,
InterruptedException{
DataInputStreamdis
= new DataInputStream(newByteArrayInputStream(buf));
int id
=dis.readInt(); // try to read an id

Writable
param =ReflectionUtils.newInstance(server.paramClass,
server.conf);//read
param
param.readFields(dis);

ServerCall
call =new ServerCall(id, param, this);
server.callQueue.put(call);//
queue the call; maybe blocked here
rpcCount++;
//Increment the rpc count
}
}

publicclass ServerHandler extends Thread {

publicvoid run(){
Server.SERVER.set(server);
ByteArrayOutputStream
buf =new ByteArrayOutputStream(
Server.INITIAL_RESP_BUF_SIZE);
while(server.running)
{
try
{
final
ServerCallcall = server.callQueue.take();
// pop thequeue; maybe blocked here
.........................
}
}
}

2.Server的 connectionList

public abstractclass Server {
List<ServerConnection> connectionList =Collections
.synchronizedList(newLinkedList<ServerConnection>());

void closeConnection(ServerConnectionconnection)
{
synchronized(connectionList)
{
if(connectionList.remove(connection))
numConnections--;
}
try {
connection.close();
} catch(IOException
e) {
}
}
}

publicclass ServerListener extends Thread {

void doAccept(SelectionKeykey)
throws IOException, OutOfMemoryError {
ServerConnectionc
= null;
ServerSocketChannelserverSocketChannel
= (ServerSocketChannel) key
.channel();
SocketChannelchannel;
while
((channel =serverSocketChannel.accept()) != null) {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(this.server.tcpNoDelay);
ServerListenerReader
reader =readers[(currentReader + 1) % readers.length];
try
{
reader.startAdd();
SelectionKeyreadKey
= reader.registerChannel(channel);
c
= newServerConnection(readKey, channel,
System.currentTimeMillis(),this.server);
readKey.attach(c);
synchronized(this.server.connectionList)
{
this.server.connectionList.add(this.server.numConnections,c);
this.server.numConnections++;
}

}
finally{
reader.finishAdd();
}

}
}

private void cleanupConnections(booleanforce)
{

。。。。
while
(i<= end) {
ServerConnectionc;
synchronized(server.connectionList)
{
try
{
c= server.connectionList.get(i);
}
catch(Exception e) {
return;
}
}
if
( c.rpcCount== 0 && currentTime - c.lastContact >server.maxIdleTime ) {
server.closeConnection(c);
numNuked++;
end--;
c
=null;
if
(!force&& numNuked == server.maxConnectionsToNuke)
break;
}
else
i++;
}

。。。。
}
}

3.ServerConnection的 responseQueue

publicclass ServerConnection {

LinkedList<ServerCall> responseQueue;

}

publicclass ServerResponder extends Thread {

private void doAsyncWrite(SelectionKeykey)
throws IOException {
ServerCall call =(ServerCall)key.attachment();
if (call == null){
return;
}
if (key.channel() !=call.connection.channel) {
throw newIOException("doAsyncWrite: bad channel");
}

synchronized(call.connection.responseQueue) {
if(processResponse(call.connection.responseQueue, false)){
try {
key.interestOps(0);
} catch (CancelledKeyException e) {

}
}
}
}

privatevoid doPurge(ServerCallcall,
long now) throws IOException {
LinkedList<ServerCall> responseQueue= call.connection.responseQueue;
synchronized (responseQueue){
Iterator<ServerCall> iter= responseQueue.listIterator(0);
while(iter.hasNext()) {
call = iter.next();
if (now
> call.timestamp +PURGE_INTERVAL) {
server.closeConnection(call.connection);
break;
}
}
}
}

void doRespond(ServerCallcall)
throws IOException {
synchronized(call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);
if(call.connection.responseQueue.size() == 1) {
processResponse(call.connection.responseQueue,true);
}
}
}
privatebooleanprocessResponse(LinkedList<ServerCall>responseQueue,
boolean inHandler)
throwsIOException {
boolean error =true;
boolean done = false; // thereis more data for this channel.
int numElements =0;
ServerCall call =null;
try {
synchronized (responseQueue) {
//
// If there are no items for this channel, thenwe are done
//
numElements = responseQueue.size();
if (numElements == 0) {
error = false;
return true; // no more data for this channel.
}
//
// Extract the first call
//
call = responseQueue.removeFirst();
SocketChannel channel =call.connection.channel;

//
// Send as much data as we can in thenon-blocking fashion
//
int numBytes = server.channelWrite(channel,call.response);
if (numBytes
< 0) {
return true;
}
if (!call.response.hasRemaining()) {
call.connection.rpcCount--;
if (numElements == 1) { // last call fullyprocesses.
done =true; // no moredata for this channel.
} else {
done =false; // morecalls pending to be sent.
}

} else {
//
// If we were unable to writethe entire response out, then
// insert in Selectorqueue.
//
call.connection.responseQueue.addFirst(call);

if (inHandler) {
// set theserve time when the response has to be sent later
call.timestamp = System.currentTimeMillis();

incPending();
try{
// Wakeup the thread blocked on select, onlythen can the call
// to channel.register() complete.
writeSelector.wakeup();
channel.register(writeSelector,SelectionKey.OP_WRITE, call);
} catch(ClosedChannelException e) {
//Its ok. channel might be closed elsewhere.
done = true;
} finally{
decPending();
}
}
}
error = false; //everything went off well
}
} finally {
if (error&& call != null) {
done = true; // error. no more data for thischannel.
server.closeConnection(call.connection);
}
}
return done;
}
}

publicclass ServerHandler extends Thread {
publicvoid run(){
Server.SERVER.set(server);
ByteArrayOutputStream
buf =new ByteArrayOutputStream(
Server.INITIAL_RESP_BUF_SIZE);
while(server.running)
{
try
{
final
ServerCallcall = server.callQueue.take(); // pop the
//queue;
//maybe
//blocked
//here

String
errorClass= null;
String
error =null;
Writable
value =null;

Server.CurCall.set(call);
try
{
 value
=server.call(call.connection.protocol,
call.param,call.timestamp);
 
}
}
catch(Throwable e) {
errorClass
=e.getClass().getName();
error
=StringUtils.stringifyException(e);
}
Server.CurCall.set(null);
synchronized(call.connection.responseQueue)
{
//setupResponse()
needs to be sync'ed together with
//responder.doResponse()
since setupResponse may use
//
SASL toencrypt response data and SASL enforces
//
its ownmessage ordering.
server.setupResponse(buf,call,
(error
== null) ?Status.SUCCESS : Status.ERROR,
value,errorClass,
error);
//
Discard thelarge buf and reset it back to
//
smaller sizeto freeup heap
if
(buf.size()> server.maxRespSize) {
buf
= newByteArrayOutputStream(
Server.INITIAL_RESP_BUF_SIZE);
}
server.responder.doRespond(call);
}
}
catch(InterruptedException e) {
if(server.running)
{ // unexpected -- log it

}
}
catch(Exception e) {

}
}
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: