Spark的standalone源码分析(五)
2013-02-09 17:49
495 查看
承接上文,本节继续介绍broadcast机制中的BitTorrentBroadcast;
大致流程如下:
1. 首先,work节点通过GuideMultipleRequests服务向master节点获得selectedSources;master节点通过random select的方式从当前的listOfSources选出sources返回;当然,刚开始的时候master的listOfSources只有master节点,随着work节点的connect,listOfSources会不断增加;因此,BitTorrentBroadcast的work节点会间歇的与master节点通信以update master节点的listOfSources到本地;
2. 然后,将selectedSources加入该work节点的listOfSources中,在pickPeerToTalkToRandom方法中,选择一个SourceInfo;pickPeerToTalkToRandom首先将已经connect的节点从候选节点中去除,从剩下的节点中选择拥有当前节点没有的blocks最多的那个节点;
3. 最后,根据2选择出来的SourceInfo,通过TalkToPeer,获取blocks;TalkToPeer会开启一个threadpool,向newPeerToTalkTo节点请求blocks数据,为防止重复的download block数据,会维护一个blocksInRequestBitVector位图,让其他的thread知道该block已经被同步;
3. BitTorrentBroadcast
BitTorrentBroadcast即采用BitTorrent的方式来广播变量;大致流程如下:
1. 首先,work节点通过GuideMultipleRequests服务向master节点获得selectedSources;master节点通过random select的方式从当前的listOfSources选出sources返回;当然,刚开始的时候master的listOfSources只有master节点,随着work节点的connect,listOfSources会不断增加;因此,BitTorrentBroadcast的work节点会间歇的与master节点通信以update master节点的listOfSources到本地;
class TalkToGuide(gInfo: SourceInfo) extends Thread with Logging { override def run() { // Keep exchaning information until all blocks have been received while (hasBlocks.get < totalBlocks) { talkOnce Thread.sleep(MultiTracker.ranGen.nextInt( MultiTracker.MaxKnockInterval - MultiTracker.MinKnockInterval) + MultiTracker.MinKnockInterval) } // Talk one more time to let the Guide know of reception completion talkOnce }
2. 然后,将selectedSources加入该work节点的listOfSources中,在pickPeerToTalkToRandom方法中,选择一个SourceInfo;pickPeerToTalkToRandom首先将已经connect的节点从候选节点中去除,从剩下的节点中选择拥有当前节点没有的blocks最多的那个节点;
// Select the peer that has the most blocks that this receiver does not peersNotInUse.foreach { eachSource => var tempHasBlocksBitVector: BitSet = null hasBlocksBitVector.synchronized { tempHasBlocksBitVector = hasBlocksBitVector.clone.asInstanceOf[BitSet] } tempHasBlocksBitVector.flip(0, tempHasBlocksBitVector.size) tempHasBlocksBitVector.and(eachSource.hasBlocksBitVector) if (tempHasBlocksBitVector.cardinality > curMax) { curPeer = eachSource curMax = tempHasBlocksBitVector.cardinality } }
3. 最后,根据2选择出来的SourceInfo,通过TalkToPeer,获取blocks;TalkToPeer会开启一个threadpool,向newPeerToTalkTo节点请求blocks数据,为防止重复的download block数据,会维护一个blocksInRequestBitVector位图,让其他的thread知道该block已经被同步;
// Let other threads know that blockToAskFor is being requested blocksInRequestBitVector.synchronized { blocksInRequestBitVector.set(blockToAskFor) }
相关文章推荐
- Spark的standalone源码分析(一)
- 深入理解Spark 2.1 Core (七):Standalone模式任务执行的原理与源码分析
- 深入理解Spark 2.1 Core (八):Standalone模式容错及HA的原理与源码分析
- Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析
- 源码-spark Standalone部署模式及其容错性分析
- Spark的standalone源码分析(二)
- Apache Spark源码走读之7 -- Standalone部署方式分析
- Spark的standalone源码分析(四)
- 深入理解Spark 2.1 Core (五):Standalone模式运行的原理与源码分析
- 深入理解Spark 2.1 Core (五):Standalone模式运行的原理与源码分析
- Apache Spark源码走读之7 -- Standalone部署方式分析
- Standalone模式下Spark 中通信机制的源码分析
- 深入理解Spark 2.1 Core (六):Standalone模式运行的原理与源码分析
- 深入理解Spark 2.1 Core (八):Standalone模式容错及HA的原理与源码分析
- Spark的standalone源码分析(三)
- Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析
- SparkStreaming的WordCount示例及源码分析(一)
- Spark1.3从创建到提交:10)任务提交源码分析
- Spark源码分析之三:Stage划分
- Spark 源码分析 -- BlockStore