基于Netty的高性能JAVA的RPC框架
前言
今年7月份左右报名参加了阿里巴巴组织的高性能中间件挑战赛,这次比赛不像以往的比赛,是从一个工程的视角来比赛的。
这个比赛有两个赛题,第一题是实现一个RPC框架,第二道题是实现一个Mom消息中间件。
RPC题目如下
一个简单的RPC框架
RPC(Remote Procedure Call )——远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
框架——让编程人员便捷地使用框架所提供的功能,由于RPC的特性,聚焦于应用的分布式服务化开发,所以成为一个对开发人员无感知的接口代理,显然是RPC框架优秀的设计。
题目要求
1.要成为框架:对于框架的使用者,隐藏RPC实现。
2.网络模块可以自己编写,如果要使用IO框架,要求使用netty-4.0.23.Final。
3.支持异步调用,提供future、callback的能力。
4.能够传输基本类型、自定义业务类型、异常类型(要在客户端抛出)。
5.要处理超时场景,服务端处理时间较长时,客户端在指定时间内跳出本次调用。
6.提供RPC上下文,客户端可以透传数据给服务端。
7.提供Hook,让开发人员进行RPC层面的AOP。
注:为了降低第一题的难度,RPC框架不需要注册中心,客户端识别-DSIP的JVM参数来获取服务端IP。
衡量标准
满足所有要求。 性能测试。
测试时会运行rpc-use-demo中的测试用例,测试的demo包由测试工具做好。
参赛者必须以com.alibaba.middleware.race.rpc.api.impl.RpcConsumerImpl为全类名,继承com.alibaba.middleware.race.rpc.api.RpcConsumer,并覆写所有的public方法。
参赛者必须以com.alibaba.middleware.race.rpc.api.impl.RpcProviderImpl为全类名,继承com.alibaba.middleware.race.rpc.api.RpcProvider,并覆写所有的public方法。
参赛者依赖公共maven中心库上的三方包,即可看到一个示例的demo,按照对应的包名,在自己的工程中建立对应的类(包名、类名一致)。
三方库里的代码起到提示的作用,可以作为参考,不要在最终的pom中依赖。
所以最终参赛者需要打出一个rpc-api的jar包,供测试工程调用。 (注意,参考完rpc-api的示例后,请从pom依赖中将其删除,避免依赖冲突)
测试Demo工程请参考Taocode SVN上的代码。
RPC的实现
题目中推荐的网络框架使用Netty4来实现,这个RPC框架中需要实现的有
- RPC客户端
- RPC服务端
RPC客户端的实现
RPC客户端和RPC服务器端需要一个相同的接口类,RPC客户端通过一个代理类来调用RPC服务器端的函数
RpcConsumerImpl的实现
......
package com.alibaba.middleware.race.rpc.api.impl;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.middleware.race.rpc.aop.ConsumerHook;
import com.alibaba.middleware.race.rpc.api.RpcConsumer;
import com.alibaba.middleware.race.rpc.async.ResponseCallbackListener;
import com.alibaba.middleware.race.rpc.context.RpcContext;
import com.alibaba.middleware.race.rpc.model.RpcRequest;
import com.alibaba.middleware.race.rpc.model.RpcResponse;
import com.alibaba.middleware.race.rpc.netty.RpcConnection;
import com.alibaba.middleware.race.rpc.netty.RpcNettyConnection;
import com.alibaba.middleware.race.rpc.tool.Tool;
public class RpcConsumerImpl extends RpcConsumer implements InvocationHandler {
private static AtomicLong callTimes = new AtomicLong(0L); privat 71f0 e RpcConnection connection; private List<RpcConnection> connection_list; private Map<String,ResponseCallbackListener> asyncMethods; private Class<?> interfaceClass; private String version; private int timeout; private ConsumerHook hook; public Class<?> getInterfaceClass() { return interfaceClass; } public String getVersion() { return version; } public int getTimeout() { this.connection.setTimeOut(timeout); return timeout; } public ConsumerHook getHook() { return hook; } RpcConnection select() { //Random rd=new Random(System.currentTimeMillis()); int d=(int) (callTimes.getAndIncrement()%(connection_list.size()+1)); if(d==0) return connection; else { return connection_list.get(d-1); } } public RpcConsumerImpl() { //String ip=System.getProperty("SIP"); String ip="127.0.0.1"; this.asyncMethods=new HashMap<String,ResponseCallbackListener>(); this.connection=new RpcNettyConnection(ip,8888); this.connection.connect(); connection_list=new ArrayList<RpcConnection>(); int num=Runtime.getRuntime().availableProcessors()/3 -2; for (int i = 0; i < num; i++) { connection_list.add(new RpcNettyConnection(ip, 8888)); } for (RpcConnection conn:connection_list) { conn.connect(); } } public void destroy() throws Throwable { if (null != connection) { connection.close(); } } @SuppressWarnings("unchecked") public <T> T proxy(Class<T> interfaceClass) throws Throwable { if (!interfaceClass.isInterface()) { throw new IllegalArgumentException(interfaceClass.getName() + " is not an interface"); } return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, this); } @Override public RpcConsumer interfaceClass(Class<?> interfaceClass) { // TODO Auto-generated method stub this.interfaceClass=interfaceClass; return this; } @Override public RpcConsumer version(String version) { // TODO Auto-generated method stub this.version=version; return this; } @Override public RpcConsumer clientTimeout(int clientTimeout) { // TODO Auto-generated method stub this.timeout=clientTimeout; return this; } @Override public RpcConsumer hook(ConsumerHook hook) { // TODO Auto-generated method stub this.hook=hook; return this; } @Override public Object instance() { // TODO Auto-generated method stub try { return proxy(this.interfaceClass); } catch (Throwable e) { e.printStackTrace(); } return null; } @Override public void asynCall(String methodName) { // TODO Auto-generated method stub asynCall(methodName, null); } @Override public <T extends ResponseCallbackListener> void asynCall( String methodName, T callbackListener) { this.asyncMethods.put(methodName, callbackListener); this.connection.setAsyncMethod(asyncMethods); for (RpcConnection conn:connection_list) { conn.setAsyncMethod(asyncMethods); } } @Override public void cancelAsyn(String methodName) { // TODO Auto-generated method stub this.asyncMethods.remove(methodName); this.connection.setAsyncMethod(asyncMethods); for (RpcConnection conn:connection_list) { conn.setAsyncMethod(asyncMethods); } } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // TODO Auto-generated method stub List<String> parameterTypes = new LinkedList<String>(); for (Class<?> parameterType : method.getParameterTypes()) { parameterTypes.add(parameterType.getName()); } RpcRequest request = new RpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(args); if(hook!=null) hook.before(request); RpcResponse response = null; try { request.setContext(RpcContext.props); response = (RpcResponse) select().Send(request,asyncMethods.containsKey(request.getMethodName())); if(hook!=null) hook.after(request); if(!asyncMethods.containsKey(request.getMethodName())&&response.getExption()!=null) { Throwable e=(Throwable) Tool.deserialize(response.getExption(),response.getClazz()); throw e.getCause(); } } catch (Throwable t) { //t.printStackTrace(); //throw new RuntimeException(t); throw t; } finally {
// if(asyncMethods.containsKey(request.getMethodName())&&asyncMethods.get(request.getMethodName())!=null)
// {
// cancelAsyn(request.getMethodName());
// }
}
if(response==null)
{
return null;
}
else if (response.getErrorMsg() != null)
{
throw response.getErrorMsg();
}
else
{
return response.getAppResponse();
}
}
}
RpcConsumer consumer;
consumer = (RpcConsumer) getConsumerImplClass().newInstance();
consumer.someMethod();123
因为consumer对象是通过代理生成的,所以当consumer调用的时候,就会调用invoke函数,我们就可以把这次本地的函数调用的信息通过网络发送到RPC服务器然后等待服务器返回的信息后再返回。
服务器实现
RPC服务器主要是在收到RPC客户端之后解析出RPC调用的接口名,函数名以及参数。
package com.alibaba.middleware.race.rpc.api.impl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import net.sf.cglib.reflect.FastClass;
import net.sf.cglib.reflect.FastMethod;
import com.alibaba.middleware.race.rpc.context.RpcContext;
import com.alibaba.middleware.race.rpc.model.RpcRequest;
import com.alibaba.middleware.race.rpc.model.RpcResponse;
import com.alibaba.middleware.race.rpc.serializer.KryoSerialization;
import com.alibaba.middleware.race.rpc.tool.ByteObjConverter;
import com.alibaba.middleware.race.rpc.tool.ReflectionCache;
import com.alibaba.middleware.race.rpc.tool.Tool;
/**
[ul]*/
public class RpcRequestHandler extends ChannelInboundHandlerAdapter {
private static Map<String,Map<String,Object>> ThreadLocalMap=new HashMap<String, Map<String,Object>>();
//服务端接口-实现类的映射表
private final Map<String, Object> handlerMap;
KryoSerialization kryo=new KryoSerialization();
public RpcRequestHandler(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;[url=http://blog.51cto.com/13952955/mailto:br/>}
@Override}
@OverrideSystem.out.println(]}
@Override
- 基于Netty的高性能JAVA的RPC框架
- 基于Java Netty框架构建高性能的部标808协议的GPS服务器
- 基于netty轻量的高性能分布式RPC服务框架forest<下篇>
- 基于Netty构建高性能RPC通信框架
- DUBBO 一个高性能,基于Java的开源RPC框架
- Java编写基于netty的RPC框架
- Java实现一个简单的RPC框架(五) 基于Socket的传输层实现
- 基于Netty的RPC简单框架实现(二):RPC服务端
- 基于Netty的RPC简单框架实现(三):Kryo实现序列化
- Netty Java NIO 高性能Socket 开发框架
- 基于hessian和netty的RPC框架设计和实现
- 基于Netty的RPC框架
- 基于Netty的RPC简单框架实现(一):RPC客户端
- 基于Netty的分布式 RPC 框架
- 自定义基于netty的rpc框架(4)---zk和utils以及protocol的实现
- 基于hessian和netty的RPC框架设计和实现
- 自定义基于netty的rpc框架(2)---服务端的实现
- GRPC 1.3.4 发布,Google 高性能 RPC 框架(Java C++ Go)
- summercool-hsf(基于Netty实现的RPC框架,已经应用国内某移动互联网公司)
- 基于netty,hessian的RPC框架