您的位置:首页 > 其它

反射与动态代理的应用(一):在RPC中的使用

2017-10-26 22:50 483 查看


在上一篇文章中,讲解了反射 与动态代理基本概念,没有看过点击此传送门

接下来看看,动态代理在RPC中是如何使用的。在讲解过程中会去除掉一些无关的代码,如果读者相应看全部的代码,可以去开源项目的仓库中下载相关源码,进一步的专研,以下也会给出链接。

实例

1.第一个实例取自黄勇的轻量级分布式 RPC 框架 ,由于实现中通信框架使用了Netty,所以在分析中会有部分Netty代码的信息,不过不用担心,即使不懂Netty,讲解的过程中会尽量避免,并会突出反射与动态代理在其中的作用。

在rpc-simple-client中HelloClient.Class有如下代码

HelloService helloService = rpcProxy.create(HelloService.class);
String result = helloService.hello("World");
System.out.println(result);


这个代码做的是什么事呢?通过一个代理生成helloService对象,执行hello方法。

在我们印象中执行方法,最终都会执行的是接口中实现的方法。那事实是这样吗?看下面的分析。

在rpcProxy代码如下:

public <T> T create(final Class<?> interfaceClass, final String serviceVersion) {
// 创建动态代理对象
return (T) Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 创建 RPC 请求对象并设置请求属性
RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setInterfaceName(method.getDeclaringClass().getName());
request.setServiceVersion(serviceVersion);
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
// 获取 RPC 服务地址
if (serviceDiscovery != null) {
String serviceName = interfaceClass.getName();
if (StringUtil.isNotEmpty(serviceVersion)) {
serviceName += "-" + serviceVersion;
}
serviceAddress = serviceDiscovery.discover(serviceName);
LOGGER.debug("discover service: {} => {}", serviceName, serviceAddress);
}
if (StringUtil.isEmpty(serviceAddress)) {
throw new RuntimeException("server address is empty");
}
// 从 RPC 服务地址中解析主机名与端口号
String[] array = StringUtil.split(serviceAddress, ":");
String host = array[0];
int port = Integer.parseInt(array[1]);
// 创建 RPC 客户端对象并发送 RPC 请求
RpcClient client = new RpcClient(host, port);
long time = System.currentTimeMillis();
RpcResponse response = client.send(request);
LOGGER.debug("time: {}ms", System.currentTimeMillis() - time);
if (response == null) {
throw new RuntimeException("response is null");
}
// 返回 RPC 响应结果
if (response.hasException()) {
throw response.getException();
} else {
return response.getResult();
}
}
}
);
}


从上面的代码可以看出经过了代理,执行hello方法,其实是发起一个请求。既然是一个请求,就是要涉及Client端与Server端,上面其实是一个Clent端代码。

那我们看看Server做了什么,去掉一个和本文所介绍不相关的代码,在RpcServerHandler中可以看核心代码如下

public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
Object result = handle(request);
response.setResult(result);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);  //返回,异步关闭连接
}
其中hanlde中重要实现如下
// 获取反射调用所需的参数,这些都是Client端传输给我们的。
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
// 使用 CGLib 执行反射调用
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
return serviceFastMethod.invoke(serviceBean, parameters);


2.第二个实例取自xxl-job分布式任务调度平台

说明:此开源项目的,RPC通信是用Jetty来实现的。

在xxl-job-admin中XxlJobTrigger.Class的runExecutor有如下

ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);  //根据地址拿到执行器
runResult = executorBiz.run(triggerParam);


做了很简单的是取出执行器,触发执行。但是进入getExecutorBiz方法你会发现如下

executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address,
accessToken).getObject();
executorBizRepository.put(address, executorBiz);
return executorBiz;


是不是很熟悉,没错,动态代理,看是NetComClientProxy的实现:

在结构上是不是和第一个实例中的rpcProxy代码,很相似呢。

new NetComClientProxy(ExecutorBiz.class, address, accessToken).getObject();做了什么呢

public Object getObject() throws Exception {
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { iface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

// request封装
RpcRequest request = new RpcRequest();
request.setServerAddress(serverAddress);
request.setCreateMillisTime(System.currentTimeMillis());
request.setAccessToken(accessToken);
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);

// send发送
RpcResponse response = client.send(request);

// valid response
if (response == null) {
logger.error(">>>>>>>>>>> xxl-rpc netty response not found.");
throw new Exception(">>>>>>>>>>> xxl-rpc netty response not found.");
}
if (response.isError()) {
throw new RuntimeException(response.getError());
} else {
return response.getResult();
}

}
});
}


依旧是封装了一个RpcRequest ,发送请求。所以在 runResult = executorBiz.run(triggerParam)

其实是在发送一个请求。上面是Client端代码,照旧,接着看Server代码,你会发现还是似成相识。去掉与本文无关的代码,得到如下:

在xxl-job-core中JettyServerHandler.Class

RpcResponse rpcResponse = NetComServerFactory.invokeService(rpcRequest, null);
点击进入:
public static RpcResponse invokeService(RpcRequest request, Object serviceBean) {
Class<?> serviceClass = serviceBean.getClass();  //类名
String methodName = request.getMethodName();    //方法名run
Class<?>[] parameterTypes = request.getParameterTypes();  //参数类型
Object[] parameters = request.getParameters();   //具体参数

FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
// 使用 CGLib 执行反射调用
Object result = serviceFastMethod.invoke(serviceBean, parameters);
response.setResult(result);
} catch (Throwable t) {
t.printStackTrace();
response.setError(t.getMessage());
}
return response;
}


根据反射生成具体的类,来执行相关的方法,达到想要的目的。

上面两个实例的过程可以用下图概括:



由于本人水平有限,有什么问题可以评论,喜欢的可以关注。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: