您的位置:首页 > 其它

Spark Broadcast源码分析

2016-03-25 09:01 330 查看
本博文的主要内容包括:

1、Broadcast功能描述

2、Broadcast创建过程

3、Broadcast读写原理

一、功能描述

Broadcast是指将数据从一个节点发送到其他节点,供其计算使用,是spark在计算过程中非常常用的方式,通常使用方式,包括共享配置文件,map数据集,树形数据结构等,为能够更好更快速为TASK任务使用相关变量。但是Broadcast不适合存放过大的数据,这会导致网络IO性能变差或者过重的单点压力。

Broadcast的基本用法:

      本文是借鉴网络大神的经验,结合自己的走读的一些总结,如有雷同之处,希望谅解!

二、创建过程

Broadcast是典型的建造者模式方法,相对内部设计相对较为简单,同时初始化并非直接创建Broadcast对象,作用有两个方面:

1)依据配置属性(spark.broadcast.factory)创建BroadcastFactory对象 - 反射创建。

2)将sparkConf对象注入Broadcast中,同时定义压缩编码。

初始化入口sparkContext启动时创建,调用过程如下:

1)SparkContext#构造方法

2)SparkEnv#create

3)BroadcastManager#initialize()

4)TorrentBroadcastFactoryr#initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager)

5)TorrentBroadcast#initialize(_isDriver: Boolean, conf: SparkConf)

SparkContext初始化SparkEnv,在SparkEnv内创建BroadcastManager,代码如下:
SparkContext

// Create the Spark execution environment (cache, map output tracker, etc)
// 创建spark的执行环境
private[spark] val env = SparkEnv.create(
conf, // spark配置文件
"<driver>",
conf.get("spark.driver.host"), // 主机名
conf.get("spark.driver.port").toInt, // 端口号
isDriver = true, // 默认启动SparkContext客户端,便是Driver
isLocal = isLocal,// 是否是本地运行,是通过master获取该值,如果是submit提交,请参考SparkSubmitArguments类,会将参数转换为master
listenerBus = listenerBus
/* spark监听总线(LiveListenerBus),他是负责监听spark事件,包括job启动和介绍、BlockManage的添加等等,简单理解UI能看到的变化都是这块监听的,
* 如果有时间,可以将这块与大家分享一下,底层使用队列实现,典型观察者模式实现,未使用akka实现 */
)
SparkEnv.set(env) // 注册SparkEnv对象


SparkEnv中初始化BroadcastManager

val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf, securityManager, mapOutputTracker, shuffleManager)

val connectionManager = blockManager.connectionManager

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

val cacheManager = new CacheManager(blockManager)


BroadcastManager构造函数调用initialize方法构建

// Called by SparkContext or Executor before using Broadcast
// 一个context仅初始化一次,默认是Torrent
private def initialize() {
// TODO 初始化BroadcastFactory
// 1.确定仅有第一次进入时,创建BroadcastFactory对象
// 2.初始化BroadcastFactory,并与BroadcastManager建立hook

synchronized {
if (!initialized) {

val broadcastFactoryClass =
conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")  //默认采用<span style="font-family: Arial, Helvetica, sans-serif;">TorrentBroadcastFactory</span>
broadcastFactory =
Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]

//初始化BroadcastFactory,并与BroadcastManager建立hook
broadcastFactory.initialize(isDriver, conf, securityManager)
//表示第一次进入完毕
initialized = true
}
}
}


TorrentBroadcastFactory调用initialize方法

override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
TorrentBroadcast.initialize(isDriver, conf)
}


将sparkConf对象注入Broadcast中,并定义压缩方式

/** 初始化TorrentBroadcast属性 */
def initialize(_isDriver: Boolean, conf: SparkConf) {
TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
synchronized {
if (!initialized) {
compress = conf.getBoolean("spark.broadcast.compress", true)
compressionCodec = CompressionCodec.createCodec(conf)
initialized = true
}
}
}


broadcast是在sparkContext创建时完成的,broadcast类型、压缩方式也是在创建过程中完成的定义,但是,目前还无法实现app中不同job使用不同的broadcast,广播的方式只能选择TorrentBroadcast和HttpBroadcast的一种。spark默认使用TorrentBroadcast(并发),效率相对要比http要好,同时避免单机热点的产生,比较适合分布式系统的思想。思想类似于迅雷BT下载,已使用的executor越多,速度越快。

Broadcast创建

driver首先要将值序列化到byteArray中,然后再按block大小进行分割(默认是4M),将信息存放在driver的blockmanage中,并通知BlockManageMaster,完成注册,并可以让所有executor读取,存储方式MEMORY_AND_DISK。

使用write顺序:

1)SparkContext#broadcast  外层方法,使用sc.broadcast 进行广播

2) BroadcastManager#newBroadcast(value_ : T, isLocal: Boolean)

3)TorrentBroadcastFactory#newBroadcast(value_ : T, isLocal: Boolean, id:Long)

4)TorrentBroadcast#构造函数

5)TorrentBroadcast#writeBlocks

6)BlockManage#putBytes(

      blockId: BlockId,

      bytes: ByteBuffer,

      level: StorageLevel,

      tellMaster: Boolean = true,

      effectiveStorageLevel:Option[StorageLevel] = None)   最终存储

当然,使用广播较为简单,但是,如果sparkContext长时间执行多个job时,则考虑注销广播,或者尽量广播要小,否则会造成性能严重下降,具体原因尚未研究。

注销方式代码如下:

val broadcastValue = sc.broadcast(存储值)
broadcastValue.unpersist() //方法一
SparkEnv.get.broadcastManager.unbroadcast(id, false, false) //方法二

创建时,使用SparkContext的broadcast方法,并将值一直传递至TorrentBroadcast,并构建TorrentBroadcast对象,同时完成将值交给BlockManage进行注册,并序列化在本地存储。(SparkEnv.get.blockManager.putBytes方法)
TorrentBroadcast

private[spark] class TorrentBroadcast[T: ClassTag](
obj : T,
@transient private val isLocal: Boolean,
id: Long)
extends Broadcast[T](id) with Logging with Serializable {

/** 1.driver是直接读取本地的值
* 2.其他executor是依靠blockManager读取(readObject) */
@transient private var _value: T = obj

/* 固定格式:
* broadcastId = broadcast_广播ID
* blockID = broadcast_广播ID_piece[1,2,3,4] */
private val broadcastId = BroadcastBlockId(id)

/** 1.广播值交给blockManager管理
* 2.广播转换为ByteArray,返回数据块的长度 */
private val numBlocks: Int = writeBlocks()

override protected def getValue() = _value
}


writeBlocks是主要执行写方法,主要功能便是按照定义的广播块大小切分数据(默认是4M,spark.broadcast.blockSize),其后将块注册blockManage,并写入本地磁盘中。

writeBlocks(){

   1.blockifyObject  数据切分方法

   2.BlockManage.putBytes  数据存储方法

}
blockifyObject  代码如下:

/** 切分数据,方法较为实用,可作为工具类
* @param obj 切分数据对象 */
def blockifyObject[T: ClassTag](obj: T): Array[ByteBuffer] = {
// TODO: Create a special ByteArrayOutputStream that splits the output directly into chunks
// so we don't need to do the extra memory copy.
// TODO 数据切块,按照默认的4M切分数据块,返回4MByteBuffer(数据体检变小)
// 数据 -> 压缩 -> 序列化 -> 分割
// 1. 声明输出流(定义压缩方式和序列化)
// 2. 压缩后数据按4M进行分割
// 3. 返回ByteBuffer字符

// 1.0 定义输出流
val bos = new ByteArrayOutputStream()
// 1.1 包装压缩方式
val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos
// 1.2 创建序列化对象
val ser = SparkEnv.get.serializer.newInstance()
// 1.3 包装序列化输出流(默认java序列化,不过一般推荐KryoSerializer,建议修改spark-defaults.conf)
val serOut = ser.serializeStream(out)
// 1.4 将value写至ByteArray中
serOut.writeObject[T](obj).close()
val byteArray = bos.toByteArray
// 2.0 将ByteArray转换为输入流
val bais = new ByteArrayInputStream(byteArray)
// 2.1 获取分割块数,ceil有余数+1
val numBlocks = math.ceil(byteArray.length.toDouble / BLOCK_SIZE).toInt
// 2.2 定义数据块集合
val blocks = new Array[ByteBuffer](numBlocks)
// 2.3 定义块ID
var blockId = 0
// 2.4 循环按4M分割数据块,步长为4M
for (i <- 0 until (byteArray.length, BLOCK_SIZE)) {
// 2.4.1 定义装载4M的byte的容器
val thisBlockSize = math.min(BLOCK_SIZE, byteArray.length - i)
val tempByteArray = new Array[Byte](thisBlockSize)
// 2.4.2 装载数据
bais.read(tempByteArray, 0, thisBlockSize)
blocks(blockId) = ByteBuffer.wrap(tempByteArray)
// 2.4.3 index加一
blockId += 1
}
// 3.0 切分结束,关闭流
bais.close()
// 3.1 返回流
blocks
}

Broadcast读取
broadcase写入是优先写入依据存储策略写入本地(BlockManage#putBytes方法),既然序列化数据是本地存储,由此而来的问题是读取问题,BlockManage存储数据并不似hdfs会依据备份策略存储多份数据放置不同节点(但是多提一句,spark的taskScheblue是拥有类似机架感知策略分配任务),如没有备份数据,那么必然产生一下数个问题:

   1.节点故障,无法访问节点数据

   2.数据热点,所有任务皆使用该数据

   3.网络传输,所有节点频繁访问单节点

那么解决该问题,spark并没有使用HDFS的思想,而选择是P2P点对点方式(BT下载)解决问题,是只要使用过broadcase数据,则在本接节点存储数据,由此变成新的数据源,随和数据源不断增加速度也会越来越快,刚开始传输则相对会慢一些,同时,以上不建议使用大文件broadcase,亦是如此,如果使用较为频繁的数据,他相当于每个节点都要存储一份,形成网状传输方式交换数据,因此建议存储配置文件或某种数据结构为上佳选择。

 

调用顺序:

1)TorrentBroadcast#readObject()

2)TorrentBroadcast#readBlocks()

3)BlockManage#getLocalBytes(blockId:BlockId) / getRemoteBytes(blockId: BlockId)

4)BlockManage#putBytes()

readObject是broadcase读取的主方法,管理整个读取策略

/** Used by the JVM when deserializing this object. */
private def readObject(in: ObjectInputStream) {
// TODO 读取广播变量,有便读取本地,没有则远程并存储在本地

// 1.0 可读取对象中静态变量
in.defaultReadObject()
// 2.0 读取广播变量(单个executor独享)
TorrentBroadcast.synchronized {
// 2.1 读取本地广播数据
SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
// 2.2 获取本地数据成功
case Some(x) =>
_value = x.asInstanceOf[T]
// 2.3 获取本地数据失败
case None =>
// 2.4 获取Blocks,同时将块存储到本地
logInfo("启动读取 broadcast variable " + id)
val start = System.nanoTime()
val blocks = readBlocks()
val time = (System.nanoTime() - start) / 1e9
logInfo("Reading broadcast variable " + id + " took " + time + " s")

// 2.5 将数据块反序列化,并解压缩
_value = TorrentBroadcast.unBlockifyObject[T](blocks)
// Store the merged copy in BlockManager so other tasks on this executor don't
// need to re-fetch it.
SparkEnv.get.blockManager.putSingle(
broadcastId, _value, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}
}
}

readBlocks则是实现P2P思想的具体实现者,代码如下:
/** Fetch torrent blocks from the driver and/or other executors. */
private def readBlocks(): Array[ByteBuffer] = {
// Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
// to the driver, so other executors can pull these chunks from this executor as well.

// 1.0 定义数据块集合
val blocks = new Array[ByteBuffer](numBlocks)
// 1.1 引用blockManager
val bm = SparkEnv.get.blockManager

// 2.0 循环遍历所有块,避免访问热点,随机顺序读
for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
// 2.1 组装块ID
val pieceId = BroadcastBlockId(id, "piece" + pid)

// First try getLocalBytes because there is a chance that previous attempts to fetch the
// broadcast blocks have already fetched some of the blocks. In that case, some blocks
// would be available locally (on this executor).
// 2.2 他会先查本地,继而查询远程,但是前面已经查找的是广播,现在查找的是认数据块(区别)
var blockOpt = bm.getLocalBytes(pieceId)
// 2.3 如果本地为查询到结果,则通过blockManager远程获取,并将数据存储到本地
if (!blockOpt.isDefined) {
blockOpt = bm.getRemoteBytes(pieceId)
blockOpt match {
case Some(block) =>
// If we found the block from remote executors/driver's BlockManager, put the block
// in this executor's BlockManager.
SparkEnv.get.blockManager.putBytes(
pieceId,
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)

case None =>
throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
}
}
// If we get here, the option is defined.
// 3.0 赋值数据块集合
blocks(pid) = blockOpt.get
}
// 3.1 返回数据块
blocks
}

相关配置属性说明:(在spark-default.conf中设置)

spark.broadcast.factory 定义使用http或Torrent方式,默认是Torrent,无需修改

spark.broadcast.blockSize 数据库块大小,blockifyObject依据此属性切分数据块,默认4M

spark.broadcast.compress 是否压缩,默认是使用,sparkcontext初始化该属性,无需修改。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: