您的位置:首页 > 其它

第42课: Spark Broadcast内幕解密:Broadcast运行机制彻底解密、Broadcast源码解析、Broadcast最佳实践

2017-06-04 10:39 656 查看
第42课:  Spark Broadcast内幕解密:Broadcast运行机制彻底解密、Broadcast源码解析、Broadcast最佳实践

Broadcast在机器学习、图计算、构建日常的各种算法中到处可见。 Broadcast就是将数据从一个节点发送到其它的节点上;例如Driver上有一张表,而Executor中的每个并行执行的Task(100万个Task)都要查询这张表,那我们通过Broadcast的方式就只需要往每个Executor把这张表发送一次就行了,Executor中的每个运行的Task查询这张唯一的表,而不是每次执行的时候都从Driver获得这张表!
JAVA中的Servlet里面有个ServletContext,是JSP或Java代码运行时的上下文,通过上下文可以获取各种资源。Broadcast类似于ServletContext中的资源、变量或数据,Broadcast广播出去是基于Executor的,里面的每个任务可以用上下文,Task的上下文就是Executor,可以抓取数据。这就好像ServletContext的具体作用,只是Broadcast是分布式的共享数据,默认情况下只要程序在运行Broadcast变量就会存在,因为Broadcast在底层是通过BlockManager管理的!但是你可以手动指定或者配置具体周期来销毁Broadcast变量!可以指定Broadcast的unpersist销毁Broadcast变量,因为Spark应用程序中可能运行很多job,可能一个job需要很多Broadcast变量,但下一个job不需要这些变量,但是应用程序还存在,因此需手工销毁Broadcast变量。
Broadcast一般用于处理共享配置文件、通用的Dataset、常用的数据结构等等;但是不适合存放太大的数据在Broadcast,Broadcast不会内存溢出,因为其数据的保存的StorageLevel是MEMORY_AND_DISK的方式;虽然如此,我们也不可以放入太大的数据在Broadcast中,因为网络IO和可能的单点压力会非常大!(Spark 1.6版本Broadcast有两种方式: HttpBroadcast, TorrentBroadcast。  HttpBroadcast可能有单点压力;TorrentBroadcast下载没有单点压力但可能有网络压力。)但在Spark 2.0版本中已经去掉HTTPBroadcast(SPARK-12588),Spark 2.0版本中TorrentBroadcast是Broadcast唯一的广播实现方式。
广播Broadcast变量是只读变量,如果Broadcast不是只读变量而可以更新,那带来的问题:1,一个节点上Broadcast可以更新,其它的节点Broadcast也要更新2,如果多个的节点Broadcast同时更新,如何确定更新的顺序,以及容错等内容。因此广播Broadcast变量是只读变量,最为轻松保持了数据的一致性!
Broadcast 广播变量是只读变量,缓存在每一个节点上,而不是每个Task去获取它的一份复制副本。例如,以高效的方式给每个节点发送一个dataset的副本。Spark尝试在分布式发送广播变量时使用高效的广播算法减少通信的成本。
广播变量是由一个变量“V”通过调用[[org.apache.spark.SparkContext#broadcast]]创建的。广播变量是一个围绕“V”的包装器,它的值可以通过调用 `value`方法来获取。例如:
1.           scala> val broadcastVar =sc.broadcast(Array(1, 2, 3))
2.           broadcastVar:org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
3.          
4.           scala> broadcastVar.value
5.           res0: Array[Int] = Array(1, 2, 3)
如果要更新广播变量,只有再广播一次,那就是一个新的广播变量,使用一个新的广播变量ID。
广播变量创建后,在群集上运行的时候“V”变量不是在任何函数都使用,以便“V”传送到节点时不止一次。此外,对象“V”不应该被修改,是为了确保广播所有节点得到相同的广播变量值(例如,如果变量被发送到后来的一个新节点)。
Broadcast的源码:
1.          @param id 广播变量的唯一标识符。
2.          @tparam T   广播变量的数据类型。
3.          abstract class Broadcast[T: ClassTag](val id:Long) extends Serializable with Logging {
4.          
5.           @volatile private var _isValid = true
6.          
7.           private var _destroySite = ""
8.          
9.           /** Get the broadcasted value. */
10.        def value: T = {
11.          assertValid()
12.          getValue()
13.        }
14.      ......
 
Spark 1.6版本中的HttpBroadcast方式的Broadcast,最开始的时候数据放在Driver的本地文件系统中,Driver在本地会创建一个文件夹来存放Broadcast中的data,然后启动HttpServer来访问文件夹中的数据,同时写入到BlockManager(StorageLevel是MEMORY_AND_DISK)中获得BlockId(BroadcastBlockId),当第一次Eexcutor中的Task要访问Broadcast变量的时候,会向Driver通过HttpServer来访问数据,然后会在Executor中的BlockManager中注册该Broadcast中的数据BlockManager,这样后需要的Task需要访问Broadcast的变量的时候会首先查询BlockManager中有没有该数据,如果有就直接使用;(说明SPARK-12588,HTTPBroadcast方式在Spark 2.0版本中已经去掉。)。
BroadcastManager是用来管理Broadcast,该实例对象是在SparkContext创建SparkEnv的时候创建的:
SparkEnv.scala源码:
1.            val broadcastManager = newBroadcastManager(isDriver, conf, securityManager)
2.          
3.             val mapOutputTracker = if (isDriver) {
4.               new MapOutputTrackerMaster(conf,broadcastManager, isLocal)
5.             } else {
6.               new MapOutputTrackerWorker(conf)
7.             }
BroadcastManager.scala中BroadcastManager实例化的时候会调用initialize()方法,initialize()方法就创建TorrentBroadcastFactory的方式。
BroadcastManager源码如下:
1.          
2.         private[spark] classBroadcastManager(
3.             val isDriver: Boolean,
4.             conf: SparkConf,
5.             securityManager: SecurityManager)
6.           extends Logging {
7.          
8.           private var initialized = false
9.           private var broadcastFactory:BroadcastFactory = null
10.       
11.        initialize()
12.       
13.        // Called by SparkContext or Executor beforeusing Broadcast
14.        private def initialize() {
15.          synchronized {
16.            if (!initialized) {
17.              broadcastFactory = newTorrentBroadcastFactory
18.              broadcastFactory.initialize(isDriver,conf, securityManager)
19.              initialized = true
20.            }
21.          }
22.        }
 
Spark 2.0 版本中TorrentBroadcast方式:数据开始在Driver中,A节点如果使用了数据,A就成为了供应源,这个时候Driver节点、A节点二个节点成为了供应源,如第三个节点B访问的时候,第三个节点B也成为了供应源,同样的,第四个节点、第五个节点。。。。等都成为了供应源,这些都被BlockManager管理,这样不会导致一个节点压力太大,从理论上将,数据使用的节点越多,网络速度就越快。
TorrentBroadcast按照BLOCK_SIZE(默认是4MB)将Broadcast中的数据划分成为不同的Block,然后将分块信息也就是Meta信息存放到Driver的 BlockManager中,同时会告诉BlockManagerMaster说明Meta信息存放完毕。
         看一下SparkContext.scala的broadcast方法:
1.                def broadcast[T: ClassTag](value: T):Broadcast[T] = {
2.             assertNotStopped()
3.             require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
4.               "Can not directly broadcast RDDs;instead, call collect() and broadcast the result.")
5.             val bc =env.broadcastManager.newBroadcast[T](value, isLocal)
6.             val callSite = getCallSite
7.             logInfo("Created broadcast " +bc.id + " from " + callSite.shortForm)
8.             cleaner.foreach(_.registerBroadcastForCleanup(bc))
9.             bc
10.        }     
 
SparkContext.scala的broadcast方法中调用env.broadcastManager.newBroadcast,BroadcastManager.scala的newBroadcast方法如下:
1.           def newBroadcast[T: ClassTag](value_ : T,isLocal: Boolean): Broadcast[T] = {
2.             broadcastFactory.newBroadcast[T](value_,isLocal, nextBroadcastId.getAndIncrement())
3.           }   
newBroadcast方法new出来一个Broadcast,第一个参数是Value,第三个参数是BroadcastId,这里BroadcastFactory是一个trait,没有具体的实现。
1.           private[spark] trait BroadcastFactory {
2.         ......
3.         def newBroadcast[T:ClassTag](value: T, isLocal: Boolean, id: Long): Broadcast[T]
4.         ...
TorrentBroadcastFactory是 BroadcastFactory的具体实现:
1.         private[spark] classTorrentBroadcastFactory extends BroadcastFactory {
2.         ……   
3.         override def newBroadcast[T:ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
4.             new TorrentBroadcast[T](value_, id)
5.           }
 
BroadcastFactory的newBroadcast方法创建TorrentBroadcast实例:
1.          private[spark] class TorrentBroadcast[T:ClassTag](obj: T, id: Long)
2.           extends Broadcast[T](id) with Logging withSerializable {
3.         ……
4.         private def readBlocks():Array[ChunkedByteBuffer] = {
5.             // Fetch chunks of data. Note that allthese chunks are stored in the BlockManager and reported
6.             // to the driver, so other executors canpull these chunks from this executor as well.
7.             val blocks = newArray[ChunkedByteBuffer](numBlocks)
8.             val bm = SparkEnv.get.blockManager
9.          
10.          for (pid <- Random.shuffle(Seq.range(0,numBlocks))) {
11.            val pieceId = BroadcastBlockId(id,"piece" + pid)
12.            logDebug(s"Reading piece $pieceId of$broadcastId")
13.            // First try getLocalBytes because thereis a chance that previous attempts to fetch the
14.            // broadcast blocks have already fetchedsome of the blocks. In that case, some blocks
15.            // would be available locally (on thisexecutor).
16.            bm.getLocalBytes(pieceId) match {
17.              case Some(block) =>
18.                blocks(pid) = block
19.                releaseLock(pieceId)
20.              case None =>
21.                bm.getRemoteBytes(pieceId) match {
22.                  case Some(b) =>
23.                    if (checksumEnabled) {
24.                      val sum =calcChecksum(b.chunks(0))
25.                      if (sum != checksums(pid)) {
26.                        throw newSparkException(s"corrupt remote block $pieceId of $broadcastId:" +
27.                          s" $sum !=${checksums(pid)}")
28.                      }
29.                    }
30.                    // We found the block from remoteexecutors/driver's BlockManager, so put the block
31.                    // in this executor'sBlockManager.
32.                    if (!bm.putBytes(pieceId, b,StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
33.                      throw new SparkException(
34.                        s"Failed to store $pieceIdof $broadcastId in local BlockManager")
35.                    }
36.                    blocks(pid) = b
37.                  case None =>
38.                    throw newSparkException(s"Failed to get $pieceId of $broadcastId")
39.                }
40.            }
41.          }
42.          blocks
43.        }
 
TorrentBroadcast.scala的readBlocks方法中Random.shuffle(Seq.range(0,numBlocks)))进行随机洗牌,是因为数据有很多来源DataServer,为了保持负载均衡,因此使用shuffle。
TorrentBroadcast按照BLOCK_SIZE(默认是4MB)将Broadcast中的数据划分成为不同的Block,然后将分块信息也就是meta信息存放到Driver的 BlockManager中,StorageLevel是MEMORY_AND_DISK的方式,同时会告诉Driver中的BlockManagerMaster说明Meta信息存放完毕。数据存放到BlockManagerMaster中就变成了全局数据,BlockManagerMaster具有所有的信息,Driver、Executor就可以访问这些内容。Executor运行具体的TASK的时候,通过TorrentBroadcast的方式readBlocks,如果本地有数据就从本地读取,如果本地没有数据,就从远程读取数据。Executor读取信息以后,通过TorrentBroadcast的机制通知BlockManagerMaster数据多了一份副本,下一个Task读取数据的时候,就有2个选择,分享的节点越多,下载的供应源就越多,最终变成点到点的方式。
Broadcast可以广播RDD,join操作性能优化之一也是采用Broadcast。
 
 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐