spark源码分析--Master和worker建立连接
2015-07-31 23:15
489 查看
Spark的master启动后,等待work通过spark://master'ip:7077的url去连接Master.
在worker的回调函数preStart(Worker.scala)里面,调用了函数connectToMaster,这个函数完成了向Master节点注册work的工作。执行的方法是向master发送一个RegisterWorker消息
Java代码
master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
再来看Master.scala
在这个类的recieve函数里,我们可以看到当Master收到RegisterWorker消息后如何处理
Java代码
case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
........
if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {//如果idToWorker里面没有,成功注册
addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
........
sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
schedule()
}
}
如果idToWorker这个hashmap里面,已经存在了相同的id,给发出请求的worker,发送RegisterWorkerFailed消息。如果不存在相同的id,执行addWorker操作后,向发出消息的worker,发送RegisteredWorker消息。之后调用schedule函数,进行job的重新分配
再回到Worker.scala,看worker收到RegisteredWorker消息后的动作
Java代码
case RegisteredWorker(url) =>
.......
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
master ! Heartbeat(workerId)
}
很简单,就以HEARTBEAT_MILLIS (默认是15秒(15000毫秒) --System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4)为时间间隔,定期向master发送心跳,
而master 每隔WORKER_TIMEOUT(默认60秒(60000毫秒) val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000),检查一次超时。发送CheckForWorkerTimeOut消息给自己(也就是master),收到这个消息后,调用timeOutDeadWorkers清理超过WORKER_TIMEOUT时间间隔,仍未收到心跳的worker
相关文章推荐
- 【JAVA】浅谈java内部类
- Xamarin.Forms 优秀UI界面
- 7.31 Django学习第三章
- day02_变量_基本数据类型_数据类型的转换_Scanner_20150731
- HDU - 2066 一个人的旅行
- linux性能评测工具2-Google perftools
- Struts2 零配置(一)—配置文件 vs 零配置
- MAC 设置环境变量path的几种方法
- java中使用日志组件和mysql数据库
- 弗洛伊德算法
- maven的使用和入门
- Mysql字符集以及校对规则
- excel导入MySQL 学习
- 用手机调试Android手机连上没反应解决办法
- 云存储基础架构剖析
- [V1.0]小木虫路径搜索问题C语言处理方案
- 神经网络-并行BP算法
- oracle-循环
- 没必要对央行网络支付办法恐慌
- 【设计前沿】初级网页设计师需要注意事项