Spark 2.1 BlockManagerId

2017-05-11 14:19
When a block manager is created, it generates an id without topology information and registers itself with the block manager master.

The block manager master returns an id, and the block manager uses that id if it is not null; otherwise it keeps the id it generated locally.

    val id =
      BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)

    val idFromMaster = master.registerBlockManager(
      id,
      maxMemory,
      slaveEndpoint)

    blockManagerId = if (idFromMaster != null) idFromMaster else id
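The null check in the last line can be tried out in isolation. The following is only a minimal sketch, not Spark code: `SketchBlockManagerId` and `ChooseIdSketch.chooseId` are hypothetical names made up for illustration, and the topology value `/rack-1` is an invented example. It assumes the master either returns an enriched id or null.

```scala
// Hypothetical stand-in for Spark's BlockManagerId; not the real class.
final case class SketchBlockManagerId(
    executorId: String,
    host: String,
    port: Int,
    topologyInfo: Option[String])

object ChooseIdSketch {
  // Prefer the id returned by the master; fall back to the locally
  // generated id when the master returned null.
  // Option(x) turns a null reference into None.
  def chooseId(
      local: SketchBlockManagerId,
      fromMaster: SketchBlockManagerId): SketchBlockManagerId =
    Option(fromMaster).getOrElse(local)

  def main(args: Array[String]): Unit = {
    val local = SketchBlockManagerId("1", "host-a", 7337, None)
    // Assumed shape of what a master might send back: same id plus topology.
    val enriched = local.copy(topologyInfo = Some("/rack-1"))

    println(chooseId(local, enriched)) // the enriched id wins
    println(chooseId(local, null))     // falls back to the local id
  }
}
```

Wrapping the possibly-null value in `Option` behaves the same as the explicit `if (idFromMaster != null)` test; it is just the more common Scala idiom.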


On the master side, the BlockManagerMasterEndpoint.register method adds the topology information to the id and records the block manager:

  private def register(
      idWithoutTopologyInfo: BlockManagerId,
      maxMemSize: Long,
      slaveEndpoint: RpcEndpointRef): BlockManagerId = {
    // the dummy id is not expected to contain the topology information.
    // we get that info here and respond back with a more fleshed out block manager id
    val id = BlockManagerId(
      idWithoutTopologyInfo.executorId,
      idWithoutTopologyInfo.host,
      idWithoutTopologyInfo.port,
      topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))

    val time = System.currentTimeMillis()
    if (!blockManagerInfo.contains(id)) {
      blockManagerIdByExecutor.get(id.executorId) match {
        case Some(oldId) =>
          // A block manager of the same executor already exists, so remove it (assumed dead)
          logError("Got two different block manager registrations on same executor - "
              + s" will replace old one $oldId with new one $id")
          removeExecutor(id.executorId)
        case None =>
      }
      logInfo("Registering block manager %s with %s RAM, %s".format(
        id.hostPort, Utils.bytesToString(maxMemSize), id))

      blockManagerIdByExecutor(id.executorId) = id

      blockManagerInfo(id) = new BlockManagerInfo(
        id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
    }
    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
    id
  }
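
The bookkeeping in register can be modelled with two plain maps. This is a simplified sketch under stated assumptions, not the real endpoint: `SketchId`, `SketchMaster`, `topologyForHost` and `registeredIds` are hypothetical names, the listener bus and logging are left out, and the real removeExecutor does more work than the single map removal shown here.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's BlockManagerId; not the real class.
final case class SketchId(
    executorId: String,
    host: String,
    port: Int,
    topologyInfo: Option[String])

// Hypothetical, simplified model of the master's bookkeeping.
// topologyForHost plays the role of topologyMapper.getTopologyForHost.
class SketchMaster(topologyForHost: String => Option[String]) {
  // id -> max memory (the real code stores a BlockManagerInfo here)
  private val infoById = mutable.HashMap.empty[SketchId, Long]
  // executor id -> id of its currently registered block manager
  private val idByExecutor = mutable.HashMap.empty[String, SketchId]

  def register(idWithoutTopology: SketchId, maxMemSize: Long): SketchId = {
    // Fill in the topology for the host, as the master does.
    val id = idWithoutTopology.copy(
      topologyInfo = topologyForHost(idWithoutTopology.host))

    if (!infoById.contains(id)) {
      // A different id for the same executor is assumed dead and dropped.
      idByExecutor.get(id.executorId).foreach(oldId => infoById.remove(oldId))
      idByExecutor(id.executorId) = id
      infoById(id) = maxMemSize
    }
    id
  }

  def registeredIds: Set[SketchId] = infoById.keySet.toSet
}

object SketchMasterDemo {
  def main(args: Array[String]): Unit = {
    val master = new SketchMaster(host => Some("/rack-of-" + host))
    val first = master.register(SketchId("1", "host-a", 1000, None), 512L)
    // Same executor registers again on another port: the old id is replaced.
    val second = master.register(SketchId("1", "host-a", 2000, None), 512L)
    println(first)
    println(second)
    println(master.registeredIds)
  }
}
```

Registering the same id twice leaves the maps unchanged, which mirrors the `!blockManagerInfo.contains(id)` guard above.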