HDFS Namenode接收RPC请求过程
2013-04-09 16:34
295 查看
HDFS中所有控制消息通过RPC发送(数据消息直接使用Socket),所以让我们看看它的Under the hood.
对于Namenode,它的main函数位于org.apache.hadoop.hdfs.server.namenode.Namenode,而元数据的实际管理主要由FSNamesystem类完成,Namenode类则负责与外界的IPC通信以及一些配置工作。
Namenode有个NameNodeRpcServer的成员,专门负责处理RPC请求。在NameNodeRpcServer类中又有两个RPC.Server的成员变量,serviceRpcServer和clientRpcServer,分别接收来自Datanode和Client的请求。
源码中和RPC相关的类主要有几个:
org.apache.hadoop.ipc.Server:RPC服务在java.net和java.nio上的实现,封装了几个内部类如Call, Listener, Responder, Connection, Handler,具体见后文。
org.apache.hadoop.ipc.RPC: RPC服务的建立调用等,其中内部类Server继承了org.apache.hadoop.ipc.Server.
org.apache.hadoop.ipc.ProtobufRpcEngine和WritableRpcEngine分别是RPCEngine的protobuf和writable实现,继承了org.apache.hadoop.ipc.RPC.Server.
追踪一下clientRpcServer服务器的启动、运行过程:
具体看下listener,listener有多个Reader线程,每个Reader执行doRunLoop()主循环,遍历SelectionKeys,把可读的进行doRead(key):
doRead继续调用readAndProcess(),在读取完整个包(报头、Auth信息及其初始化、正文)之后调用processOneRpc(buf), 再调processData(buf)进行反序列化。
在processData(buf)中,通过Rpc报头获得一个Writable的实例:
在此处,实例是 org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestWrapper,在这里读取参数:
返回processData,有以下:
在Handler中主循环取出Call以后有:
总结下,我们了解了一个RPC请求从java的TCP层到NamenodeRpcServer的调用过程,因为要做转发请求的工作,请求的回复以及请求的发起方式都是接下来要继续了解的内容。
EOF
对于Namenode,它的main函数位于org.apache.hadoop.hdfs.server.namenode.Namenode,而元数据的实际管理主要由FSNamesystem类完成,Namenode类则负责与外界的IPC通信以及一些配置工作。
Namenode有个NameNodeRpcServer的成员,专门负责处理RPC请求。在NameNodeRpcServer类中又有两个RPC.Server的成员变量,serviceRpcServer和clientRpcServer,分别接收来自Datanode和Client的请求。
源码中和RPC相关的类主要有几个:
org.apache.hadoop.ipc.Server:RPC服务在java.net和java.nio上的实现,封装了几个内部类如Call, Listener, Responder, Connection, Handler,具体见后文。
org.apache.hadoop.ipc.RPC: RPC服务的建立调用等,其中内部类Server继承了org.apache.hadoop.ipc.Server.
org.apache.hadoop.ipc.ProtobufRpcEngine和WritableRpcEngine分别是RPCEngine的protobuf和writable实现,继承了org.apache.hadoop.ipc.RPC.Server.
追踪一下clientRpcServer服务器的启动、运行过程:
public synchronized void start() { responder.start(); listener.start(); handlers = new Handler[handlerCount]; for (int i = 0; i < handlerCount; i++) { handlers[i] = new Handler(i); handlers[i].start(); } }这里显然是responder回复RPC消息,listener接收RPC消息,handler负责处理消息。
具体看下listener,listener有多个Reader线程,每个Reader执行doRunLoop()主循环,遍历SelectionKeys,把可读的进行doRead(key):
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); if (key.isValid()) { if (key.isReadable()) { doRead(key); } } key = null; }
doRead继续调用readAndProcess(),在读取完整个包(报头、Auth信息及其初始化、正文)之后调用processOneRpc(buf), 再调processData(buf)进行反序列化。
在processData(buf)中,通过Rpc报头获得一个Writable的实例:
Class<? extends Writable> rpcRequestClass = getRpcRequestWrapper(header.getRpcKind()); ... rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf); rpcRequest.readFields(dis);
在此处,实例是 org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestWrapper,在这里读取参数:
public void readFields(java.io.DataInput in) throws java.io.IOException { /* compiled code */ }
返回processData,有以下:
Call call = new Call(header.getCallId(), rpcRequest, this, ProtoUtil.convert(header.getRpcKind())); callQueue.put(call); // queue the call; maybe blocked here把整个RPC请求包装成一个Call,入队。
在Handler中主循环取出Call以后有:
// Make the call as the user via Subject.doAs, thus associating // the call with the Subject if (call.connection.user == null) { value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp); } else { value = call.connection.user.doAs (new PrivilegedExceptionAction<Writable>() { @Override public Writable run() throws Exception { // make the call return call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp); } } ); }call继续调用getRpcInvoker获取RpcInvoker并call,这里是ProtoBufRpcInvoker,其call函数有:
ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, declaringClassProtoName, clientVersion); BlockingService service = (BlockingService) protocolImpl.protocolImpl; MethodDescriptor methodDescriptor = service.getDescriptorForType() .findMethodByName(methodName); ... result = service.callBlockingMethod(methodDescriptor, null, param);调用org.apache.hadoop.hdfs.protocol.proto,ClientNamenodeProtocolProtos.ClientNamenodeProtocol一个匿名类中的callBlockingMethod:
... switch(method.getIndex()) { case 0: return impl.getBlockLocations(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto)request);通过MethodDescriptor获取方法,调用impl的对应方法,此处impl即ClientNamenodeProtocolServerSideTranslatorPB,在NamenodeRpcServer初始化的时候将其注册为Translator。在ClientNamenodeProtocolServerSideTranslatorPB中有如下:
public GetBlockLocationsResponseProto getBlockLocations( RpcController controller, GetBlockLocationsRequestProto req) throws ServiceException { try { LocatedBlocks b = server.getBlockLocations(req.getSrc(), req.getOffset(), req.getLength()); Builder builder = GetBlockLocationsResponseProto .newBuilder(); if (b != null) { builder.setLocations(PBHelper.convert(b)).build(); } return builder.build(); ...通过req获取各个参数以后调用server的方法,server即本文开始处的NamenodeRpcServer,一切Namenode的RPC请求最终都提交到此。
总结下,我们了解了一个RPC请求从java的TCP层到NamenodeRpcServer的调用过程,因为要做转发请求的工作,请求的回复以及请求的发起方式都是接下来要继续了解的内容。
EOF
相关文章推荐
- 怎么查看真实项目的http 请求的请求报文和响应报文,即request和response?只有这样,才能完全彻底明白一个http 请求整个过程,发送和接收的是什么东西。
- HDFS之NameNode的启动过程分析
- rpc,客户端与NameNode通信的过程
- spring mvc DispatcherServlet 接收请求到响应数据的过程
- java发送HttpClient请求及接收请求结果过程的简单实例
- JAVA发送HttpClient请求及接收请求结果过程
- RPC请求处理过程
- JAVA发送HttpClient请求及接收请求结果过程
- JAVA发送HttpClient请求及接收请求结果过程
- [蓝牙功耗]通话过程中来蓝牙传输请求,手机拒绝接收,挂掉电话后,手机不能进休眠
- Redis源码系列28:ServerSocket接收到client的连接请求处理过程
- JAVA发送HttpClient请求及接收请求结果过程
- NFS客户端RPC请求封装过程
- NameNode 接收请求
- JAVA发送HttpClient请求及接收请求结果过程
- HDFS源码分析之NameNode(1)————启动过程
- HDFS Namenode启动过程
- Hadoop详解(二)——HDFS的命令,执行过程,Java接口,原理详解。RPC机制
- RPC和HDFS文件读写(下载上传)过程
- HDFS Namenode启动过程