hadoop源码解析之RPC分析
2017-03-02 20:24
387 查看
前言
准备工作
Hadoop rpc实现流程
定义接口
实现接口
启动一个server
构建一个client的代理
执行相应的方法
Server底层实现
内部类介绍
Call
Connection
Handler
Listener
Reader
Responder
Server的启动
接收请求
Reader线程读取数据
Handler线程处理请求
客户端实现
获取代理
发送请求
总结
Hadoop的rpc并没有采用现成的rpc框架,如thrift等,而是采用jdk自带的库完全自己写了一套,更加轻量级,更加可控。
用到的主要的技术是java NIO、网络编程、反射和动态代理,如果对这几块不太熟悉的话,建议先找些资料看看相关的东西
(我这在main方法简单改动,new了个Configuration()对象,当参数传进来)
重点内容
内部类介绍
我们看到我们上面提到的两个内部类listener和responder都是在这里创建的,之后调用start方法启动服务。
同时启动了一些Reader线程,这些线程是用来从channel读取数据的。
跟踪Reader的run方法,我们看到最后将读取的信息封装成了一个Call对象put到callQueue中
最后调用了RpcInvoker的call方法最终通过反射来执行相应的方法
在这里是通过java的动态代理来获取代理。通过跟踪代码我们找到了这里
在sendRpcRequest方法里,可以看到使用了基于tcp的socket通讯,将数据发送了服务器端。
准备工作
Hadoop rpc实现流程
定义接口
实现接口
启动一个server
构建一个client的代理
执行相应的方法
Server底层实现
内部类介绍
Call
Connection
Handler
Listener
Reader
Responder
Server的启动
接收请求
Reader线程读取数据
Handler线程处理请求
客户端实现
获取代理
发送请求
总结
前言
因为hadoop底层各种通讯都用的是rpc,如client和namenode、client和datanode、namanode和datanode等。所以首先学习了一下hadoop rpc的内部实现,拜读了一下hadoop的源码准备工作
首先下载hadoop的最新稳定版源码(目前是2.7.3),编译hadoop源码,因为hadoop的底层序列号用的是google的 protobuf,所以需要把这些proto文件编译成java文件,方便debug调试。如果比较懒的话,其实用maven把相关jar和源码包下载下来也行。Hadoop的rpc并没有采用现成的rpc框架,如thrift等,而是采用jdk自带的库完全自己写了一套,更加轻量级,更加可控。
用到的主要的技术是java NIO、网络编程、反射和动态代理,如果对这几块不太熟悉的话,建议先找些资料看看相关的东西
Hadoop rpc实现流程
Hadoop rpc框架位于hadoop源码的hadoop-commn项目里,就像我们学习任何语言先学习hello world一样,我们先来一个最简单的程序,这个程序是从hadoop源码test目录里找到的,testRPC.java,我们运行其中的main方法。(我这在main方法简单改动,new了个Configuration()对象,当参数传进来)
定义接口
首先要定义一个接口协议,所有的接口都要继承VersionedProtocolpublic interface TestProtocol extends VersionedProtocol { public static final long versionID = 1L; String echo(String value) throws IOException; }
实现接口
要实现这个接口public static class TestImpl implements TestProtocol { @Override public long getProtocolVersion(String protocol, long clientVersion) { return TestProtocol.versionID; } @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int hashcode) { return new ProtocolSignature(TestProtocol.versionID, null); } @Override public String echo(String value) throws IOException { return value; } }
启动一个server
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) .build(); server.start();
构建一个client的代理
TestProtocol proxy = RPC.getProxy(TestProtocol.class,TestProtocol.versionID,addr, conf);
执行相应的方法。
String stringResult = proxy.echo("hello hadoop rpc"); System.out.println(stringResult);
重点内容
Server底层实现
内部类介绍
server类是org.apache.hadoop.ipc.Server,里面包含几个重要的内部类内部类介绍
Call
将一个rpc请求需要的东西封装到Call对象里private final int callId; // the client's call id 客户端id private final int retryCount; // the retry count of the call 重试次数 private final Writable rpcRequest; // Serialized Rpc request from client 序列号的请求 private final Connection connection; // connection to client private long timestamp; // time received when response is null // time served when response is not null private ByteBuffer rpcResponse; // the response for this call private final RPC.RpcKind rpcKind; private final byte[] clientId; private final Span traceSpan; // the tracing span on the server side
Connection。
客户端与服务器通信的一些信息在这个里面Handler
用于处理接受到rpc请求Listener
用于监听rpc请求。Reader
用于读取Listener接受到的请求Responder
用于将rpc请求返回客户端Server的启动
服务器的构造是通过静态方法RPC.Builder(conf).build()创建的,通过跟踪代码我们发现他最后调用了Server的构造方法protected Server(String bindAddress, int port,Class<? extends Writable> rpcRequestClass, int handlerCount,int numReaders, int queueSizePerHandler, Configuration conf,String serverName, SecretManager<? extends TokenIdentifier> secretManager,String portRangeConfig) throws IOException { ………………………………….. this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),maxQueueSize, prefix, conf); ………………………………………… // Start the listener here and let it bind to the port listener = new Listener(); this.port = listener.getAddress().getPort(); ……………………………………………….. // Create the responder here responder = new Responder(); ……………………………………………. }
我们看到我们上面提到的两个内部类listener和responder都是在这里创建的,之后调用start方法启动服务。
public synchronized void start() { responder.start(); listener.start(); handlers = new Handler[handlerCount]; for (int i = 0; i < handlerCount; i++) { handlers[i] = new Handler(i); handlers[i].start(); } }
接收请求
从Listener的构造方法中我们看到服务器监听了SelectionKey.OP_ACCEPT,他只是监听是否有请求过来,而不做处理,这样为了提高并发。同时启动了一些Reader线程,这些线程是用来从channel读取数据的。
public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); // Create a new server socket and set to non blocking mode acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); // Bind the server socket to the local host and port bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; for (int i = 0; i < readThreads; i++) { Reader reader = new Reader( "Socket Reader #" + (i + 1) + " for port " + port); readers[i] = reader; reader.start(); } //监听OP_ACCEPT事件 **acceptChannel.register(selector, SelectionKey.OP_ACCEPT);** this.setName("IPC Server listener on " + port); this.setDaemon(true); }
Reader线程读取数据
通过Listener的run方法我们看到如果一旦接受到请求,然后就让reader去处理Connection c = connectionManager.register(channel); // If the connectionManager can't take it, close the connection. if (c == null) { if (channel.isOpen()) { IOUtils.cleanup(null, channel); } continue; } key.attach(c); // so closeCurrentConnection can get the object **reader.addConnection(c);**
跟踪Reader的run方法,我们看到最后将读取的信息封装成了一个Call对象put到callQueue中
Call call = new Call(header.getCallId(), header.getRetryCount(), rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header.getClientId().toByteArray(), traceSpan); callQueue.put(call); // queue the call; maybe blocked here
Handler线程处理请求
final Call call = callQueue.take(); // pop the queue; maybe blocked here if (LOG.isDebugEnabled()) { LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind); } if (!call.connection.channel.isOpen()) { LOG.info(Thread.currentThread().getName() + ": skipped " + call); continue; }
最后调用了RpcInvoker的call方法最终通过反射来执行相应的方法
Method method = protocolImpl.protocolClass.getMethod(call.getMethodName(), call.getParameterClasses()); method.setAccessible(true); server.rpcDetailedMetrics.init(protocolImpl.protocolClass); Object value = method.invoke(protocolImpl.protocolImpl, call.getParameters()); if (server.verbose) log("Return: "+value); return new ObjectWritable(method.getReturnType(), value);
客户端实现
获取代理
通过RPC的静态方法getProxy获取代理TestProtocol proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
在这里是通过java的动态代理来获取代理。通过跟踪代码我们找到了这里
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, AtomicBoolean fallbackToSimpleAuth) throws IOException { if (connectionRetryPolicy != null) { throw new UnsupportedOperationException( "Not supported: connectionRetryPolicy=" + connectionRetryPolicy); } T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, fallbackToSimpleAuth)); return new ProtocolProxy<T>(protocol, proxy, true); }
发送请求。
在Invoker的构造方法里,我们看到在这里新建了一个org.apache.hadoop.ipc.Client对象,在invoke方法里调用了client里面的call方法,最终调用connection.sendRpcRequest(call); 来发送rpc请求final Call call = createCall(rpcKind, rpcRequest); Connection connection = getConnection(remoteId, call, serviceClass, fallbackToSimpleAuth); try { connection.sendRpcRequest(call); // send the rpc request } catch (RejectedExecutionException e) { throw new IOException("connection has been closed", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.warn("interrupted waiting to send rpc request to server", e); throw new IOException(e); }
在sendRpcRequest方法里,可以看到使用了基于tcp的socket通讯,将数据发送了服务器端。
synchronized (Connection.this.out) { if (shouldCloseConnection.get()) { return; } if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); byte[] data = d.getData(); int totalLength = d.getLength(); out.writeInt(totalLength); // Total Length out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest out.flush(); }
总结
比较仓促,写的比较简陋,后续有时间还会继续跟进补充。相关文章推荐
- 源码级强力分析hadoop的RPC机制
- Hadoop RPC源码解析——Server类(一)
- hadoop源码解析之hdfs写数据全流程分析---datanode处理
- Hadoop 源码解析-rpc扩展
- Hadoop RPC源码分析
- hadoop的源码分析之RPC(Remote Procedure Call Protocol)
- Hadoop源码分析- RPC client端篇
- Hadoop源码分析之一(RPC机制之Server)
- Hadoop之RPC Server源码分析
- 源码级强力分析hadoop的RPC机制
- Hadoop RPC源码分析
- Hadoop RPC源码解析——RPC框架详解
- Hadoop RPC源码解析——Client类
- 源码级强力分析hadoop的RPC机制
- hadoop源码解析之hdfs写数据全流程分析---创建文件
- Hadoop RPC 源码分析- 相关类图
- Hadoop RPC的机制分析和源码解读
- 源码级强力分析hadoop的RPC机制
- Hadoop源码分析之一(RPC机制之Server)
- Hadoop源码分析HDFS Client向HDFS写入数据的过程解析