您的位置:首页 > 其它

RPC远程过程调用之我的理解(附带项目希望有人交流)

2017-12-08 16:51 671 查看
最近在学习开发过程中使用到了阿里开发的dubbo框架,将项目进行分布式。

最近的学习了解到了一些关于RPC的原理,心血来潮就试着实现了一下自己的RPC功能。

项目主要分为三个子项目

API 项目 定义了通信的数据模型和序列化反序列化所使用的工具以及项目测试使用的bean和接口

Server 项目作为RPC的过程服务提供者

Client 项目是服务的调用者

我这里使用的是MINA作为TCP服务器 利用jdk自带的方法进行序列化 在服务调用方做一个接口的代理让接口执行socket通信将调用服务的请求发送到Server端在Server端反射出接口的实现类执行方法再讲返回数据通过网络写回去

先给大家看一下我的项目结果截图



这里是本项目的调用过程



主要是利用了代理和反射 利用代理在客户端调用时产生一个socket连接访问服务端,在服务端根据数据反射出一个已经存在的接口实现类型来执行方法,再将执行结果返回给调用的客户端

接下来直接上代码

这里是我的API项目中的一些数据类型的定义

第一个是 请求发送时的调用描述

包括 调用的接口名称,调用方法名称,调用方法参数列表,调用方法参数类型列表

/**
* 用于描述本次调用的情况
* @author Ming
*
*/
public class NetModel implements Serializable{
//类名称
private String type;
//方法名称
private String method;
//参数
private Object[] args;
//参数的类型
private String[] types;

public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
public Object[] getArgs() {
return args;
}
public void setArgs(Object[] args) {
this.args = args;
}
public String[] getTypes() {
return types;
}
public void setTypes(String[] types) {
this.types = types;
}
}


接下来就是序列化工具使用了jdk自带的序列化方式,

这里我还遇到了一些困难:单纯的从流中读取数组因为不知道数组的长度是多少所以在反序列化的过程中总是因为byte[]长度不准确不能正确的读取出对象,所以在序列化的数组前面多加了4位是一个int类型的数据表示的是这次序列化产生的byte[]长度这样就可以准确的反序列化出想要的对象

/**
* 网络通信序列化工具
* @author Ming
*
*/
public class SerializationUtil {
/**
* 对象序列化成数组
* @param object
* @return
*/
public static byte[] objectToBytes(Object object){
ByteArrayOutputStream output = new ByteArrayOutputStream();
ObjectOutputStream objectOut;
try {
objectOut = new ObjectOutputStream(output);
objectOut.writeObject(object);
objectOut.close();
output.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return output.toByteArray();
}
/**
* 数组反序列化成对象
* @param bytes
* @return
*/
public static Object byetsToObject(byte[] bytes){
ByteArrayInputStream input = new ByteArrayInputStream(bytes);
ObjectInputStream objectIn;
Object object = null;
try {
objectIn = new ObjectInputStream(input);
object = objectIn.readObject();
objectIn.close();
input.close();

} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return object;
}
/**
* 将int数值转换为占四个字节的byte数组,本方法适用于(低位在前,高位在后)的顺序。
* @param value
* @return
*/
public static byte[] intToBytes(int value)   {
byte[] byte_src = new byte[4];
byte_src[3] = (byte) ((value & 0xFF000000)>>24);
byte_src[2] = (byte) ((value & 0x00FF0000)>>16);
byte_src[1] = (byte) ((value & 0x0000FF00)>>8);
byte_src[0] = (byte) ((value & 0x000000FF));
return byte_src;
}

/**
* byte数组中取int数值,本方法适用于(低位在前,高位在后)的顺序。
*
* @param ary
*            byte数组
* @param offset
*            从数组的第offset位开始
* @return int数值
*/
public static int bytesToInt(byte[] ary, int offset) {
int value;
value = (int) ((ary[offset]&0xFF)
| ((ary[offset+1]<<8) & 0xFF00)
| ((ary[offset+2]<<16)& 0xFF0000)
| ((ary[offset+3]<<24) & 0xFF000000));
return value;
}

/**
* 将对象直接序列化成为RPC网络通信中使用的二进制数组
* @param obj
* @return
*/
public static byte[] objToNetBytes(Object obj) {
byte[] objBytes = objectToBytes(obj);
int length = objBytes.length;
byte[] lengthBytes = intToBytes(length);
byte[] RPCBytes = new byte[length+4];
System.arraycopy(lengthBytes, 0, RPCBytes, 0, 4);
System.arraycopy(objBytes, 0, RPCBytes, 4, length);
return RPCBytes;
}

/**
* 将RPC网络通信中的二进制数组序列化成为使用对象
* @param bytes
* @return
*/
public static Object netBytesToObj(byte [] RPCbytes) {
int length = bytesToInt(RPCbytes, 0);
byte [] objBytes = new byte[length];
System.arraycopy(RPCbytes, 4, objBytes, 0, length);
Object object = byetsToObject(objBytes);
return object;
}
}


API项目基本介绍完成了剩下的就是接口和JavaBean就不再多做说明了,在文章最后我会带上我的项目。直接上代码了

public class Player implements Serializable{
@Override
public String toString() {
return "Player [year=" + year + ", name=" + name + ", date=" + date + "]";
}

private int year;

private String name;

private Date date;

public Player(int year, String name, Date date) {
super();
this.year = year;
this.name = name;
this.date = date;
}

public Player() {

}

public int getYear() {
return year;
}

public void setYear(int year) {
this.year = year;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Date getDate() {
return date;
}

public void setDate(Date date) {
this.date = date;
}
}


public interface PlayerService {
Player creat();

List<Player> carList(Integer x);

List<Player> list(Integer count,String name);
}


下一步介绍的是Server端的项目

先上一个项目结构图



className.properties是一个接口实现类的配置文件,key是接口名称,value是接口的实现,我这里就只有一个测试的接口和接口实现类

所以就只有一行。

com.learn.api.service.PlayerService=com.learn.server.serviceImpl.PlayerServiceImpl


conf.properties文件是给MINA使用的用来说明监听的地址和端口

ip=127.0.0.1

port=10800


MinaStart 是一个MINA框架的入口根据conf.properties开启一个端口监听

/**
* mina程序入口
* @author Ming
*
*/
public class MinaStart {
private static IoAcceptor accept = new NioSocketAcceptor();
public static Properties className = new Properties();

public static void start(int port,String ip,IoHandlerAdapter adapter ) throws Exception{
accept.getSessionConfig().setReadBufferSize(2048 * 10);
accept.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,60*5);

LoggingFilter lf = new LoggingFilter();
lf.setSessionCreatedLogLevel(LogLevel.INFO);
lf.setSessionOpenedLogLevel(LogLevel.INFO);
lf.setMessageReceivedLogLevel(LogLevel.INFO);
lf.setMessageSentLogLevel(LogLevel.INFO);

accept.getFilterChain().addLast("logger", lf);
accept.getFilterChain().addLast("exceutor",
new ExecutorFilter(Executors.newCachedThreadPool()));
accept.setHandler(adapter);
accept.bind(new InetSocketAddress(ip, port));
System.out.println("启动MINA服务 监听端口:"+port);
}

public static void main(String[] args) throws Exception{
Properties conf = new Properties();
conf.load(MinaStart.class.getResourceAsStream("/conf.properties"));
String ip = conf.getProperty("ip");
int port = Integer.parseInt(conf.getProperty("port"));
start(port, ip, new RpcInvokeHandler());
}
}


ClassNameManerge 是一个接口实现类的管理类,顺便就用了一个不怎么严谨的单例模式…根据className.properties 反射获取实例

public class ClassNameManerge {
private static Map<String, Object> instances = new HashMap<String, Object>();

private static Properties classNames = new Properties();

static {
try {
classNames.load(ClassNameManerge.class.getResourceAsStream("/className.properties"));
} catch (Exception e) {
System.out.println("配置文件读取异常");
e.printStackTrace();
}
}

/**
* 根据配置文件中的配置接口实现类获取对应的接口实习 (使用了一个不怎么严谨的单例模式)
* @param className 接口的类名称
* @return 接口的一个实现
*/
public static Object getInstance(String className) {
String type = classNames.getProperty(className);
Object instance = instances.get(type);
if(instance!=null) {
return instance;
}
try {
Class clazz = Class.forName(type);
instance = clazz.newInstance();
instances.put(className, instance);
} catch (Exception e) {
e.printStackTrace();
}
return instance;
}
}


接下来就是server端主要处理的业务了是一个MINA框架的执行逻辑处理的类型,其中messageReceived方法指的是当消息接收到时所执行的代码

public class RpcInvokeHandler extends IoHandlerAdapter{
/**
* 提供了一个在Server端执行方法的过程
* 这里偷懒使用的是MINA框架
*/
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
try {
/**
* 从数据流中读取出描述调用的对象
*/
IoBuffer bf = (IoBuffer) message;
byte[] netBytes = new byte[bf.limit()];
bf.get(netBytes);
Object obj = SerializationUtil.netBytesToObj(netBytes);
NetModel model = (NetModel) obj;

Object[] args = model.getArgs();
String methodName = model.getMethod();
String type = model.getType();
String[] types = model.getTypes();

Class [] classes = null;
if(types!=null) {
classes = new Class[types.length];
for (int i = 0; i < classes.length; i++) {
classes[i] = Class.forName(types[i]);
}
}

Object service = ClassNameManerge.getInstance(type);
Class<? extends Object> clazz = service.getClass();
Method method = clazz.getMethod(methodName, classes);
Object invoke = method.invoke(service, args);
byte[] bytes = SerializationUtil.objToNetBytes(invoke);

/**
* 这里一段可以注释掉只是用来展示说明调用方法说明的
*/
System.out.println("RPC >>>");
System.out.println("\t inerface:"+type);
System.out.println("\t method:"+methodName);
String ss ="";
for (String b : types) {
ss += (b+"--");
}
System.out.println("\t argsType:"+ss);
String tt ="";
for (Object b : args) {
tt += (b+"--");
}
System.out.println("\t args:"+tt);

/**
* 将调用结果写回调用客户端
*/
IoBuffer buffer = IoBuffer.allocate(bytes.length);
buffer.put(bytes, 0, bytes.length);
buffer.flip();
session.write(buffer);

} catch (Exception e) {
e.printStackTrace();
}
}
}


最后还有一个接口的实现类

public class PlayerServiceImpl implements PlayerService{

public Player creat() {
return new Player(22,"cmm",new Date());
}

public List<Player> carList(Integer x) {
// TODO Auto-generated method stub
List<Player> list = new ArrayList<Player>();
for (int i = 0; i < x; i++) {
list.add(new Player(22*i,"cmm"+i,new Date()));
}
return list;
}

public List<Player> list(Integer count, String name) {
List<Player> list = new ArrayList<Player>();
for (int i = 0; i < count; i++) {
list.add(new Player(22*i,name+i,new Date()));
}
return list;
}

}


到这里server端的处理代码也介绍完了

下一步就是client端的代码介绍了

也上一个结构图吧



config.properties文件配置的是server端的ip以及端口

ip=127.0.0.1
port=10800


RpcClient是一个socket调用客户端里面其实就只有一个方法就是把调用说明给server 返回server的处理结果

/**
* scoket 通信客户端
* @author Ming
*
*/
public class RpcClient {
private static Properties conf = new Properties();

private static int port = 0;

private static String ip = null;

static {
try {
conf.load(RpcClient.class.getResourceAsStream("/config.properties"));
ip = conf.getProperty("ip");
port = Integer.parseInt(conf.getProperty("port"));
} catch (Exception e) {
System.out.println("配置文件读取失败");
e.printStackTrace();
}
}

/**
* 调用server端的socket访问
* @param model 本次调用的描述
* @return 调用结果
*/
public Object invoke (NetModel model) {
Object value = null;
try {
Socket socket = new Socket(ip, port);
OutputStream out = socket.getOutputStream();
byte[] bytes = SerializationUtil.objToNetBytes(model);
out.write(bytes);
InputStream in = socket.getInputStream();
byte [] legnthBytes = new byte[4];
in.read(legnthBytes);
int legnth = SerializationUtil.bytesToInt(legnthBytes, 0);
byte [] objBytes =  new byte [legnth];
in.read(objBytes);
value = SerializationUtil.byetsToObject(objBytes);
} catch (Exception e) {
e.printStackTrace();
}
return value;

}

}


ProxyFactory看名字就应该明白是一个代理对象的工厂,用于生成客户端的接口代理对象

public class ProxyFactory {

private static InvocationHandler handler = new InvocationHandler() {

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcClient client = new RpcClient();
NetModel model = new NetModel();

Class<?>[] classes = proxy.getClass().getInterfaces();
String className = classes[0].getName();

String [] types = null;
if(args!=null) {
types = new String [args.length];
for (int i = 0; i < types.length; i++) {
types[i] = args[i].getClass().getName();
}
}

model.setArgs(args);
model.setTypes(types);
model.setType(className);
model.setMethod(method.getName());

Object invoke = client.invoke(model);
return invoke;
}
};

public static <T> T getInstance(Class<T> clazz) {
ClassLoader classLoader = clazz.getClassLoader();
Class<?>[] interfaces = new Class[] {clazz};
return (T) Proxy.newProxyInstance(classLoader, interfaces, handler);
}
}


最后就是test测试类了

/**
* 测试类
* @author Ming
*
*/
public class Test {
public static void main(String[] args) {
name();
}

public static void name() {
PlayerService service = ProxyFactory.getInstance(PlayerService.class);
List<Player> carList = service.carList(6);
for (Player player : carList) {
System.out.println(player);
}
System.out.println("-----------------------");
List<Player> list = service.list(5,"James");
for (Player player : list) {
System.out.println(player);
}
}
}


先执行server 端 MinaStart的main方法 再执行client端Test的main方法

然后当然就是执行结果的截图了





最后是我的代码了,在csdn的文件里面我加了那张过程的图片在博客中的展现效果不是很理想

csdn:下载http://download.csdn.net/download/cmmchenmm/10151388

或者是githulb上也有一份

https://github.com/newShiJ/RPC

对了如果使用git下载的同学要说声对不起,因为我是先上传的git再写的本篇博客,可能有一些代码块的注释没有那么完整,不过基本的注释还是有的。

最后希望读过我这篇博客的人,能够给我一些意见一起交流学习谢谢。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: