
SparkTC: Transitive closure on a graph (node reachability)

2014-02-27 20:50
Approach:

1. Generate the data as (from, to) pairs; these form the initial set of reachable node pairs (and also encode the basic one-hop rules).

2. Join the data with itself, which is analogous to one step of a matrix multiplication.

3. Map each join result back into (from, to) form and union it with the current set of reachable pairs, giving the updated set of reachable node pairs.

4. Compare the number of reachable pairs now with the count from the previous round.

5. If the count did not grow, stop; otherwise jump back to step 2. (A plain-Scala sketch of this fixpoint loop follows the list.)
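
A minimal sketch of this fixpoint loop using plain Scala collections (no Spark; the function name and types here are illustrative only, not part of the original code):

def transitiveClosure(edges: Set[(String, String)]): Set[(String, String)] = {
  var tc = edges
  var grew = true
  while (grew) {
    // one "join" step: follow every pair of edges a -> b and b -> c
    val newPairs = for {
      (a, b)  <- tc
      (b2, c) <- tc
      if b == b2
    } yield (a, c)
    val next = tc ++ newPairs   // union with the pairs we already had
    grew = next.size > tc.size  // stop once no new pair appears
    tc = next
  }
  tc
}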

If that still seems confusing, walk through the sample data.

Graph structure (initial TC)

From To

1 2
1 3
2 3
2 4
3 1


edges (the initial graph with each edge reversed)

To From

2 1
3 1
3 2
4 2
1 3
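
Sketched with plain Scala pairs (the values mirror the two tables above; the names tc and edges match the code further down):

val tc    = Seq(("1", "2"), ("1", "3"), ("2", "3"), ("2", "4"), ("3", "1"))
val edges = tc.map { case (from, to) => (to, from) }
// edges: (2,1), (3,1), (3,2), (4,2), (1,3)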


Perform the join (keyed on the left-hand column)

key - (TC._2, edges._2)

2-(3,1)
2-(4,1)
...
3-(1,1)
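
The same join can be emulated on the plain-Scala tc and edges from the sketch above; this only illustrates what tc.join(edges) produces, not Spark's implementation:

val joined = for {
  (k1, to)   <- tc     // original (from, to) pairs, keyed by the first column
  (k2, from) <- edges  // reversed pairs, keyed by the first column
  if k1 == k2          // match on the shared intermediate node
} yield (k1, (to, from))
// joined contains 2 -> (3,1), 2 -> (4,1), ..., 3 -> (1,1), 3 -> (1,2)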


Finally, the newly produced reachable node pairs (these overlap with pairs that already exist, so we take the union and deduplicate)

edges._2 TC._2

1 3
1 4
...
1 1
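
Continuing the sketch, each joined value is flipped back into a (from, to) pair and merged with the existing set; distinct drops the duplicates:

val newPairs = joined.map { case (_, (to, from)) => (from, to) }
val updated  = (tc ++ newPairs).distinct
// new pairs include (1,4), (1,1), (2,1); (1,3) already existed and is deduplicated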


Together with the code, this should be clearer:

package myclass

import org.apache.spark.SparkContext
import SparkContext._
import java.util.Random
import scala.collection.mutable

/**
 * Created by jack on 2/27/14.
 */
object MyTC {

  // Read a whitespace-separated edge list from a file
  def generateGraph = {
    val lines = scala.io.Source.fromFile("src/main/resources/data/TC_data.txt").getLines().toList
    val edges = mutable.HashSet.empty[(String, String)]
    for (line <- lines) {
      val pair = line.split("\\s+")
      edges += ((pair(0), pair(1)))
    }
    edges.toSeq
  }

  // Generate a random graph with numEdges distinct edges over numVertices vertices
  def randomGenerateGraph(numEdges: Int, numVertices: Int) = {
    val edges = mutable.HashSet.empty[(String, String)]
    val rand = new Random(42)
    while (edges.size < numEdges) {
      val from = rand.nextInt(numVertices).toString
      val to = rand.nextInt(numVertices).toString
      if (from != to) edges += ((from, to))
    }
    edges.toSeq
  }

  def main(args: Array[String]) {
    val spark = new SparkContext("local", "my TC",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    // var tc = spark.parallelize(generateGraph).cache()
    // Randomly generate a graph
    var tc = spark.parallelize(randomGenerateGraph(20, 10)).cache()

    // Reverse each (from, to) pair so the join can match to-nodes against from-nodes
    val edges = tc.map(x => (x._2, x._1))
    var oldCount = 0L
    var newCount = tc.count()
    while (oldCount != newCount) {
      oldCount = newCount
      // Perform the join: each pass extends reachability by one hop
      tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache()
      newCount = tc.count()
    }
    println("TC has " + tc.count() + " edges.")
    System.exit(0)
  }
}
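
For the generateGraph path, the input file is just a whitespace-separated edge list; the walkthrough graph above would look like this in src/main/resources/data/TC_data.txt:

1 2
1 3
2 3
2 4
3 1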