您的位置:首页 > 其它

第43课:Spark 1.6 RPC内幕解密:运行机制、源码详解、Netty与Akka等

2016-05-05 19:58 537 查看
Spark 是分布式计算框架,多台机器之间必然存在着通信。Spark在早期版本采用Akka实现。现在在Akka的上层抽象出了一个RpcEnv。RpcEnv负责管理机器之间的通信。
RpcEnv包含了如下三大核心:
RpcEndpoint 消息循环体,负责接收并处理消息。Spark中的Master、Worker都是RpcEndpoint 。

RpcEndpointRef :RpcEndpoint的引用,如果需要和RpcEndpoint通信,就必须获取它的RpcEndpointRef,通过RpcEndpointRef发送消息。

Dispatcher:消息调度器,负责RPC消息路由到适当的RpcEndpoint。

RpcEnv被创建以后,RpcEndpoint可以注册到RpcEnv中,被注册的RpcEndpoint会生成一个相应的RpcEndpointRef来引用它。如果你需要向RpcEndpoint发送消息,必须到RpcEnv中通过RpcEndpoint的名称来获取对应的RpcEndpointRef,然后通过RpcEndpointRef向RpcEndpoint发送消息。

RpcEnv负责管理RpcEndpoint的整个生命周期
注册RpcEndpoint,使用name或者uri

路由发送给RpcEndpoint的消息。

停止RpcEndpoint

注:一个RpcEndpoint只能注册给一个RpcEnv

RpcAddress:RpcEnv的逻辑地址,使用主机名和端口表示。
RpcEndpointAddress:注册到RpcEnv上的RpcEndpoint的地址,由RpcAddress和name构成。

由此可见RpcEnv和RpcEndpoint是在相同的机器上(相同的JVM中)。而要想给远端机器发送消息,是获取远端机器的RpcEndpointRef,而并不是远端的RpcEndpoint注册到本地的RpcEnv中。

在Spark1.6版本中,默认使用的是netty

private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
val rpcEnvNames = Map(
"akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
"netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
val rpcEnvName = conf.get("spark.rpc", "netty")
val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
}


RpcEndpoint是一个消息循环体,它的生命周期:
构造(Constructor)->启动(onStart)->消息接收(receive&receiveAndReply)->停止(onStop)
receive():不断的运行,处理客户端发送过来的消息。
receiveAndReply():处理消息,并且回应对方。

我们看一下Master的代码:
def main(argStrings: Array[String]) {
SignalLogger.register(log)
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
//指定的主机名必须是start-master.sh脚本运行的本地机器名称
val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
rpcEnv.awaitTermination()
}

/**
* Start the Master and return a three tuple of:
*   (1) The Master RpcEnv
*   (2) The web UI bound port
*   (3) The REST server bound port, if any
*/
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
val securityMgr = new SecurityManager(conf)
//创建Rpc环境,主机名和端口就是Standalone集群的访问地址。SYSTEM_NAME=sparkMaster
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
// 将Master实例注册到RpcEnv中
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
在main方法中创建了RpcEnv,并且实例化Master实例,然后注册到RpcEnv中。
RpcEndpoint其实是注册到Dispatcher中的,在netty中的代码实现如下:
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
dispatcher.registerRpcEndpoint(name, endpoint)
}
注:NettyRpcEnv.scala的第135行

而Dispatcher中使用如下数据结构来存储RpcEndpoint和RpcEndpointRef
private val endpoints = new ConcurrentHashMap[String, EndpointData]
private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]


EndpointData为一个case class:
private class EndpointData(
val name: String,
val endpoint: RpcEndpoint,
val ref: NettyRpcEndpointRef) {
val inbox = new Inbox(ref, endpoint)
}


在Master中使用数据结构WorkerInfo保存着每个Worker的信息,其中就包括每个Worker的RpcEndpointRef





备注:1、DT大数据梦工厂微信公众号DT_Spark
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Spark RpcEnv