您的位置:首页 > 运维架构

Hadoop源码之RPC机制

2012-12-11 14:56 429 查看
研读Hadoop源码,不得不说其中的RPC机制。

实现RPC的几个基本步骤:

1)客户端需要有一个负责与远程服务端对象通信的对象,称为A;

2)服务端需要有一个负责与远程客户端对象通信的对象,称为B;

3)A负责将客户端请求的Java类型方法、参数,序列化成字节流,通过网络传递给B;

4)B负责将通过网络收到的请求字节流,反序列化成Java类型方法、参数,传递给真正的服务端对象,并调用方法;

5)B收到服务端对象返回的结果,再序列化传递给A;

6)A反序列化后,将结果返回给客户端调用者。

如果采用RMI实现上述RPC过程的话,A即为存根对象,B即为骨架对象;实际上CORBA架构也离不开上述的过程。

下面从RPC调用过程的先后出场顺序,阐述hadoop的RPC方案:

1、客户端使用Java 反射,获取远程服务端对象的代理对象,可以视为A,同时解决了类型安全的问题;

public static Object getProxy(Class protocol, InetSocketAddress addr, Configuration conf) {
return Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol },
new Invoker(addr, conf));
}

通过new 一个Invoker,来实现invocationHandler,实现调用过程,具体调用又是通过New一个Client对象的call方法实现;

public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
ObjectWritable value = (ObjectWritable)
CLIENT.call(new Invocation(method, args), address);
return value.get();
}


2、上述的call调用与后续的调用,都是通过实现Writable接口的对象与ObjectWritable对象,来实现序列化与反序列化的,其中还包括所有Java基本类型在内都实现了Writable接口;

public interface Writable {
void write(DataOutput out) throws IOException;      //序列化接口

void readFields(DataInput in) throws IOException;   //反序列化接口
}


3、CLIENT在向服务端发送请求时,进行了同步,解决了线程安全的问题,并通过新建一个connection线程将调用序列化输出;

public Writable call(Writable param, InetSocketAddress address)
throws IOException {
Connection connection = getConnection(address); //新启一个connection线程
Call call = new Call(param);      //封装成call
synchronized (call) {
connection.sendParam(call);                 // connection线程发送请求
//以下省略
}

connection线程中,对call对象进行序列化输出;

public void sendParam(Call call) throws IOException {
calls.put(new Integer(call.id), call);
synchronized (out) {
if (LOG.isLoggable(Level.FINE))
LOG.fine(getName() + " sending #" + call.id);
try {
writingCall = call;
out.writeInt(call.id);
call.param.write(out);  //参数序列化,并写出
out.flush();
} finally {
writingCall = null;
}
}


4、服务端socket监听线程监听到请求时,会开启一个connection线程,构造参数对象,进行反序列化,构造call对象,唤醒call队列;

while (running) {
int id;
try {
id = in.readInt();                    // try to read an id
} catch (SocketTimeoutException e) {
continue;
}

if (LOG.isLoggable(Level.FINE))
LOG.fine(getName() + " got #" + id);

Writable param = makeParam();           // 构造参数对象
param.readFields(in);                   // 反序列化

Call call = new Call(id, param, this);  // 构造call对象

synchronized (callQueue) {
callQueue.addLast(call);              // queue the call
callQueue.notify();                   // 唤醒call队列
}


5、服务端Handler线程被唤醒后,调用服务端对象,获取结果后,又序列化输出;

Writable value = null;
try {
value = call(call.param);             // 调用
} catch (IOException e) {
LOG.log(Level.INFO, getName() + " call error: " + e, e);
error = getStackTrace(e);
} catch (Exception e) {
LOG.log(Level.INFO, getName() + " call error: " + e, e);
error = getStackTrace(e);
}

DataOutputStream out = call.connection.out;
synchronized (out) {
out.writeInt(call.id);                // write call id
out.writeBoolean(error!=null);        // write error flag
if (error != null)
value = new UTF8(error);
value.write(out);                     // write value
out.flush();
}

调用过程如下:

public Writable call(Writable param) throws IOException {

Invocation call = (Invocation)param;
if (verbose) log("Call: " + call);

Method method =
implementation.getMethod(call.getMethodName(),
call.getParameterClasses());
Object value = method.invoke(instance, call.getParameters()); //真正调用服务端对象
if (verbose) log("Return: "+value);

return new ObjectWritable(method.getReturnType(), value);
}


6、以DataNode客户端为例,DataXceiveServer线程监听到服务端返回的数据后,就新开启一个DataXceiver线程,DataXceiver将返回的数据进行反序列化后再进行相应处理;

if (op == OP_WRITE_BLOCK) {
//
// Read in the header
//
DataOutputStream reply = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
try {
boolean shouldReportBlock = in.readBoolean();
Block b = new Block();
b.readFields(in);                 //反序列化


这样就完成了一个完整的RPC调用过程。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: