您的位置:首页 > 编程语言 > Java开发

自定义的RPC的Java实现

2011-11-04 18:53 281 查看


自定义的RPC的Java实现

博客分类:

Java

JavaSocketHadoopGooglethread

在看hadoop的源代码的时候,看到hadoop实现了一个自定义的RPC,于是有了自己写代码实现RPC的想法。

RPC的全名Remote Process Call,即远程过程调用。使用RPC,可以像使用本地的程序一样使用远程服务器上的程序。下面是一个简单的RPC 调用实例,从中可以看到RPC如何使用以及好处:

Java代码


public class MainClient {

public static void main(String[] args) {

Echo echo = RPC.getProxy(Echo.class, "127.0.0.1", 20382);

System.out.println(echo.echo("hello,hello"));

}

}

Java代码


Java代码


public interface Echo {

public String echo(String string);

}

使用RPC.getProxy生成接口Echo的代理实现类。然后就可以像使用本地的程序一样来调用Echo中的echo方法。

使用RPC的好处是简化了远程服务访问。提高了开发效率。在分发代码时,只需要将接口分发给客户端使用,在客户端看来只有接口,没有具体类实现。这样保证了代码的可扩展性和安全性。

在看了RPCClient如何使用,我们再来定义一个RPC服务器的接口,看看服务器都提供什么操作:

Java代码


public interface Server {

public void stop();

public void start();

public void register(Class interfaceDefiner,Class impl);

public void call(Invocation invo);

public boolean isRunning();

public int getPort();

}

服务器提供了start和stop方法。使用register注册一个接口和对应的实现类。call方法用于执行Invocation指定的接口的方法名。isRunning返回了服务器的状态,getPort()则返回了服务器使用的端口。

来看看Invocation的定义:

Java代码


public class Invocation implements Serializable{

/**

*

*/

private static final long serialVersionUID = 1L;

private Class interfaces;

private Method method;

private Object[] params;

private Object result;

/**

* @return the result

*/

public Object getResult() {

return result;

}

/**

* @param result the result to set

*/

public void setResult(Object result) {

this.result = result;

}

/**

* @return the interfaces

*/

public Class getInterfaces() {

return interfaces;

}

/**

* @param interfaces the interfaces to set

*/

public void setInterfaces(Class interfaces) {

this.interfaces = interfaces;

}

/**

* @return the method

*/

public Method getMethod() {

return method;

}

/**

* @param method the method to set

*/

public void setMethod(Method method) {

this.method = method;

}

/**

* @return the params

*/

public Object[] getParams() {

return params;

}

/**

* @param params the params to set

*/

public void setParams(Object[] params) {

this.params = params;

}

@Override

public String toString() {

return interfaces.getName()+"."+method.getMethodName()+"("+Arrays.toString(params)+")";

}

}

具体服务器实现类中的call方法是这样使用Invocation的:

Java代码


@Override

public void call(Invocation invo) {

Object obj = serviceEngine.get(invo.getInterfaces().getName()); //根据接口名,找到对应的处理类

if(obj!=null) {

try {

Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams());

Object result = m.invoke(obj, invo.getParams());

invo.setResult(result);

} catch (Throwable th) {

th.printStackTrace();

}

} else {

throw new IllegalArgumentException("has no these class");

}

}

下面来看服务器接收连接并处理连接请求的核心代码:

Java代码


public class Listener extends Thread {

private ServerSocket socket;

private Server server;

public Listener(Server server) {

this.server = server;

}

@Override

public void run() {

System.out.println("启动服务器中,打开端口" + server.getPort());

try {

socket = new ServerSocket(server.getPort());

} catch (IOException e1) {

e1.printStackTrace();

return;

}

while (server.isRunning()) {

try {

Socket client = socket.accept();

ObjectInputStream ois = new ObjectInputStream(client.getInputStream());

Invocation invo = (Invocation) ois.readObject();

server.call(invo);

ObjectOutputStream oos = new ObjectOutputStream(client.getOutputStream());

oos.writeObject(invo);

oos.flush();

oos.close();

ois.close();

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

try {

if (socket != null && !socket.isClosed())

socket.close();

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

RPC具体的Server类是这样来使用Listener的:

Java代码


public static class RPCServer implements Server{

private int port = 20382;

private Listener listener;

private boolean isRuning = true;

/**

* @param isRuning the isRuning to set

*/

public void setRuning(boolean isRuning) {

this.isRuning = isRuning;

}

/**

* @return the port

*/

public int getPort() {

return port;

}

/**

* @param port the port to set

*/

public void setPort(int port) {

this.port = port;

}

private Map<String ,Object> serviceEngine = new HashMap<String, Object>();

@Override

public void call(Invocation invo) {

System.out.println(invo.getClass().getName());

Object obj = serviceEngine.get(invo.getInterfaces().getName());

if(obj!=null) {

try {

Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams());

Object result = m.invoke(obj, invo.getParams());

invo.setResult(result);

} catch (Throwable th) {

th.printStackTrace();

}

} else {

throw new IllegalArgumentException("has no these class");

}

}

@Override

public void register(Class interfaceDefiner, Class impl) {

try {

this.serviceEngine.put(interfaceDefiner.getName(), impl.newInstance());

System.out.println(serviceEngine);

} catch (Throwable e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

@Override

public void start() {

System.out.println("启动服务器");

listener = new Listener(this);

this.isRuning = true;

listener.start();

}

@Override

public void stop() {

this.setRuning(false);

}

@Override

public boolean isRunning() {

return isRuning;

}

}

服务器端代码搞定后,来看看客户端的代码,先看看我们刚开始使用RPC.getProxy方法:

Java代码


public static <T> T getProxy(final Class<T> clazz,String host,int port) {

final Client client = new Client(host,port);

InvocationHandler handler = new InvocationHandler() {

@Override

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

Invocation invo = new Invocation();

invo.setInterfaces(clazz);

invo.setMethod(new org.jy.rpc.protocal.Method(method.getName(),method.getParameterTypes()));

invo.setParams(args);

client.invoke(invo);

return invo.getResult();

}

};

T t = (T) Proxy.newProxyInstance(RPC.class.getClassLoader(), new Class[] {clazz}, handler);

return t;

}

Client类的代码如下:

Java代码


public class Client {

private String host;

private int port;

private Socket socket;

private ObjectOutputStream oos;

private ObjectInputStream ois;

public String getHost() {

return host;

}

public void setHost(String host) {

this.host = host;

}

public int getPort() {

return port;

}

public void setPort(int port) {

this.port = port;

}

public Client(String host, int port) {

this.host = host;

this.port = port;

}

public void init() throws UnknownHostException, IOException {

socket = new Socket(host, port);

oos = new ObjectOutputStream(socket.getOutputStream());

}

public void invoke(Invocation invo) throws UnknownHostException, IOException, ClassNotFoundException {

init();

System.out.println("写入数据");

oos.writeObject(invo);

oos.flush();

ois = new ObjectInputStream(socket.getInputStream());

Invocation result = (Invocation) ois.readObject();

invo.setResult(result.getResult());

}

}

至此,RPC的客户端和服务器端代码完成,启动服务器的代码如下:

Java代码


public class Main {

public static void main(String[] args) {

Server server = new RPC.RPCServer();

server.register(Echo.class, RemoteEcho.class);

server.start();

}

}

现在先运行服务器端代码,再运行客户端代码,就可以成功运行。

详细的代码,参考附件的源代码。

在写这个RPC时,没有想太多。在数据串行化上,使用了java的标准io序列化机制,虽然不能跨平台,但是做DEMO还是不错的;另外在处理客户端请求上,使用了ServerSocket,而没有使用ServerSocketChannel这个java nio中的新特性;在动态生成接口的实现类上,使用了java.lang.reflet中的Proxy类。他可以动态创建接口的实现类。

Rpc.rar (17.1 KB)

下载次数: 24

转自:http://jbm3072.iteye.com/blog/1088102
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: