您的位置:首页 > 运维架构

Hadoop源码分析11: IPC流程(6)volatile

2014-05-28 08:47 435 查看
1.Server 的 running

public abstract class Server {

 volatile boolean running =true; // true while server runs

publicsynchronized void
join() throwsInterruptedException {
while (running){
wait();
  }
  }

public synchronized voidstop() {
  running = false;
if (handlers != null) {
for (int i= 0; i < handlerCount; i++) {
if (handlers[i] != null){
handlers[i].interrupt();
}
}
}
listener.interrupt();
listener.doStop();
responder.interrupt();
notifyAll();
}

}

public class ServerHandler extends Thread{

@Override
public void run() {
Server.SERVER.set(server);
ByteArrayOutputStreambuf = new ByteArrayOutputStream(
Server.INITIAL_RESP_BUF_SIZE);
while(server.running) {
try {
final ServerCall call = server.callQueue.take();
............................
} catch(InterruptedException e) {
if (server.running) { // unexpected -- logit
.............
}
} catch (Exception e){
  }
}
}

}

public class ServerListener extends Thread{

public void run(){
Server.SERVER.set(server);
while(server.running) {
SelectionKey key = null;
t try {
。。。。。。

}
}
}
}

public class ServerListenerReader implements Runnable{

public void run() {
synchronized (this) {
while (serverListener.server.running){
SelectionKey key =null;

。。。。。。。。。。。。。。

}
。。。。。
}catch (InterruptedException e) {
if(serverListener.server.running) { // unexpected -- log it
}
} catch (IOException ex){
}
}
}

public class ServerResponder extendsThread{

public void run() {
Server.SERVER.set(server);
longlastPurgeTime = 0;

while (server.running) {
try {
waitPending(); // If a channel is beingregistered, wait.
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey> iter =writeSelector.selectedKeys().iterator();

}
}
}
}

2.ServerConnection 的 rpcCount

public class ServerConnection {
volatile intrpcCount = 0;

private voidprocessData(byte[] buf) throws IOException,
InterruptedException{
DataInputStream dis = new DataInputStream(newByteArrayInputStream(buf));
int id = dis.readInt(); // try to read anid

Writable param =ReflectionUtils.newInstance(server.paramClass,
server.conf);// read param
param.readFields(dis);

ServerCall call = new ServerCall(id, param,this);
server.callQueue.put(call); // queue the call;maybe blocked here
rpcCount++; // Increment the rpccount
}
}

public class ServerListener extends Thread{

private voidcleanupConnections(boolean force) {
...............
if ( c.rpcCount == 0&& currentTime - c.lastContact > server.maxIdleTime ){
server.closeConnection(c);
numNuked++;
end--;
c =null;
...............
}
}

public class ServerResponder extends Thread{

private booleanprocessResponse(LinkedList<ServerCall> responseQueue,
booleaninHandler) throws IOException {
。。。。。
if(!call.response.hasRemaining()) {
call.connection.rpcCount--;
if(numElements == 1) { // lastcall fully processes.
done = true; // no more data for thischannel.
} else{
done = false; // more calls pending to besent.
}

}
。。。。。
}
}

3. ServerListenerReader的 adding

public class ServerListenerReader implements Runnable{

private volatileboolean adding = false;

public void run(){
synchronized (this) {
while (serverListener.server.running) {
SelectionKey key =null;
try {
readSelector.select();
while(adding) {
this.wait(1000);
}
.......................
}
}

public voidstartAdd() {
adding = true;
readSelector.wakeup();
}

public synchronized voidfinishAdd() {
adding = false;
this.notify();
}
}

public class ServerListener extends Thread{

voiddoAccept(SelectionKey key) throws IOException,OutOfMemoryError {
ServerConnection c = null;
ServerSocketChannelserverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel =serverSocketChannel.accept()) != null) {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(this.server.tcpNoDelay);
ServerListenerReader reader =readers[(currentReader + 1) % readers.length];
try {
reader.startAdd();
SelectionKey readKey =reader.registerChannel(channel);
c = newServerConnection(readKey,channel,System.currentTimeMillis(),this.server);
readKey.attach(c);
synchronized(this.server.connectionList) {
this.server.connectionList.add(this.server.numConnections,c);
this.server.numConnections++;
}
} finally{
reader.finishAdd();
}
}
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: