您的位置:首页 > 其它

Scala基于Akka的Remote Actor实现的简单RPC

2017-03-13 15:49 417 查看
spark 1.3中的通信是基于Akka实现的,Actor之间的交互都是通过消息,并且所有动作都是异步的。

本文基于spark 1.3通信核心原理实现一个简单的基于akka的rpc框架。

服务端:Server

//模式匹配 消息类型

case class AkkaMessage(message: Any)

case class Response(response: Any)

class Server extends Actor {

override def receive: Receive = {

case msg:AkkaMessage=>{

println(“服务端收到消息:”+msg.message)

sender ! Response(“response_” + msg.message)

}

case _ => println(“服务端不支持的消息类型 .. “)

}

}

object Server {

//创建远程Actor:ServerSystem

def main(args: Array[String]): Unit = {

val serverSystem = ActorSystem(“mxb”,ConfigFactory.parseString(“””

akka {

actor {

provider = “akka.remote.RemoteActorRefProvider”

}

remote {

enabled-transports = [“akka.remote.netty.tcp”]

netty.tcp {

hostname = “127.0.0.1”

port = 2555

}

}

}

“”“))

serverSystem.actorOf(Props[Server], “server”)

}

}

客户端Client:

class Client extends Actor {

//远程Actor

var remoteActor : ActorSelection = null

//当前Actor

var localActor : akka.actor.ActorRef = null

@throwsException

override def preStart(): Unit = {

remoteActor = context.actorSelection(“akka.tcp://mxb@127.0.0.1:2555/user/server”)

println(“远程服务端地址 : ” + remoteActor)

}

override def receive: Receive = {

//接收到消息类型为AkkaMessage后,将消息转发至远程Actor

case msg: AkkaMessage => {

println(“客户端发送消息 : ” + msg)

this.localActor = sender()

remoteActor ! msg

}

//接收到远程Actor发送的消息类型为Response,响应

case res: Response => {

localActor ! res

}

case _ => println(“客户端不支持的消息类型 .. “)

}

}

object Client {

def main(args: Array[String]) : Unit = {

val clientSystem = ActorSystem(“ClientSystem”, ConfigFactory.parseString(“””

akka {

actor {

provider = “akka.remote.RemoteActorRefProvider”

}

}

“”“))

var client = clientSystem.actorOf(Props[Client])
var msgs = Array[AkkaMessage](AkkaMessage("message1"),AkkaMessage("message2"),AkkaMessage("message3"),AkkaMessage("message4"))

implicit val timeout = Timeout(3 seconds)

msgs.foreach { x =>
val future = client ? x
val result = Await.result(future,timeout.duration).asInstanceOf[Response]
println("收到的反馈: " + result)
}

//     msgs.foreach { x =>
//       client ! x
//     }

clientSystem.shutdown()


}

}

运行结果:

server console:

[INFO] [03/13/2017 15:38:03.154] [main] [Remoting] Starting remoting

[INFO] [03/13/2017 15:38:03.653] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://mxb@127.0.0.1:2555]

[INFO] [03/13/2017 15:38:03.653] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://mxb@127.0.0.1:2555]

服务端收到消息:message1

服务端收到消息:message2

服务端收到消息:message3

服务端收到消息:message4

[ERROR] [03/13/2017 15:38:49.113] [mxb-akka.remote.default-remote-dispatcher-6] [akka.tcp://mxb@127.0.0.1:2555/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClientSystem%4010.60.98.79%3A2552-0/endpointWriter] AssociationError [akka.tcp://mxb@127.0.0.1:2555] <- [akka.tcp://ClientSystem@10.60.98.79:2552]: Error [Shut down address: akka.tcp://ClientSystem@10.60.98.79:2552] [

akka.remote.ShutDownAssociation: Shut down address: akka.tcp://ClientSystem@10.60.98.79:2552

Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.

]

client console:

[INFO] [03/13/2017 15:38:48.409] [main] [Remoting] Starting remoting

[INFO] [03/13/2017 15:38:48.596] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://ClientSystem@10.60.98.79:2552]

[INFO] [03/13/2017 15:38:48.596] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://ClientSystem@10.60.98.79:2552]

远程服务端地址 : ActorSelection[Anchor(akka.tcp://mxb@127.0.0.1:2555/), Path(/user/server)]

客户端发送消息 : AkkaMessage(message1)

收到的反馈: Response(response_message1)

客户端发送消息 : AkkaMessage(message2)

收到的反馈: Response(response_message2)

客户端发送
8ce1
消息 : AkkaMessage(message3)

收到的反馈: Response(response_message3)

客户端发送消息 : AkkaMessage(message4)

收到的反馈: Response(response_message4)

[INFO] [03/13/2017 15:38:49.035] [ClientSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://ClientSystem@10.60.98.79:2552/system/remoting-terminator] Shutting down remote daemon.

[INFO] [03/13/2017 15:38:49.050] [ClientSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://ClientSystem@10.60.98.79:2552/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports.

[INFO] [03/13/2017 15:38:49.128] [ForkJoinPool-3-worker-15] [Remoting] Remoting shut down

[INFO] [03/13/2017 15:38:49.128] [ClientSystem-akka.remote.default-remote-dispatcher-7] [akka.tcp://ClientSystem@10.60.98.79:2552/system/remoting-terminator] Remoting shut down.

Process finished with exit code 0
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: