使用Akka实现简易的spark通信框架 实战(二)
2018-03-05 08:51
489 查看
使用Akka实现简易的spark通信框架 实战(二)
1. 架构图
2. 代码实现
2.1 Master类
package cn.cheng.spark import akka.actor.{Actor, ActorRef, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.concurrent.duration._ //todo:利用akka实现简易版的spark通信框架-----Master端 class Master extends Actor{ //构造代码块先被执行 println("master constructor invoked") //定义一个map集合,用于存放worker信息 private val workerMap = new mutable.HashMap[String,WorkerInfo]() //定义一个list集合,用于存放WorkerInfo信息,方便后期按照worker上的资源进行排序 private val workerList = new ListBuffer[WorkerInfo] //master定时检查的时间间隔 val CHECK_OUT_TIME_INTERVAL=15000 //15秒 //prestart方法会在构造代码块执行后被调用,并且只被调用一次 override def preStart(): Unit = { println("preStart method invoked") //master定时检查超时的worker //需要手动导入隐式转换 import context.dispatcher context.system.scheduler.schedule(0 millis,CHECK_OUT_TIME_INTERVAL millis,self,CheckOutTime) } //receive方法会在prestart方法执行后被调用,表示不断的接受消息 override def receive: Receive = { //master接受worker的注册信息 case RegisterMessage(workerId,memory,cores) =>{ //判断当前worker是否已经注册 if(!workerMap.contains(workerId)){ //保存信息到map集合中 val workerInfo = new WorkerInfo(workerId,memory,cores) workerMap.put(workerId,workerInfo) //保存workerinfo到list集合中 workerList +=workerInfo //master反馈注册成功给worker sender ! RegisteredMessage(s"workerId:$workerId 注册成功") } } //master接受worker的心跳信息 case SendHeartBeat(workerId)=>{ //判断worker是否已经注册,master只接受已经注册过的worker的心跳信息 if(workerMap.contains(workerId)){ //获取workerinfo信息 val workerInfo: WorkerInfo = workerMap(workerId) //获取当前系统时间 val lastTime: Long = System.currentTimeMillis() workerInfo.lastHeartBeatTime=lastTime } } case CheckOutTime=>{ //过滤出超时的worker 判断逻辑: 获取当前系统时间 - worker上一次心跳时间 >master定时检查的时间间隔 val outTimeWorkers: ListBuffer[WorkerInfo] = workerList.filter(x => System.currentTimeMillis() -x.lastHeartBeatTime > CHECK_OUT_TIME_INTERVAL) //遍历超时的worker信息,然后移除掉超时的worker for(workerInfo <- outTimeWorkers){ //获取workerid val workerId: String = workerInfo.workerId //从map集合中移除掉超时的worker信息 workerMap.remove(workerId) //从list集合中移除掉超时的workerInfo信息 workerList -= workerInfo println("超时的workerId:" +workerId) } println("活着的worker总数:" + workerList.size) //master按照worker内存大小进行降序排列 println(workerList.sortBy(x => x.memory).reverse.toList) } } } object Master{ def main(args: Array[String]): Unit = { //master的ip地址 val host=args(0) //master的port端口 val port=args(1) //准备配置文件信息 val configStr= s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin //配置config对象 利用ConfigFactory解析配置文件,获取配置信息 val config=ConfigFactory.parseString(configStr) // 1、创建ActorSystem,它是整个进程中老大,它负责创建和监督actor,它是单例对象 val masterActorSystem = ActorSystem("masterActorSystem",config) // 2、通过ActorSystem来创建master actor val masterActor: ActorRef = masterActorSystem.actorOf(Props(new Master),"masterActor") // 3、向master actor发送消息 //masterActor ! "connect" } }
2.2 Worker类
package cn.cheng.spark import java.util.UUID import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props} import com.typesafe.config 4000 .ConfigFactory import scala.concurrent.duration._ //todo:利用akka实现简易版的spark通信框架-----Worker端 class Worker(val memory:Int,val cores:Int,val masterHost:String,val masterPort:String) extends Actor{ println("Worker constructor invoked") //定义workerId private val workerId: String = UUID.randomUUID().toString //定义发送心跳的时间间隔 val SEND_HEART_HEAT_INTERVAL=10000 //10秒 //定义全局变量 var master: ActorSelection=_ //prestart方法会在构造代码块之后被调用,并且只会被调用一次 override def preStart(): Unit = { println("preStart method invoked") //获取master actor的引用 //ActorContext全局变量,可以通过在已经存在的actor中,寻找目标actor //调用对应actorSelection方法, // 方法需要一个path路径:1、通信协议、2、master的IP地址、3、master的端口 4、创建master actor老大 5、actor层级 master= context.actorSelection(s"akka.tcp://masterActorSystem@$masterHost:$masterPort/user/masterActor") //向master发送注册信息,将信息封装在样例类中,主要包含:workerId,memory,cores master ! RegisterMessage(workerId,memory,cores) } //receive方法会在prestart方法执行后被调用,不断的接受消息 override def receive: Receive = { //worker接受master的反馈信息 case RegisteredMessage(message) =>{ println(message) //向master定期的发送心跳 //worker先自己给自己发送心跳 //需要手动导入隐式转换 import context.dispatcher context.system.scheduler.schedule(0 millis,SEND_HEART_HEAT_INTERVAL millis,self,HeartBeat) } //worker接受心跳 case HeartBeat =>{ //这个时候才是真正向master发送心跳 master ! SendHeartBeat(workerId) } } } object Worker{ def main(args: Array[String]): Unit = { //定义worker的IP地址 val host=args(0) //定义worker的端口 val port=args(1) //定义worker的内存 val memory=args(2).toInt //定义worker的核数 val cores=args(3).toInt //定义master的ip地址 val masterHost=args(4) //定义master的端口 val masterPort=args(5) //准备配置文件 val configStr= s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin //通过configFactory来解析配置信息 val config=ConfigFactory.parseString(configStr) // 1、创建ActorSystem,它是整个进程中的老大,它负责创建和监督actor val workerActorSystem = ActorSystem("workerActorSystem",config) // 2、通过actorSystem来创建 worker actor val workerActor: ActorRef = workerActorSystem.actorOf(Props(new Worker(memory,cores,masterHost,masterPort)),"workerActor") //向worker actor发送消息 workerActor ! "connect" } }
2.3 WorkerInfo类
package cn.cheng.spark //封装worker信息 class WorkerInfo(val workerId:String,val memory:Int,val cores:Int) { //定义一个变量用于存放worker上一次心跳时间 var lastHeartBeatTime:Long=_ override def toString: String = { s"workerId:$workerId , memory:$memory , cores:$cores" } }
2.4 样例类
package cn.cheng.spark trait RemoteMessage extends Serializable{ } //worker向master发送注册信息,由于不在同一进程中,需要实现序列化 case class RegisterMessage(val workerId:String,val memory:Int,val cores:Int) extends RemoteMessage //master反馈注册成功信息给worker,由于不在同一进程中,也需要实现序列化 case class RegisteredMessage(message:String) extends RemoteMessage //worker向worker发送心跳 由于在同一进程中,不需要实现序列化 case object HeartBeat //worker向master发送心跳,由于不在同一进程中,需要实现序列化 case class SendHeartBeat(val workerId:String) extends RemoteMessage //master自己向自己发送消息,由于在同一进程中,不需要实现序列化 case object CheckOutTime
喜欢就点赞评论+关注吧
感谢阅读,希望能帮助到大家,谢谢大家的支持!
相关文章推荐
- 使用Akka实现简易的spark通信框架 实战(一)
- Spark如何使用Akka实现进程、节点通信的简明介绍
- Spark如何使用Akka实现进程、节点通信的简明介绍
- Spark为何使用Netty通信框架替代Akka
- SPARK如何使用AKKA实现进程、节点通信
- Spark1.6之后为何使用Netty通信框架替代Akka
- Spark为何使用Netty通信框架替代Akka
- Spark如何使用Akka实现进程、节点通信的简明介绍
- Android中使用开源框架EventBus3.0实现Fragment之间的通信交互
- spark入门学习(1)---利用akka建立最基础的通信框架
- Android简易实战教程--第五十一话《使用Handler实现增加、减少、暂停计数》
- 用Go实现的简易TCP通信框架
- Android中使用开源框架EventBus3.0实现Fragment之间的通信交互
- 使用go reflect实现一套简易的rpc框架
- Android中使用开源框架EventBus3.0实现Fragment之间的通信交互
- Android简易实战教程--第五十一话《使用Handler实现增加、减少、暂停计数》
- Android中使用开源框架EventBus3.0实现Fragment之间的通信交互
- RxJAVA使用util包的类实现简易观察者与被观察者通信
- Go语言实现的简易TCP通信框架
- spark入门学习(2)---利用akka建立基于心跳基础的通信框架