您的位置:首页 > 编程语言 > Java开发

akka-rpc(基于akka的rpc实现)

2014-05-23 17:40 465 查看

akka-rpc(基于akka的rpc的实现)

代码:http://git.oschina.net/for-1988/Simples


目前的工作在基于akka(java)实现数据服务总线,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很简单的写出一个大型的分布式集群的架构。里面的一块功能就是RPC(远程过程调用)。

RPC

远程过程调用(Remote Procedure Call,RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用,例:Java RMI

实现原理

整个RPC的调用过程完全基于akka来传递对象,因为需要进行网络通信,所以我们的接口实现类、调用参数以及返回值都需要实现java序列化接口。客户端跟服务端其实都是在一个Akka 集群关系中,Client跟Server都是集群中的一个节点。首先Client需要初始化RpcClient对象,在初始化的过程中,我们启动了AkkaSystem,加入到整个集群中,并创建了负责与Server进行通信的Actor。然后通过RpcClient中的getBean(Class<T> clz)方法获取Server端的接口实现类的实例对象,然后通过动态代理拦截这个对象的所有方法。最后,在执行方法的时候,在RpcBeanProxy中向Server发送CallMethod事件,执行远程实现类的方法,获取返回值给Client。

Server端核心代码

public class RpcServer extends UntypedActor {
private Map<String, Object> proxyBeans;

public RpcServer(Map<Class<?>, Object> beans) {
proxyBeans = new HashMap<String, Object>();
for (Iterator<Class<?>> iterator = beans.keySet().iterator(); iterator
.hasNext();) {
Class<?> inface = iterator.next();
proxyBeans.put(inface.getName(), beans.get(inface));
}
}

@Override
public void onReceive(Object message) throws Exception {
if (message instanceof RpcEvent.CallBean) {   //返回Server端的接口实现类的实例
CallBean event = (CallBean) message;
ReturnBean bean = new ReturnBean(
proxyBeans.get(event.getBeanName()), getSelf());
getSender().tell(bean, getSelf());
} else if (message instanceof RpcEvent.CallMethod) {
CallMethod event = (CallMethod) message;
Object bean = proxyBeans.get(event.getBeanName());
Object[] params = event.getParams();
List<Class<?>> paraTypes = new ArrayList<Class<?>>();
Class<?>[] paramerTypes = new Class<?>[] {};
if (params != null) {
for (Object param : params) {
paraTypes.add(param.getClass());
}
}
Method method = bean.getClass().getMethod(event.getMethodName(),
paraTypes.toArray(paramerTypes));
Object o = method.invoke(bean, params);
getSender().tell(o, getSelf());
}
}

}


启动Server

public static void main(String[] args) {
final Config config = ConfigFactory
.parseString("akka.remote.netty.tcp.port=" + 2551)
.withFallback(
ConfigFactory
.parseString("akka.cluster.roles = [RpcServer]"))
.withFallback(ConfigFactory.load());

ActorSystem system = ActorSystem.create("EsbSystem", config);

// Server 加入发布的服务
Map<Class<?>, Object> beans = new HashMap<Class<?>, Object>();
beans.put(ExampleInterface.class, new ExampleInterfaceImpl());
system.actorOf(Props.create(RpcServer.class, beans), "rpcServer");
}


Client端核心代码

RpcClient类型集成了Thread,为了解决一个问题:因为AkkaSystem在加入集群中的时候是异步的,所以我们在第一次new RpcClient对象的时候需要等待加入集群成功以后,才可以执行下面的方法,不然获取的 /user/rpcServer Route中没有Server的Actor,请求会失败。

public class RpcClient extends Thread {

private ActorSystem system;

private ActorRef rpc;

private ActorRef clientServer;

private static RpcClient instance = null;

public RpcClient() {
this.start();
final Config config = ConfigFactory
.parseString("akka.remote.netty.tcp.port=" + 2552)
.withFallback(
ConfigFactory
.parseString("akka.cluster.roles = [RpcClient]"))
.withFallback(ConfigFactory.load());
system = ActorSystem.create("EsbSystem", config);

int totalInstances = 100;
Iterable<String> routeesPaths = Arrays.asList("/user/rpcServer");
boolean allowLocalRoutees = false;
ClusterRouterGroup clusterRouterGroup = new ClusterRouterGroup(
new AdaptiveLoadBalancingGroup(
HeapMetricsSelector.getInstance(),
Collections.<String> emptyList()),
new ClusterRouterGroupSettings(totalInstances, routeesPaths,
allowLocalRoutees, "RpcServer"));
rpc = system.actorOf(clusterRouterGroup.props(), "rpcCall");
clientServer = system.actorOf(Props.create(RpcClientServer.class, rpc),
"client");
Cluster.get(system).registerOnMemberUp(new Runnable() {  //加入集群成功后的回调事件,恢复当前线程的中断
@Override
public void run() {
synchronized (instance) {
System.out.println("notify");
instance.notify();
}
}
});

}

public static RpcClient getInstance() {
if (instance == null) {
instance = new RpcClient();
synchronized (instance) {
try {   //中断当前线程,等待加入集群成功后,恢复
System.out.println("wait");
instance.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
return instance;
}

public <T> T getBean(Class<T> clz) {
Future<Object> future = Patterns.ask(clientServer,
new RpcEvent.CallBean(clz.getName(), clientServer),
new Timeout(Duration.create(5, TimeUnit.SECONDS)));
try {
Object o = Await.result(future,
Duration.create(5, TimeUnit.SECONDS));
if (o != null) {
ReturnBean returnBean = (ReturnBean) o;
return (T) new RpcBeanProxy().proxy(returnBean.getObj(),
clientServer, clz);
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}


RpcClientServer

public class RpcClientServer extends UntypedActor {

private ActorRef rpc;

public RpcClientServer(ActorRef rpc) {
this.rpc = rpc;
}

@Override
public void onReceive(Object message) throws Exception {
if (message instanceof RpcEvent.CallBean) {  //向Server发送CallBean请求
CallBean event = (CallBean) message;
Future<Object> future = Patterns.ask(rpc, event, new Timeout(
Duration.create(5, TimeUnit.SECONDS)));
Object o = Await.result(future,
Duration.create(5, TimeUnit.SECONDS));
getSender().tell(o, getSelf());
} else if (message instanceof RpcEvent.CallMethod) {  //向Server发送方法调用请求
Future<Object> future = Patterns.ask(rpc, message, new Timeout(
Duration.create(5, TimeUnit.SECONDS)));
Object o = Await.result(future,
Duration.create(5, TimeUnit.SECONDS));
getSender().tell(o, getSelf());
}
}
}

RpcBeanProxy,客户端的动态代理类

public class RpcBeanProxy implements InvocationHandler {

private ActorRef rpcClientServer;

private Class<?> clz;

public Object proxy(Object target, ActorRef rpcClientServer, Class<?> clz) {
this.rpcClientServer = rpcClientServer;
this.clz = clz;
return Proxy.newProxyInstance(target.getClass().getClassLoader(),
target.getClass().getInterfaces(), this);
}

@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
Object result = null;
RpcEvent.CallMethod callMethod = new RpcEvent.CallMethod(
method.getName(), args, clz.getName());
Future<Object> future = Patterns.ask(rpcClientServer, callMethod,
new Timeout(Duration.create(5, TimeUnit.SECONDS)));
Object o = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
result = o;
return result;
}

}


Demo

Interface,Client和Server都需要这个类,必须实现序列化

public interface ExampleInterface extends Serializable{
public String sayHello(String name);
}

实现类,只需要Server端存在这个类。

public class ExampleInterfaceImpl implements ExampleInterface {
@Override
public String sayHello(String name) {
System.out.println("Be Called !");
return "Hello " + name;
}
}

Client调用

public static void main(String[] args) {
RpcClient client = RpcClient.getInstance();
long start = System.currentTimeMillis();

ExampleInterface example = client.getBean(ExampleInterface.class);
System.out.println(example.sayHello("rpc"));

long time = System.currentTimeMillis() - start;
System.out.println("time :" + time);
}






这里第一次调用耗时比较长需要46毫秒,akka会对消息进行优化,调用多次以后时间为 1~2毫秒。

目前还没来得及做性能测试,后面会补充。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  akka rpc java