您的位置:首页 > 其它

使用Akka实现一个简单的RPC框架(二)

2017-11-30 13:50 911 查看
需求以及实现思路如下



至于Master与Worker的编写参考使用Akka实现简单rpc(一)

1、Master类

/**
* Created by 12706 on 2017/11/29.
*/
class Master(val host : String, val port : Int) extends Actor{
//key为workerId,value为worker的信息
val regWorkersMap = new mutable.HashMap[String,WorkerInfo]()

val regWorkersSet = new mutable.HashSet[WorkerInfo]()

//定时检查worker是否超时,时间15秒(即15秒检查一次),一定要大于worker发送心跳的间隔
val CHECK_WORKER_TIMEOUT : Long = 15000

//初始化方法
override def preStart(): Unit = {
//定时检查worker,如果超时那么排除掉该worker,导入隐士转换与import scala.concurrent.duration._
import context.dispatcher
context.system.scheduler.schedule(0 millis, CHECK_WORKER_TIMEOUT millis, self ,CheckWorker)
}

//不停地接收消息
override def receive: Receive = {
//接收到worker的注册消息
case RegisterWorker(workerId,memory,cores) => {
if (!regWorkersMap.contains(workerId)){
//该worker还没注册,将该worker信息保存(保存到map和set中)起来
val workerInfo = new WorkerInfo(workerId,memory,cores)
regWorkersMap.put(workerId,workerInfo)
regWorkersSet.add(workerInfo)
//向worker返回注册成功信息,需要告诉worker该master的地址
sender ! RegisteredSucceed(s"akka.tcp://MasterSystem@$host:$port/user/master")
}
}
//接收到worker发送的心跳信息,重新设置该worker的最后报活时间
case HeartBeat(workerId) => {
val currentTime = System.currentTimeMillis()
val workerInfo = regWorkersMap(workerId)
//修改map中的workerInfo时,由于set中对应的那个workerInfo与map中的都是指向同一内存单元的,所以值也等于发生了改变
workerInfo.lastBeatTime = currentTime
}
case CheckWorker => {
val currentTime = System.currentTimeMillis()
//最后一次报活时间到现在的时间间隔超过了15秒的worker是集合
val toRemoveWorkers = regWorkersSet.filter(worker => currentTime - worker.lastBeatTime > CHECK_WORKER_TIMEOUT)
for (worker <- toRemoveWorkers) {
//超过的这些worker是全部移除
regWorkersSet.remove(worker)
regWorkersMap.remove(worker.workerId)
}
println("存活的workers个数:" + regWorkersSet.size)
}
}

}

object Master {
def main(args: Array[String]): Unit = {

//连接地址和端口号
val host = args(0)
val port = args(1).toInt
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
//传入配置参数获取配置
val config = ConfigFactory.parseString(configStr)
//获取ActorSystem,指定名称和配置
val masterSystem = ActorSystem("MasterSystem",config)
//创建Actor
val master = masterSystem.actorOf(Props(new Master(host,port)),"master")
masterSystem.awaitTermination()
}
}


2、Worker类

class Worker(val masterHost : String,val masterPort : Int, val memory : Int, val cores : Int) extends Actor{

var master : ActorSelection =_
val workerId : String = UUID.randomUUID().toString.replace("-","")
//向Master发送心跳的间隔10s
val SEND_HEARTBEAT_INTERVAL : Long = 10000

override def preStart(): Unit = {
//与Master建立连接,拿到master引用
master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/master")
//向Master发送注册消息
master ! RegisterWorker(workerId,memory,cores)
}

override def receive: Receive = {
//收到Master返回的注册成功信息
case RegisteredSucceed(masterUrl) => {
//定时向Master发送心跳,用来报活
// 需要导入隐式转换,还需要导入包scala.concurrent.duration._
import context.dispatcher
//以上拿到的master引用的类型跟这里第三个参数指定的发送目标类型不一致,所以发送给worker自己
//第二个参数为发送时间间隔,最后一个是发送的信息,发送给自己的话还可以自己做一些信息检查再发送给Master
context.system.scheduler.schedule(0 millis,SEND_HEARTBEAT_INTERVAL millis,self,CheckWorkerSelf)
}
case CheckWorkerSelf => {
println(s"$workerId:worker向master报活")
master ! HeartBeat(workerId)
}

case "reply" => {
println("收到Master消息")
}
}
}

object Worker {
def main(args: Array[String]): Unit = {
val host = args(0)
val port = args(1).toInt
val masterHost = args(2)
val masterPort = args(3).toInt
val memory = args(4).toInt
val cores = args(5).toInt
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
//传入配置参数获取配置
val config = ConfigFactory.parseString(configStr)
//获取ActorSystem,指定名称和配置
val workerSystem = ActorSystem("WorkerSystem",config)
//创建Actor
val actor = workerSystem.actorOf(Props(new Worker(masterHost,masterPort,memory,cores)),"worker")
workerSystem.awaitTermination()
}
}


3、WorkerInfo类

/**
* 封装了worker的信息类
* Created by 12706 on 2017/11/30.
*/
class WorkerInfo(val workerId : String,val memory : Int, val cores : Int) {
var lastBeatTime : Long = _
}


4、样例类

//需要进行网络传输
trait RemoteMessage extends Serializable

//Worker想Master发送的注册消息 workerId,内存,核数,由于需要进行网络传输所以需要继承RemoteMessage
case class RegisterWorker(val workerId : String, val memory : Int, val cores : Int) extends RemoteMessage

//Master想Worker发送的注册成功消息
case class RegisteredSucceed(val masterUrl : String)

case object CheckWorkerSelf

//Worker想Master发送心跳
case class HeartBeat(val workerId : String) extends RemoteMessage

case object CheckWorker


分别指定两个主类,指定主类为

<mainClass>cn.hfut.rpc.Master</mainClass>


时,将打的包放到windows的D盘下,命令行输入

D:\>java -jar my-akka-2.0.jar 192.168.191.2 8080
[INFO] [11/30/2017 13:24:19.554] [main] [Remoting] Starting remoting
[INFO] [11/30/2017 13:24:19.975] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://MasterSystem@192.168.191.2:8080]
[INFO] [11/30/2017 13:24:19.977] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://MasterSystem@192.168.191.2:8080]
存活的workers个数:0
存活的workers个数:0
存活的workers个数:0
存活的workers个数:0


可以发现每隔15秒回检查一次,打印存活的worker数。

再指定主类为

<mainClass>cn.hfut.rpc.Worker</mainClass>


时,打包上传到两台linux机器

2机器命令行输入

mini1 机器

[root@mini1 ~]# java -jar my-akka-2.0.jar mini1 8081 192.168.191.2 8080 102400 4
[INFO] [11/30/2017 21:27:51.136] [main] [Remoting] Starting remoting
[INFO] [11/30/2017 21:27:51.352] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://WorkerSystem@mini1:8081]
[INFO] [11/30/2017 21:27:51.353] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://WorkerSystem@mini1:8081]
4e9b8e9bfc0e44e590517831b4ce7d29:worker向master报活
4e9b8e9bfc0e44e590517831b4ce7d29:worker向master报活


mini2机器

[root@mini2 ~]#  java -jar my-akka-2.0.jar mini2 8082 192.168.191.2 8080 102400 4
[INFO] [11/23/2017 17:47:56.166] [main] [Remoting] Starting remoting
[INFO] [11/23/2017 17:47:56.399] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://WorkerSystem@mini2:8082]
[INFO] [11/23/2017 17:47:56.399] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://WorkerSystem@mini2:8082]
0e33270b6fa843b684088d7125fe0ca5:worker向master报活
0e33270b6fa843b684088d7125fe0ca5:worker向master报活


发现两台机器每隔10秒就向master发送心跳(报活)

再去window是上命令行查看

存活的workers个数:1
存活的workers个数:1
存活的workers个数:2
存活的workers个数:2
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: