您的位置:首页 > Web前端 > Node.js

HDFS Namenode接收RPC请求过程

2013-04-09 16:34 295 查看
HDFS中所有控制消息通过RPC发送(数据消息直接使用Socket),所以让我们看看它的Under the hood.

对于Namenode,它的main函数位于org.apache.hadoop.hdfs.server.namenode.Namenode,而元数据的实际管理主要由FSNamesystem类完成,Namenode类则负责与外界的IPC通信以及一些配置工作。

Namenode有个NameNodeRpcServer的成员,专门负责处理RPC请求。在NameNodeRpcServer类中又有两个RPC.Server的成员变量,serviceRpcServer和clientRpcServer,分别接收来自Datanode和Client的请求。

源码中和RPC相关的类主要有几个:

org.apache.hadoop.ipc.Server:RPC服务在java.net和java.nio上的实现,封装了几个内部类如Call, Listener, Responder, Connection, Handler,具体见后文。

org.apache.hadoop.ipc.RPC: RPC服务的建立调用等,其中内部类Server继承了org.apache.hadoop.ipc.Server.

org.apache.hadoop.ipc.ProtobufRpcEngine和WritableRpcEngine分别是RPCEngine的protobuf和writable实现,继承了org.apache.hadoop.ipc.RPC.Server.

追踪一下clientRpcServer服务器的启动、运行过程:

public synchronized void start() {
responder.start();
listener.start();
handlers = new Handler[handlerCount];

for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);
handlers[i].start();
}
}
这里显然是responder回复RPC消息,listener接收RPC消息,handler负责处理消息。

具体看下listener,listener有多个Reader线程,每个Reader执行doRunLoop()主循环,遍历SelectionKeys,把可读的进行doRead(key):

Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
if (key.isValid()) {
if (key.isReadable()) {
doRead(key);
}
}
key = null;
}


doRead继续调用readAndProcess(),在读取完整个包(报头、Auth信息及其初始化、正文)之后调用processOneRpc(buf), 再调processData(buf)进行反序列化。

在processData(buf)中,通过Rpc报头获得一个Writable的实例:

Class<? extends Writable> rpcRequestClass =
getRpcRequestWrapper(header.getRpcKind());
...
rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
rpcRequest.readFields(dis);

在此处,实例是 org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestWrapper,在这里读取参数:

public void readFields(java.io.DataInput in) throws java.io.IOException { /* compiled code */ }


返回processData,有以下:

Call call = new Call(header.getCallId(), rpcRequest, this,
ProtoUtil.convert(header.getRpcKind()));
callQueue.put(call);              // queue the call; maybe blocked here
把整个RPC请求包装成一个Call,入队。

在Handler中主循环取出Call以后有:

// Make the call as the user via Subject.doAs, thus associating
// the call with the Subject
if (call.connection.user == null) {
value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
call.timestamp);
} else {
value =
call.connection.user.doAs
(new PrivilegedExceptionAction<Writable>() {
@Override
public Writable run() throws Exception {
// make the call
return call(call.rpcKind, call.connection.protocolName,
call.rpcRequest, call.timestamp);

}
}
);
}
call继续调用getRpcInvoker获取RpcInvoker并call,这里是ProtoBufRpcInvoker,其call函数有:

ProtoClassProtoImpl protocolImpl = getProtocolImpl(server,
declaringClassProtoName, clientVersion);
BlockingService service = (BlockingService) protocolImpl.protocolImpl;
MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName);
...
result = service.callBlockingMethod(methodDescriptor, null, param);
调用org.apache.hadoop.hdfs.protocol.proto,ClientNamenodeProtocolProtos.ClientNamenodeProtocol一个匿名类中的callBlockingMethod:

...
switch(method.getIndex()) {
case 0:
return impl.getBlockLocations(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto)request);
通过MethodDescriptor获取方法,调用impl的对应方法,此处impl即ClientNamenodeProtocolServerSideTranslatorPB,在NamenodeRpcServer初始化的时候将其注册为Translator。在ClientNamenodeProtocolServerSideTranslatorPB中有如下:

public GetBlockLocationsResponseProto getBlockLocations(
RpcController controller, GetBlockLocationsRequestProto req)
throws ServiceException {
try {
LocatedBlocks b = server.getBlockLocations(req.getSrc(), req.getOffset(),
req.getLength());
Builder builder = GetBlockLocationsResponseProto
.newBuilder();
if (b != null) {
builder.setLocations(PBHelper.convert(b)).build();
}
return builder.build();
...
通过req获取各个参数以后调用server的方法,server即本文开始处的NamenodeRpcServer,一切Namenode的RPC请求最终都提交到此。

总结下,我们了解了一个RPC请求从java的TCP层到NamenodeRpcServer的调用过程,因为要做转发请求的工作,请求的回复以及请求的发起方式都是接下来要继续了解的内容。

EOF
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: