
Spark standalone source code analysis (5)

2013-02-09 17:49
Continuing from the previous post, this section covers BitTorrentBroadcast, the next part of the broadcast mechanism.

3. BitTorrentBroadcast

BitTorrentBroadcast broadcasts a variable in the style of BitTorrent.

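The overall flow is as follows:
1. First, a worker node asks the master node for selectedSources through the GuideMultipleRequests service. The master chooses the sources to return by random selection from its current listOfSources. At the start, the master's listOfSources contains only the master itself; it grows as worker nodes connect. For that reason, a BitTorrentBroadcast worker contacts the master at random intervals to refresh its local copy of the master's listOfSources: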
class TalkToGuide(gInfo: SourceInfo)
  extends Thread with Logging {
  override def run() {

    // Keep exchanging information until all blocks have been received
    while (hasBlocks.get < totalBlocks) {
      talkOnce
      // Sleep for a random time in [MinKnockInterval, MaxKnockInterval)
      Thread.sleep(MultiTracker.ranGen.nextInt(
        MultiTracker.MaxKnockInterval - MultiTracker.MinKnockInterval) +
        MultiTracker.MinKnockInterval)
    }

    // Talk one more time to let the Guide know of reception completion
    talkOnce
  }
  // talkOnce and the rest of the class are not shown in this excerpt
}
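
The master side of step 1 is not shown in the excerpt above, so here is a minimal sketch of what "random select" from listOfSources could look like. PeerAddr, GuideSketch, selectSources and maxPeers are hypothetical names made up for illustration, and leaving the requester out of its own result is an assumption, not something confirmed by the code quoted here. knockInterval only restates the sleep expression from the excerpt.

```scala
import scala.util.Random

// Hypothetical, simplified stand-in for Spark's SourceInfo.
final case class PeerAddr(host: String, port: Int)

object GuideSketch {
  // Pick up to maxPeers distinct sources uniformly at random.
  // Assumption: the requester is never handed back to itself.
  def selectSources(
      listOfSources: Seq[PeerAddr],
      requester: PeerAddr,
      maxPeers: Int,
      rng: Random): Seq[PeerAddr] = {
    val candidates = listOfSources.filterNot(_ == requester)
    rng.shuffle(candidates).take(maxPeers)
  }

  // Random pause between two contacts with the guide, same shape as
  // nextInt(Max - Min) + Min in the excerpt: the result is in [minMs, maxMs).
  def knockInterval(minMs: Int, maxMs: Int, rng: Random): Int = {
    require(minMs < maxMs, "nextInt needs a positive bound")
    rng.nextInt(maxMs - minMs) + minMs
  }
}
```

Randomizing the interval spreads the workers' requests out, so they do not all hit the master at the same moment.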

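2. Next, the selectedSources are added to the worker's own listOfSources, and pickPeerToTalkToRandom chooses one SourceInfo from that list. The method first removes the peers that are already connected from the candidates, then picks from the remainder the peer that holds the most blocks the current node does not have yet: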
// Select the peer that has the most blocks that this receiver does not
peersNotInUse.foreach { eachSource =>
  var tempHasBlocksBitVector: BitSet = null
  hasBlocksBitVector.synchronized {
    tempHasBlocksBitVector = hasBlocksBitVector.clone.asInstanceOf[BitSet]
  }
  tempHasBlocksBitVector.flip(0, tempHasBlocksBitVector.size)
  tempHasBlocksBitVector.and(eachSource.hasBlocksBitVector)

  if (tempHasBlocksBitVector.cardinality > curMax) {
    curPeer = eachSource
    curMax = tempHasBlocksBitVector.cardinality
  }
}
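
The flip-then-and sequence above computes "blocks the peer has AND NOT blocks I have". As a small self-contained sketch of the same idea, the version below uses java.util.BitSet.andNot instead. Peer, PeerPickSketch, usefulBlocks and pickPeer are hypothetical names for illustration, not Spark's API.

```scala
import java.util.BitSet

// Hypothetical, simplified peer record (not Spark's SourceInfo).
final case class Peer(name: String, hasBlocks: BitSet)

object PeerPickSketch {
  // Number of blocks the peer holds that we are still missing.
  def usefulBlocks(mine: BitSet, peer: BitSet): Int = {
    val useful = peer.clone().asInstanceOf[BitSet] // never mutate the peer's bitmap
    useful.andNot(mine)                            // peer AND (NOT mine)
    useful.cardinality()
  }

  // Peer offering the most missing blocks; None if nobody has anything new.
  // sortBy is stable, so on a tie the earlier peer wins, as with "> curMax".
  def pickPeer(mine: BitSet, peersNotInUse: Seq[Peer]): Option[Peer] = {
    // Snapshot our own bitmap under its lock, as the excerpt does.
    val snapshot = mine.synchronized { mine.clone().asInstanceOf[BitSet] }
    val scored = peersNotInUse.map(p => (p, usefulBlocks(snapshot, p.hasBlocks)))
    scored.filter(_._2 > 0).sortBy(s => -s._2).headOption.map(_._1)
  }
}
```

For example, if this node holds blocks {0, 1}, a peer with {0, 1, 2} offers one useful block and a peer with {2, 3, 4} offers three, so the second peer is chosen.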

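3. Finally, the worker fetches blocks from the SourceInfo chosen in step 2 through TalkToPeer. TalkToPeer starts a thread pool and requests block data from the newPeerToTalkTo node. To avoid downloading the same block more than once, it maintains a bitmap, blocksInRequestBitVector, that tells the other threads a block is already being requested: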
// Let other threads know that blockToAskFor is being requested
blocksInRequestBitVector.synchronized {
  blocksInRequestBitVector.set(blockToAskFor)
}
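
To see why the bitmap prevents duplicate downloads, here is a minimal sketch in which choosing a block and marking it as requested happen under the same lock. BlockClaimSketch and claimBlock are hypothetical names, and taking the lowest-numbered eligible block is a simplification of my own; the excerpt does not show how blockToAskFor is actually chosen.

```scala
import java.util.BitSet

object BlockClaimSketch {
  // Pick a block that the peer has, that we do not have, and that no other
  // thread is already requesting, then mark it as in request.
  // Assumption: choose and mark under one lock so two threads can never
  // claim the same block. Lock order is always inRequest, then hasBlocks.
  def claimBlock(hasBlocks: BitSet, inRequest: BitSet, peerHas: BitSet): Option[Int] =
    inRequest.synchronized {
      val needed = peerHas.clone().asInstanceOf[BitSet]
      hasBlocks.synchronized { needed.andNot(hasBlocks) } // drop blocks we already hold
      needed.andNot(inRequest)                            // drop blocks already claimed
      val block = needed.nextSetBit(0)                    // -1 when nothing is left
      if (block < 0) None
      else {
        inRequest.set(block)
        Some(block)
      }
    }
}
```

Each thread in the pool would call claimBlock before sending a request; a None result means this peer has nothing left that is worth asking for.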