使用Akka实现一个简单的RPC框架(二)
2017-11-30 13:50
911 查看
需求以及实现思路如下
至于Master与Worker的编写参考使用Akka实现简单rpc(一)
1、Master类
2、Worker类
3、WorkerInfo类
4、样例类
分别指定两个主类,指定主类为
时,将打的包放到windows的D盘下,命令行输入
可以发现每隔15秒回检查一次,打印存活的worker数。
再指定主类为
时,打包上传到两台linux机器
2机器命令行输入
mini1 机器
mini2机器
发现两台机器每隔10秒就向master发送心跳(报活)
再去window是上命令行查看
至于Master与Worker的编写参考使用Akka实现简单rpc(一)
1、Master类
/** * Created by 12706 on 2017/11/29. */ class Master(val host : String, val port : Int) extends Actor{ //key为workerId,value为worker的信息 val regWorkersMap = new mutable.HashMap[String,WorkerInfo]() val regWorkersSet = new mutable.HashSet[WorkerInfo]() //定时检查worker是否超时,时间15秒(即15秒检查一次),一定要大于worker发送心跳的间隔 val CHECK_WORKER_TIMEOUT : Long = 15000 //初始化方法 override def preStart(): Unit = { //定时检查worker,如果超时那么排除掉该worker,导入隐士转换与import scala.concurrent.duration._ import context.dispatcher context.system.scheduler.schedule(0 millis, CHECK_WORKER_TIMEOUT millis, self ,CheckWorker) } //不停地接收消息 override def receive: Receive = { //接收到worker的注册消息 case RegisterWorker(workerId,memory,cores) => { if (!regWorkersMap.contains(workerId)){ //该worker还没注册,将该worker信息保存(保存到map和set中)起来 val workerInfo = new WorkerInfo(workerId,memory,cores) regWorkersMap.put(workerId,workerInfo) regWorkersSet.add(workerInfo) //向worker返回注册成功信息,需要告诉worker该master的地址 sender ! RegisteredSucceed(s"akka.tcp://MasterSystem@$host:$port/user/master") } } //接收到worker发送的心跳信息,重新设置该worker的最后报活时间 case HeartBeat(workerId) => { val currentTime = System.currentTimeMillis() val workerInfo = regWorkersMap(workerId) //修改map中的workerInfo时,由于set中对应的那个workerInfo与map中的都是指向同一内存单元的,所以值也等于发生了改变 workerInfo.lastBeatTime = currentTime } case CheckWorker => { val currentTime = System.currentTimeMillis() //最后一次报活时间到现在的时间间隔超过了15秒的worker是集合 val toRemoveWorkers = regWorkersSet.filter(worker => currentTime - worker.lastBeatTime > CHECK_WORKER_TIMEOUT) for (worker <- toRemoveWorkers) { //超过的这些worker是全部移除 regWorkersSet.remove(worker) regWorkersMap.remove(worker.workerId) } println("存活的workers个数:" + regWorkersSet.size) } } } object Master { def main(args: Array[String]): Unit = { //连接地址和端口号 val host = args(0) val port = args(1).toInt val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin //传入配置参数获取配置 val config = ConfigFactory.parseString(configStr) //获取ActorSystem,指定名称和配置 val masterSystem = ActorSystem("MasterSystem",config) //创建Actor val master = masterSystem.actorOf(Props(new Master(host,port)),"master") masterSystem.awaitTermination() } }
2、Worker类
class Worker(val masterHost : String,val masterPort : Int, val memory : Int, val cores : Int) extends Actor{ var master : ActorSelection =_ val workerId : String = UUID.randomUUID().toString.replace("-","") //向Master发送心跳的间隔10s val SEND_HEARTBEAT_INTERVAL : Long = 10000 override def preStart(): Unit = { //与Master建立连接,拿到master引用 master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/master") //向Master发送注册消息 master ! RegisterWorker(workerId,memory,cores) } override def receive: Receive = { //收到Master返回的注册成功信息 case RegisteredSucceed(masterUrl) => { //定时向Master发送心跳,用来报活 // 需要导入隐式转换,还需要导入包scala.concurrent.duration._ import context.dispatcher //以上拿到的master引用的类型跟这里第三个参数指定的发送目标类型不一致,所以发送给worker自己 //第二个参数为发送时间间隔,最后一个是发送的信息,发送给自己的话还可以自己做一些信息检查再发送给Master context.system.scheduler.schedule(0 millis,SEND_HEARTBEAT_INTERVAL millis,self,CheckWorkerSelf) } case CheckWorkerSelf => { println(s"$workerId:worker向master报活") master ! HeartBeat(workerId) } case "reply" => { println("收到Master消息") } } } object Worker { def main(args: Array[String]): Unit = { val host = args(0) val port = args(1).toInt val masterHost = args(2) val masterPort = args(3).toInt val memory = args(4).toInt val cores = args(5).toInt val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin //传入配置参数获取配置 val config = ConfigFactory.parseString(configStr) //获取ActorSystem,指定名称和配置 val workerSystem = ActorSystem("WorkerSystem",config) //创建Actor val actor = workerSystem.actorOf(Props(new Worker(masterHost,masterPort,memory,cores)),"worker") workerSystem.awaitTermination() } }
3、WorkerInfo类
/** * 封装了worker的信息类 * Created by 12706 on 2017/11/30. */ class WorkerInfo(val workerId : String,val memory : Int, val cores : Int) { var lastBeatTime : Long = _ }
4、样例类
//需要进行网络传输 trait RemoteMessage extends Serializable //Worker想Master发送的注册消息 workerId,内存,核数,由于需要进行网络传输所以需要继承RemoteMessage case class RegisterWorker(val workerId : String, val memory : Int, val cores : Int) extends RemoteMessage //Master想Worker发送的注册成功消息 case class RegisteredSucceed(val masterUrl : String) case object CheckWorkerSelf //Worker想Master发送心跳 case class HeartBeat(val workerId : String) extends RemoteMessage case object CheckWorker
分别指定两个主类,指定主类为
<mainClass>cn.hfut.rpc.Master</mainClass>
时,将打的包放到windows的D盘下,命令行输入
D:\>java -jar my-akka-2.0.jar 192.168.191.2 8080 [INFO] [11/30/2017 13:24:19.554] [main] [Remoting] Starting remoting [INFO] [11/30/2017 13:24:19.975] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://MasterSystem@192.168.191.2:8080] [INFO] [11/30/2017 13:24:19.977] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://MasterSystem@192.168.191.2:8080] 存活的workers个数:0 存活的workers个数:0 存活的workers个数:0 存活的workers个数:0
可以发现每隔15秒回检查一次,打印存活的worker数。
再指定主类为
<mainClass>cn.hfut.rpc.Worker</mainClass>
时,打包上传到两台linux机器
2机器命令行输入
mini1 机器
[root@mini1 ~]# java -jar my-akka-2.0.jar mini1 8081 192.168.191.2 8080 102400 4 [INFO] [11/30/2017 21:27:51.136] [main] [Remoting] Starting remoting [INFO] [11/30/2017 21:27:51.352] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://WorkerSystem@mini1:8081] [INFO] [11/30/2017 21:27:51.353] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://WorkerSystem@mini1:8081] 4e9b8e9bfc0e44e590517831b4ce7d29:worker向master报活 4e9b8e9bfc0e44e590517831b4ce7d29:worker向master报活
mini2机器
[root@mini2 ~]# java -jar my-akka-2.0.jar mini2 8082 192.168.191.2 8080 102400 4 [INFO] [11/23/2017 17:47:56.166] [main] [Remoting] Starting remoting [INFO] [11/23/2017 17:47:56.399] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://WorkerSystem@mini2:8082] [INFO] [11/23/2017 17:47:56.399] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://WorkerSystem@mini2:8082] 0e33270b6fa843b684088d7125fe0ca5:worker向master报活 0e33270b6fa843b684088d7125fe0ca5:worker向master报活
发现两台机器每隔10秒就向master发送心跳(报活)
再去window是上命令行查看
存活的workers个数:1 存活的workers个数:1 存活的workers个数:2 存活的workers个数:2
相关文章推荐
- 使用Akka实现一个简单的RPC框架(一)
- 基于akka与scala实现一个简单rpc框架
- 【远程调用框架】如何实现一个简单的RPC框架(二)实现与使用
- 一个简单的rpc框架实现(待连载优化)
- php实现的一个简单json rpc框架实例
- Java实现一个简单的RPC框架(六) 注册机制
- Apache thrift - 使用,内部实现及构建一个可扩展的RPC框架
- 一个简单的rpc框架实现(一)
- 一个简单的rpc框架的实现
- Java实现一个简单的RPC框架(五) 基于Socket的传输层实现
- 使用CXF框架实现webservice的一个简单例子
- 【远程调用框架】如何实现一个简单的RPC框架(四)优化二:改变底层通信框架
- 一个简单RPC框架是如何炼成的(III)——实现带参数的RPC调用
- 【远程调用框架】如何实现一个简单的RPC框架(五)优化三:软负载中心设计与实现
- 【远程调用框架】如何实现一个简单的RPC框架(一)想法与设计
- 使用Python的Tornado框架实现一个简单的WebQQ机器人
- Java实现一个简单的RPC框架(二) 协议
- 使用Python的Twisted框架实现一个简单的服务器
- Java实现一个简单的RPC框架(四) 编码和解码
- 一个简单的rpc框架的实现