您的位置:首页 > 运维架构

hadoop RPC过程1

2015-12-22 22:47 267 查看
使用Hadoop源代码启动了一个RPCServer端和一个RPC的client端,模拟了hadoop的远程调度的过程。

1.RPCServer

1.RpcEngine

Server server = new RPC.Builder(conf)   //.setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
.setProtocol(IPCQueryStatus.class)
.setInstance(jProtocol)
.setBindAddress("127.0.0.1")
//set the bindHost as default 0.0.0.0
.setPort(63333)
.setNumHandlers(1)
.setVerbose(false)
//                    .setSecretManager(namesystem.getDelegationTokenSecretManager())
.setSecretManager(null)
.build();
server.start();


Builder模式的入口在RPC.class中,这是RPC调用的核心类。

return getProtocolEngine(this.protocol, this.conf).getServer(
this.protocol, this.instance,
this.bindAddress, this.port,
this.numHandlers, this.numReaders,this.queueSizePerHandler,
this.verbose, this.conf, this.secretManager,this.portRangeConfig);


getProtoclEngine方法返回一个RpcEngine,RpcEngine是什么类?

RpcEngine是一个RPC的实际应用类。

RpcEngine是一个接口,实现这个接口有两个实际类ProtobufRpcEngine.class和WritableRpcEngine.class

在getProtoclEngine方法中默认使用WritableRpcEngine,这里的策略模式是为了在方法运行时,可以根据配置动态选择实际类。

PROTOCOL_ENGINES是一个protocol和engine之间的Map表,以后对于相同的protocol,可以直接获取engine,而不用每次都读取配置。

static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
Configuration conf) {
RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) {
Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
WritableRpcEngine.class);
engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
PROTOCOL_ENGINES.put(protocol, engine);
}
return engine;
}


ProtobufRpcEngine.class和WritableRpcEngine.class 两个类中getServer方法都是new一个继承于RPC.Server的内部静态类。在这个静态内的构造器中做了两件事 :1构建一个RPC.Server 2执行RPC.Server中的registerProtocolAndImpl方法将interfacce注册到RPC call中,用来提供client端远程调用。

//WritableRpcEngine
// register protocol class and its super interfaces
registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
protocols = RPC.getProtocolInterfaces(protocolClass);
}
for (Class<?> p : protocols) {
if (!p.equals(VersionedProtocol.class)) {
registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl);
}
}


//ProtobufRpcEngine
registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
protocolImpl);


可以看出在WritableRpcEngine中不仅注册了创建server时设定的interface,而且还注册了创建server时设定的class实例的所有interface.

2.RPC.Server

接下来讨论RPC.Server类和registerProtocolAndImpl方法

1.registerProtocolAndImpl方法

// Register  protocol and its impl for rpc calls
void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass, Object protocolImpl) {
//step1 getProtocolName and getProtocolVersion
String protocolName = RPC.getProtocolName(protocolClass);
long version;
try {
version = RPC.getProtocolVersion(protocolClass);
} catch (Exception ex) {
LOG.warn("Protocol "  + protocolClass +
" NOT registered as cannot get protocol version ");
return;
}
//step2
getProtocolImplMap(rpcKind).put(new ProtoNameVer(protocolName, version),
new ProtoClassProtoImpl(protocolClass, protocolImpl));
LOG.debug("RpcKind = " + rpcKind + " Protocol Name = " + protocolName +  " version=" + version +
" ProtocolImpl=" + protocolImpl.getClass().getName() +
" protocolClass=" + protocolClass.getName());
}


step1.先取protocol的名字,也就是接口名。使用的方法是Class.getName()。然后取protocol的Version,使用的方法是

Field versionField = protocol.getField("versionID");
versionField.setAccessible(true);
return versionField.getLong(protocol);


所以在需要RPC call的接口需要有一个versionID,像这样

public interface IPCQueryStatus extends VersionedProtocol {
public static final long versionID = 37L;
}


仔细看了源代码之后,源代码中会先读取一个注解类ProtoInfo。如果在注解类中写了protocolName和protocolVersion的话,会直接使用注入的值。比如像这样

public interface IPCQueryStatus extends VersionedProtocol {
public static final long versionID = 37L;
@ProtocolInfo(protocolName = "IPCPueryStatus",protocolVersion=34L)
}


那么这个类的ProtocolName就是IPCPueryStatus而不是IPCQueryStatus。这个类的protocolVersion就是34L而不是37L。(查了源代码后,并没有看见这个ProtoInfo注解类的使用==!)

step2.根据每一个RpcKind创建一个HashMap。ProtoNameVer确定了一个ProtoClassProtoImpl.

ProtoNameVer类包括String类名和long版本号

ProtoClassProtoImpl类包括Class接口和实际Object

/**
* Store a map of protocol and version to its implementation
*/
/**
*  The key in Map
*/
static class ProtoNameVer {
final String protocol;
final long   version;
ProtoNameVer(String protocol, long ver) {
this.protocol = protocol;
this.version = ver;
}
@Override
public boolean equals(Object o) {
if (o == null)
return false;
if (this == o)
return true;
if (! (o instanceof ProtoNameVer))
return false;
ProtoNameVer pv = (ProtoNameVer) o;
return ((pv.protocol.equals(this.protocol)) &&
(pv.version == this.version));
}
@Override
public int hashCode() {
return protocol.hashCode() * 37 + (int) version;
}
}


effectiveJava中提到过,尽量不要使用String来表示一个对象,ProtoNameVer类的实现有它的价值。

/**
* The value in map
*/
static class ProtoClassProtoImpl {
final Class<?> protocolClass;
final Object protocolImpl;
ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
this.protocolClass = protocolClass;
this.protocolImpl = protocolImpl;
}
}


2.RPC.Server

RPC.Server继承于Server类,在构造器中:1.构造了Server类,2.initProtocolMetaInfo(conf)方法

RPC.Server继承于Server类,只重写了一个方法

@Override
public Writable call(RPC.RpcKind rpcKind, String protocol,
Writable rpcRequest, long receiveTime) throws Exception {
return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
receiveTime);
}


initProtocolMetaInfo(conf)方法其实是调用前面介绍的registerProtocolAndImpl方法,注册了一个RpcKind.RPC_PROTOCOL_BUFFER

registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER, ProtocolMetaInfoPB.class,protocolInfoBlockingService);
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop 源代码