Hadoop源码分析12: IPC流程(7)容器
2014-05-28 08:47
453 查看
1.Server的 callQueue
典型的生产者-消费者模式
public abstractclass Server {
BlockingQueue<ServerCall> callQueue;
}
publicclass ServerConnection {
privatevoid processData(byte[]buf)
throws IOException,
InterruptedException{
DataInputStreamdis
= new DataInputStream(newByteArrayInputStream(buf));
int id
=dis.readInt(); // try to read an id
Writable
param =ReflectionUtils.newInstance(server.paramClass,
server.conf);//read
param
param.readFields(dis);
ServerCall
call =new ServerCall(id, param, this);
server.callQueue.put(call);//
queue the call; maybe blocked here
rpcCount++;
//Increment the rpc count
}
}
publicclass ServerHandler extends Thread {
publicvoid run(){
Server.SERVER.set(server);
ByteArrayOutputStream
buf =new ByteArrayOutputStream(
Server.INITIAL_RESP_BUF_SIZE);
while(server.running)
{
try
{
final
ServerCallcall = server.callQueue.take();
// pop thequeue; maybe blocked here
.........................
}
}
}
2.Server的 connectionList
public abstractclass Server {
List<ServerConnection> connectionList =Collections
.synchronizedList(newLinkedList<ServerConnection>());
void closeConnection(ServerConnectionconnection)
{
synchronized(connectionList)
{
if(connectionList.remove(connection))
numConnections--;
}
try {
connection.close();
} catch(IOException
e) {
}
}
}
publicclass ServerListener extends Thread {
void doAccept(SelectionKeykey)
throws IOException, OutOfMemoryError {
ServerConnectionc
= null;
ServerSocketChannelserverSocketChannel
= (ServerSocketChannel) key
.channel();
SocketChannelchannel;
while
((channel =serverSocketChannel.accept()) != null) {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(this.server.tcpNoDelay);
ServerListenerReader
reader =readers[(currentReader + 1) % readers.length];
try
{
reader.startAdd();
SelectionKeyreadKey
= reader.registerChannel(channel);
c
= newServerConnection(readKey, channel,
System.currentTimeMillis(),this.server);
readKey.attach(c);
synchronized(this.server.connectionList)
{
this.server.connectionList.add(this.server.numConnections,c);
this.server.numConnections++;
}
}
finally{
reader.finishAdd();
}
}
}
private void cleanupConnections(booleanforce)
{
。。。。
while
(i<= end) {
ServerConnectionc;
synchronized(server.connectionList)
{
try
{
c= server.connectionList.get(i);
}
catch(Exception e) {
return;
}
}
if
( c.rpcCount== 0 && currentTime - c.lastContact >server.maxIdleTime ) {
server.closeConnection(c);
numNuked++;
end--;
c
=null;
if
(!force&& numNuked == server.maxConnectionsToNuke)
break;
}
else
i++;
}
。。。。
}
}
3.ServerConnection的 responseQueue
publicclass ServerConnection {
LinkedList<ServerCall> responseQueue;
}
publicclass ServerResponder extends Thread {
private void doAsyncWrite(SelectionKeykey)
throws IOException {
ServerCall call =(ServerCall)key.attachment();
if (call == null){
return;
}
if (key.channel() !=call.connection.channel) {
throw newIOException("doAsyncWrite: bad channel");
}
synchronized(call.connection.responseQueue) {
if(processResponse(call.connection.responseQueue, false)){
try {
key.interestOps(0);
} catch (CancelledKeyException e) {
}
}
}
}
privatevoid doPurge(ServerCallcall,
long now) throws IOException {
LinkedList<ServerCall> responseQueue= call.connection.responseQueue;
synchronized (responseQueue){
Iterator<ServerCall> iter= responseQueue.listIterator(0);
while(iter.hasNext()) {
call = iter.next();
if (now
> call.timestamp +PURGE_INTERVAL) {
server.closeConnection(call.connection);
break;
}
}
}
}
void doRespond(ServerCallcall)
throws IOException {
synchronized(call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);
if(call.connection.responseQueue.size() == 1) {
processResponse(call.connection.responseQueue,true);
}
}
}
privatebooleanprocessResponse(LinkedList<ServerCall>responseQueue,
boolean inHandler)
throwsIOException {
boolean error =true;
boolean done = false; // thereis more data for this channel.
int numElements =0;
ServerCall call =null;
try {
synchronized (responseQueue) {
//
// If there are no items for this channel, thenwe are done
//
numElements = responseQueue.size();
if (numElements == 0) {
error = false;
return true; // no more data for this channel.
}
//
// Extract the first call
//
call = responseQueue.removeFirst();
SocketChannel channel =call.connection.channel;
//
// Send as much data as we can in thenon-blocking fashion
//
int numBytes = server.channelWrite(channel,call.response);
if (numBytes
< 0) {
return true;
}
if (!call.response.hasRemaining()) {
call.connection.rpcCount--;
if (numElements == 1) { // last call fullyprocesses.
done =true; // no moredata for this channel.
} else {
done =false; // morecalls pending to be sent.
}
} else {
//
// If we were unable to writethe entire response out, then
// insert in Selectorqueue.
//
call.connection.responseQueue.addFirst(call);
if (inHandler) {
// set theserve time when the response has to be sent later
call.timestamp = System.currentTimeMillis();
incPending();
try{
// Wakeup the thread blocked on select, onlythen can the call
// to channel.register() complete.
writeSelector.wakeup();
channel.register(writeSelector,SelectionKey.OP_WRITE, call);
} catch(ClosedChannelException e) {
//Its ok. channel might be closed elsewhere.
done = true;
} finally{
decPending();
}
}
}
error = false; //everything went off well
}
} finally {
if (error&& call != null) {
done = true; // error. no more data for thischannel.
server.closeConnection(call.connection);
}
}
return done;
}
}
publicclass ServerHandler extends Thread {
publicvoid run(){
Server.SERVER.set(server);
ByteArrayOutputStream
buf =new ByteArrayOutputStream(
Server.INITIAL_RESP_BUF_SIZE);
while(server.running)
{
try
{
final
ServerCallcall = server.callQueue.take(); // pop the
//queue;
//maybe
//blocked
//here
String
errorClass= null;
String
error =null;
Writable
value =null;
Server.CurCall.set(call);
try
{
value
=server.call(call.connection.protocol,
call.param,call.timestamp);
}
}
catch(Throwable e) {
errorClass
=e.getClass().getName();
error
=StringUtils.stringifyException(e);
}
Server.CurCall.set(null);
synchronized(call.connection.responseQueue)
{
//setupResponse()
needs to be sync'ed together with
//responder.doResponse()
since setupResponse may use
//
SASL toencrypt response data and SASL enforces
//
its ownmessage ordering.
server.setupResponse(buf,call,
(error
== null) ?Status.SUCCESS : Status.ERROR,
value,errorClass,
error);
//
Discard thelarge buf and reset it back to
//
smaller sizeto freeup heap
if
(buf.size()> server.maxRespSize) {
buf
= newByteArrayOutputStream(
Server.INITIAL_RESP_BUF_SIZE);
}
server.responder.doRespond(call);
}
}
catch(InterruptedException e) {
if(server.running)
{ // unexpected -- log it
}
}
catch(Exception e) {
}
}
}
}
典型的生产者-消费者模式
public abstractclass Server {
BlockingQueue<ServerCall> callQueue;
}
publicclass ServerConnection {
privatevoid processData(byte[]buf)
throws IOException,
InterruptedException{
DataInputStreamdis
= new DataInputStream(newByteArrayInputStream(buf));
int id
=dis.readInt(); // try to read an id
Writable
param =ReflectionUtils.newInstance(server.paramClass,
server.conf);//read
param
param.readFields(dis);
ServerCall
call =new ServerCall(id, param, this);
server.callQueue.put(call);//
queue the call; maybe blocked here
rpcCount++;
//Increment the rpc count
}
}
publicclass ServerHandler extends Thread {
publicvoid run(){
Server.SERVER.set(server);
ByteArrayOutputStream
buf =new ByteArrayOutputStream(
Server.INITIAL_RESP_BUF_SIZE);
while(server.running)
{
try
{
final
ServerCallcall = server.callQueue.take();
// pop thequeue; maybe blocked here
.........................
}
}
}
2.Server的 connectionList
public abstractclass Server {
List<ServerConnection> connectionList =Collections
.synchronizedList(newLinkedList<ServerConnection>());
void closeConnection(ServerConnectionconnection)
{
synchronized(connectionList)
{
if(connectionList.remove(connection))
numConnections--;
}
try {
connection.close();
} catch(IOException
e) {
}
}
}
publicclass ServerListener extends Thread {
void doAccept(SelectionKeykey)
throws IOException, OutOfMemoryError {
ServerConnectionc
= null;
ServerSocketChannelserverSocketChannel
= (ServerSocketChannel) key
.channel();
SocketChannelchannel;
while
((channel =serverSocketChannel.accept()) != null) {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(this.server.tcpNoDelay);
ServerListenerReader
reader =readers[(currentReader + 1) % readers.length];
try
{
reader.startAdd();
SelectionKeyreadKey
= reader.registerChannel(channel);
c
= newServerConnection(readKey, channel,
System.currentTimeMillis(),this.server);
readKey.attach(c);
synchronized(this.server.connectionList)
{
this.server.connectionList.add(this.server.numConnections,c);
this.server.numConnections++;
}
}
finally{
reader.finishAdd();
}
}
}
private void cleanupConnections(booleanforce)
{
。。。。
while
(i<= end) {
ServerConnectionc;
synchronized(server.connectionList)
{
try
{
c= server.connectionList.get(i);
}
catch(Exception e) {
return;
}
}
if
( c.rpcCount== 0 && currentTime - c.lastContact >server.maxIdleTime ) {
server.closeConnection(c);
numNuked++;
end--;
c
=null;
if
(!force&& numNuked == server.maxConnectionsToNuke)
break;
}
else
i++;
}
。。。。
}
}
3.ServerConnection的 responseQueue
publicclass ServerConnection {
LinkedList<ServerCall> responseQueue;
}
publicclass ServerResponder extends Thread {
private void doAsyncWrite(SelectionKeykey)
throws IOException {
ServerCall call =(ServerCall)key.attachment();
if (call == null){
return;
}
if (key.channel() !=call.connection.channel) {
throw newIOException("doAsyncWrite: bad channel");
}
synchronized(call.connection.responseQueue) {
if(processResponse(call.connection.responseQueue, false)){
try {
key.interestOps(0);
} catch (CancelledKeyException e) {
}
}
}
}
privatevoid doPurge(ServerCallcall,
long now) throws IOException {
LinkedList<ServerCall> responseQueue= call.connection.responseQueue;
synchronized (responseQueue){
Iterator<ServerCall> iter= responseQueue.listIterator(0);
while(iter.hasNext()) {
call = iter.next();
if (now
> call.timestamp +PURGE_INTERVAL) {
server.closeConnection(call.connection);
break;
}
}
}
}
void doRespond(ServerCallcall)
throws IOException {
synchronized(call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);
if(call.connection.responseQueue.size() == 1) {
processResponse(call.connection.responseQueue,true);
}
}
}
privatebooleanprocessResponse(LinkedList<ServerCall>responseQueue,
boolean inHandler)
throwsIOException {
boolean error =true;
boolean done = false; // thereis more data for this channel.
int numElements =0;
ServerCall call =null;
try {
synchronized (responseQueue) {
//
// If there are no items for this channel, thenwe are done
//
numElements = responseQueue.size();
if (numElements == 0) {
error = false;
return true; // no more data for this channel.
}
//
// Extract the first call
//
call = responseQueue.removeFirst();
SocketChannel channel =call.connection.channel;
//
// Send as much data as we can in thenon-blocking fashion
//
int numBytes = server.channelWrite(channel,call.response);
if (numBytes
< 0) {
return true;
}
if (!call.response.hasRemaining()) {
call.connection.rpcCount--;
if (numElements == 1) { // last call fullyprocesses.
done =true; // no moredata for this channel.
} else {
done =false; // morecalls pending to be sent.
}
} else {
//
// If we were unable to writethe entire response out, then
// insert in Selectorqueue.
//
call.connection.responseQueue.addFirst(call);
if (inHandler) {
// set theserve time when the response has to be sent later
call.timestamp = System.currentTimeMillis();
incPending();
try{
// Wakeup the thread blocked on select, onlythen can the call
// to channel.register() complete.
writeSelector.wakeup();
channel.register(writeSelector,SelectionKey.OP_WRITE, call);
} catch(ClosedChannelException e) {
//Its ok. channel might be closed elsewhere.
done = true;
} finally{
decPending();
}
}
}
error = false; //everything went off well
}
} finally {
if (error&& call != null) {
done = true; // error. no more data for thischannel.
server.closeConnection(call.connection);
}
}
return done;
}
}
publicclass ServerHandler extends Thread {
publicvoid run(){
Server.SERVER.set(server);
ByteArrayOutputStream
buf =new ByteArrayOutputStream(
Server.INITIAL_RESP_BUF_SIZE);
while(server.running)
{
try
{
final
ServerCallcall = server.callQueue.take(); // pop the
//queue;
//maybe
//blocked
//here
String
errorClass= null;
String
error =null;
Writable
value =null;
Server.CurCall.set(call);
try
{
value
=server.call(call.connection.protocol,
call.param,call.timestamp);
}
}
catch(Throwable e) {
errorClass
=e.getClass().getName();
error
=StringUtils.stringifyException(e);
}
Server.CurCall.set(null);
synchronized(call.connection.responseQueue)
{
//setupResponse()
needs to be sync'ed together with
//responder.doResponse()
since setupResponse may use
//
SASL toencrypt response data and SASL enforces
//
its ownmessage ordering.
server.setupResponse(buf,call,
(error
== null) ?Status.SUCCESS : Status.ERROR,
value,errorClass,
error);
//
Discard thelarge buf and reset it back to
//
smaller sizeto freeup heap
if
(buf.size()> server.maxRespSize) {
buf
= newByteArrayOutputStream(
Server.INITIAL_RESP_BUF_SIZE);
}
server.responder.doRespond(call);
}
}
catch(InterruptedException e) {
if(server.running)
{ // unexpected -- log it
}
}
catch(Exception e) {
}
}
}
}
相关文章推荐
- Hadoop源码分析16: IPC流程(11) 整体流程
- Hadoop源码分析10: IPC流程(5) Atomic
- Hadoop源码分析14: IPC流程(9) SelectionKey
- Hadoop源码分析7: IPC流程(2) 流程
- Hadoop源码分析9:IPC流程(4) Client 的 wait() 和 notify()
- Hadoop源码分析13: IPC流程(8) Server的wait、notify
- Hadoop源码分析8: IPC流程(3)客户端的clients、connections、calls复用
- Hadoop源码分析11: IPC流程(6)volatile
- Hadoop源码分析7: IPC流程(1) 主要类
- Hadoop源码分析17:IPC中的ThreadLocal
- Hadoop源码分析之IPC服务端连接建立与方法调用
- Hadoop0.21.0源码流程分析(3)-Task节点管理启动任务
- Hadoop 中 IPC 的源码分析
- hadoop源码 - HDFS write流程分析(Hadoop2.0)
- Hadoop源码分析24 JobTracker启动和心跳处理流程
- Hadoop源码流程分析4-Task节点执行任务
- Hadoop源码分析之IPC中Server端的初始化与启动
- Hadoop源码分析笔记(八):HDFS主要流程
- Hadoop 中 IPC 的源码分析
- Hadoop0.21.0源码流程分析(1)-客户端提交作业