Spark GraphX学习(一)Connected Components算法
2017-08-18 19:56
3635 查看
现实生活中存在各种各样的网络,诸如人际关系网、交易网、运输网等等。对这些网络进行社区发现具有极大的意义,如在人际关系网中,可以发现出具有不同兴趣、背景的社会团体,方便进行不同的宣传策略。本文采用Spark GraphX三大算法之一的Connected Components算法实现社交网络中的社区发现。
1.Spark GraphX基础知识
1.1Graph
Spark中属性图是由VertexRDD和EdgeRDD两个参数构成的。其中,每个vertex由一个唯一的64位长的标识符(VertexId)作为key。同时,属性图也和RDD一样,是不可变的、分布式的、可容错的。属性图Graph的定义如下:
即Graph中包含了三个属性顶点集、边集、triplets集(三元组)。可以用图形象的表示为:
· Vertices:由VertexId(Long类型)、attribute(属性描述或距离)构成。如,(3L,
("San Francisco", "CA")),(1L, 10)
· Edges:由srcId(起始节点VertexId)、dstId(终止节点VertexId)、attribute(边的权值)构成。如,Edge(1L,
2L, 20)
· Triplets:由srcId、srcAttr(起始Vertex)和dstId、dstAttr(终止Vertex),以及attr构成。如,((1,
(Santa Clara, CA)), (2, (Fremont, CA)), 20)
1.2GraphLoader
GraphLoader.edgeListFile:Graph[Int,Int]) 提供了一个方式从磁盘上的边列表中加载一个图。它解析如下形式(源顶点ID,目标顶点ID)的连接表:
它从指定的边创建一个图,自动地创建边提及的所有顶点。顶点和边的属性默认都是1
2.Connected Components算法
Connected Components即连通体算法用id标注图中每个连通体,将连通体中序号最小的顶点的id作为连通体的id。如果在图G中,任意2个顶点之间都存在路径,那么称G为连通图,否则称该图为非连通图,则其中的极大连通子图称为连通体,如下图所示,该图中有两个连通体:
3.Spark GraphX实现社区发现
介绍完以上的基本概念,我们可以使用Spark GraphX实现社区网络的社区发现。
有两个输入文件:
followers.txt (起点id,终点id)
users.txt (id,first name,full name)
最终计算得到这个关系网络有两个社区。
1.Spark GraphX基础知识
1.1Graph
Spark中属性图是由VertexRDD和EdgeRDD两个参数构成的。其中,每个vertex由一个唯一的64位长的标识符(VertexId)作为key。同时,属性图也和RDD一样,是不可变的、分布式的、可容错的。属性图Graph的定义如下:
abstract class Graph[VD, ED]{ val vertices: VertexRDD[VD] val edges: EdgeRDD[ED val triplets: RDD[EdgeTriplet[VD, ED]]] }
即Graph中包含了三个属性顶点集、边集、triplets集(三元组)。可以用图形象的表示为:
· Vertices:由VertexId(Long类型)、attribute(属性描述或距离)构成。如,(3L,
("San Francisco", "CA")),(1L, 10)
· Edges:由srcId(起始节点VertexId)、dstId(终止节点VertexId)、attribute(边的权值)构成。如,Edge(1L,
2L, 20)
· Triplets:由srcId、srcAttr(起始Vertex)和dstId、dstAttr(终止Vertex),以及attr构成。如,((1,
(Santa Clara, CA)), (2, (Fremont, CA)), 20)
1.2GraphLoader
GraphLoader.edgeListFile:Graph[Int,Int]) 提供了一个方式从磁盘上的边列表中加载一个图。它解析如下形式(源顶点ID,目标顶点ID)的连接表:
2 1 4 1 1 2
它从指定的边创建一个图,自动地创建边提及的所有顶点。顶点和边的属性默认都是1
2.Connected Components算法
Connected Components即连通体算法用id标注图中每个连通体,将连通体中序号最小的顶点的id作为连通体的id。如果在图G中,任意2个顶点之间都存在路径,那么称G为连通图,否则称该图为非连通图,则其中的极大连通子图称为连通体,如下图所示,该图中有两个连通体:
3.Spark GraphX实现社区发现
介绍完以上的基本概念,我们可以使用Spark GraphX实现社区网络的社区发现。
有两个输入文件:
followers.txt (起点id,终点id)
4 1 1 2 6 3 7 3 7 6 6 7 3 7
users.txt (id,first name,full name)
1,BarackObama,Barack Obama 2,ladygaga,Goddess of Love 3,jeresig,John Resig 4,justinbieber,Justin Bieber 6,matei_zaharia,Matei Zaharia 7,odersky,Martin Odersky 8,anonsys代码实现:
import org.apache.spark.graphx.{Graph, GraphLoader, VertexId, VertexRDD} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object GraphTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local") val sc: SparkContext = new SparkContext(conf) //读取followers.txt文件创建图 val graph: Graph[Int, Int] = GraphLoader.edgeListFile(sc,"F:\\followers.txt") //计算连通体 val components: Graph[VertexId, Int] = graph.connectedComponents() val vertices: VertexRDD[VertexId] = c 4000 omponents.vertices /** * vertices: * (4,1) * (1,1) * (6,3) * (3,3) * (7,3) * (2,1) * 是一个tuple类型,key分别为所有的顶点id,value为key所在的连通体id(连通体中顶点id最小值) */ //读取users.txt文件转化为(key,value)形式 val users: RDD[(VertexId, String)] = sc.textFile("F:\\users.txt").map(line => { val fields: Array[String] = line.split(",") (fields(0).toLong, fields(1)) }) /** * users: * (1,BarackObama) * (2,ladygaga) * (3,jeresig) * (4,justinbieber) * (6,matei_zaharia) * (7,odersky) * (8,anonsys) */ users.join(vertices).map{ case(id,(username,vertices))=>(vertices,username) }.groupByKey().map(t=>{ t._1+"->"+t._2.mkString(",") }).foreach(println(_)) /** * 得到结果为: * 1->justinbieber,BarackObama,ladygaga * 3->matei_zaharia,jeresig,odersky */ } }
最终计算得到这个关系网络有两个社区。
相关文章推荐
- 添加第三方库到Maven资源库
- 添加Maven(mvn)、sbt的国内仓库
- Scala知识图谱
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Windows下Scala环境搭建
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- Windows7下安装Scala 2.9.2教程
- 浅谈Scala的Class、Object和Apply()方法
- Redis 中spark参数executor-cores引起的异常解决办法
- Spark SQL数据加载和保存实例讲解
- play for scala 实现SessionFilter 过滤未登录用户跳转到登录页面
- java 中Spark中将对象序列化存储到hdfs
- Scala小程序详解及实例代码
- eclipse 开发 spark Streaming wordCount
- Understanding Spark Caching
- Scala代码实现列出Hadoop 文件夹下面的所有文件
- ClassNotFoundException:scala.PreDef$