SparkTC :Transitive closure on a graph(图中节点的可达性)
2014-02-27 20:50
417 查看
思路:
1.生成数据(from,to),为初试可达节点对数目(同时也是基本的节点跳转规则)
2.对数据需要做一次链接操作,(类似于一次矩阵乘)
3.将链接操作的结果提取成(from,to)形式,与当前的可达节点对做并集,得到最新的当前可达节点对数目
3.比较当前可达节点对的数量与上一轮节点对数量
4.若没有增加,则停止;否则,跳转至2继续执行
可能还是比较晕乎乎,看实验数据
图结构 (初始TC)
From To
edges(对初始图结构反转)
To From
执行Join(按照左边列)
key- (TC._2 ,edges._2)
最后新产生的可达节点对(与之前已经存在的会有重复,需要求并集后去重)
edges._2 TC._2
结合代码就比较清晰了
1.生成数据(from,to),为初试可达节点对数目(同时也是基本的节点跳转规则)
2.对数据需要做一次链接操作,(类似于一次矩阵乘)
3.将链接操作的结果提取成(from,to)形式,与当前的可达节点对做并集,得到最新的当前可达节点对数目
3.比较当前可达节点对的数量与上一轮节点对数量
4.若没有增加,则停止;否则,跳转至2继续执行
可能还是比较晕乎乎,看实验数据
图结构 (初始TC)
From To
1 2 1 3 2 3 2 4 3 1
edges(对初始图结构反转)
To From
2 1 3 1 3 2 4 2 3 1
执行Join(按照左边列)
key- (TC._2 ,edges._2)
2-(3,1) 2-(4,1) ... 3-(1,1)
最后新产生的可达节点对(与之前已经存在的会有重复,需要求并集后去重)
edges._2 TC._2
1 3 1 4 ... 1 1
结合代码就比较清晰了
package myclass import org.apache.spark.SparkContext import SparkContext._ import java.util.Random import scala.collection.mutable /** * Created by jack on 2/27/14. */ object MyTC { def generateGraph = { val lines = scala.io.Source.fromFile("src/main/resources/data/TC_data.txt").getLines().toList val edges= mutable.HashSet.empty[(String,String)] for(line <- lines) { val pair = line.split("\\s+") edges += ((pair(0),pair(1))) } edges.toSeq } def randomGenerateGraph(numEdges:Int,numVertices:Int) = { val edges = mutable.HashSet.empty[(String,String)] val rand = new Random(42) while(edges.size < numEdges) { val from = rand.nextInt(numVertices).toString val to = rand.nextInt(numVertices).toString if(from != to) edges += ((from,to)) } edges.toSeq } def main(args: Array[String]) { val spark = new SparkContext("local","my TC", System.getenv("SPARK_HOME"),SparkContext.jarOfClass(this.getClass)) // var tc = spark.parallelize(generateGraph).cache() //随机产生图 var tc = spark.parallelize(randomGenerateGraph(20,10)).cache() val edges = tc.map(x => (x._2,x._1)) var oldCount = tc.count() var newCount = -1L while(oldCount != newCount) { oldCount = newCount //执行链接操作,相当于步长加一 tc = tc.union(tc.join(edges).map(x => (x._2._2,x._2._1))).distinct().cache() newCount = tc.count() } println("TC has "+tc.count()+" edges.") System.exit(0) } }
相关文章推荐
- SparkGraphx计算指定节点的N度关系节点源码
- Running Apache Spark GraphX algorithms on Library of Congress subject heading SKOS
- SparkGraphx计算指定节点的N度关系节点
- Running Spark GraphX algorithms on Library of Congress subject heading SKOS
- CaffeOnSpark安装和使用教程系列二:单节点使用CaffeOnSpark进行MNIST数据集的测试
- Spark GraphX
- Spark1.0.0 on Standalone 运行架构实例解析
- Spark_2.0 on hadoop_2.7.2
- Spark on Yarn解密及运行流程
- Spark1.6.1单节点环境搭建
- Spark中Task,Partition,RDD、节点数、Executor数、core数目的关系
- SparkR(R on Spark)编程指南 含 dataframe操作
- docker on spark
- 《Efficient Batch Processing for Multiple Keyword Queries on Graph Data》——论文笔记
- Spark2.0.1 on yarn with hue 集群搭建部署(一)基础环境配置
- hdu 4677 Query on Graph
- Spark On Yarn(HDFS HA)详细配置
- spark集群其中一个节点没有启动成功
- spark-shell on yarn 出错(arn application already ended,might be killed or not able to launch applic)解决
- Spark on Yarn可能遇到的问题