spark graphx 实现图的一度正反向搜索,二度正反向搜索
2018-01-20 15:51
489 查看
通讯录数据,一直是各个app平台的重要数据来源。通讯录组成的图谱关系分析是大数据部门必做的一件事,因为他可以为推荐算法工程师提供更好的数据支持。
图数据存储有很多种方式:
基本的一种方式就是存储到关系型数据仓储中。这种存储方式的优点是,数据仓储模型简单易于实现且对于一度关系查询也很方便。但是对于二度关系则需要自join一次,三度好友分析则需要自join两次,对于多度分析十分消耗性能。
另外一种是基于开源框架neo4j等图数据实现图数据的存储,不进能够实现多度快速查询,还能对图进行渲染。但是neo4j社区版本仅能够支持千万级别的点,商业版本收费太高
对于数据量很大,但是又不想使用付费版本的图数据,但又想要实现图数据的快速索引。现提供一种方案,sparkGraphx+hbase+hive实现的图数据预计算平台。原始数据存储增量存储在hive中,批次使用graphx将hive中图数据做一度,二度...n度预处理计算,结果存储到hbase/solr中已达到实时查询图数据的目的。
以下是基于sparkGraphx实现图数据的一度全向索引,二度(正正,反反,正反,反正)索引的计算方案。
【注】:
数据格式:
1 2
2 1
3 1
第一行表示用户1存储了用户2的联系方式
图数据存储有很多种方式:
基本的一种方式就是存储到关系型数据仓储中。这种存储方式的优点是,数据仓储模型简单易于实现且对于一度关系查询也很方便。但是对于二度关系则需要自join一次,三度好友分析则需要自join两次,对于多度分析十分消耗性能。
另外一种是基于开源框架neo4j等图数据实现图数据的存储,不进能够实现多度快速查询,还能对图进行渲染。但是neo4j社区版本仅能够支持千万级别的点,商业版本收费太高
对于数据量很大,但是又不想使用付费版本的图数据,但又想要实现图数据的快速索引。现提供一种方案,sparkGraphx+hbase+hive实现的图数据预计算平台。原始数据存储增量存储在hive中,批次使用graphx将hive中图数据做一度,二度...n度预处理计算,结果存储到hbase/solr中已达到实时查询图数据的目的。
以下是基于sparkGraphx实现图数据的一度全向索引,二度(正正,反反,正反,反正)索引的计算方案。
【注】:
searchByPatch方法是路径索引,eg:传入1就是一度正向,-1,1二度正向反向索引,-1,1,1三度路径的反向正向正向索引
数据格式:
1 2
2 1
3 1
第一行表示用户1存储了用户2的联系方式
package mob import org.apache.spark.graphx.{Edge, Graph, VertexId} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} object OperatorFun { def transforEdge(edgeString: String): List[Edge[String]] = { val edges = edgeString.trim.replaceAll(" +", " ") val vertexs = edges.toString.split(" ") if (vertexs.length == 2 && vertexs(0) != vertexs(1)) { return List(Edge(vertexs(0).toLong, vertexs(1).toLong, "")); } return List[Edge[String]]() } } object FriendSearch { /** * 构建图 * * @param edgeFilePath * @return */ def loadGraph(edgeFilePath: String): Graph[String, String] = { val conf = new SparkConf().setAppName("graph").setMaster("local") val sc = new SparkContext(conf) val edgeRDD = sc.textFile(edgeFilePath).flatMap(OperatorFun.transforEdge(_)).distinct() val graph: Graph[Long, String] = Graph.fromEdges(edgeRDD, 0, StorageLevel.MEMORY_AND_DISK, StorageLevel.MEMORY_AND_DISK) return graph.mapVertices((id, att) => (att.toString)) } /** * 正向搜索图 * * @param graph * @return */ def reverseSearch(graph: Graph[String, String], index: Int): Graph[String, String] = { val vertexRdd = graph.aggregateMessages[String]( truplet => { if (index == 0) { truplet.sendToDst(truplet.srcId.toString) } else { val srcId = truplet.srcId if (truplet.srcAttr != null) { val path = truplet.srcAttr.split("\\|").map(node => s"$srcId->${node}").addString(new StringBuilder, "|") truplet.sendToDst(path.toString) } } }, (a, b) => { if (a == null && b != null) b else if (a != null && b == null) a else if (a == null && b == null) "" else s"${a}|${b}" } ) return Graph(vertexRdd, graph.edges) } /** * 反向搜索图 * * @param graph * @return */ def directSearch(graph: Graph[String, String], index: Int): Graph[String, String] = { val vertexRdd = graph.aggregateMessages[String]( truplet => { if (index == 0) { truplet.sendToSrc(truplet.dstId.toString) } else { val dstId = truplet.dstId if (truplet.dstAttr != null) { val path = truplet.dstAttr.split("\\|").map(node => s"$dstId->${node}").addString(new StringBuilder, "|") truplet.sendToSrc(path.toString) } } }, (a, b) => { if (a == null && b != null) b else if (a != null && b == null) a else if (a == null && b == null) "" else s"${a}|${b}" } ) return Graph(vertexRdd, graph.edges) } /** * 一度正反向搜索 * * @param graph * @return Graph(id,(正,反)) */ def oneDegreeAll(graph: Graph[String, String]): RDD[(VertexId, (String, String))] = { val directSearchG = directSearch(graph, 0).vertices val reverseSearchG = reverseSearch(graph, 0).vertices return directSearchG.join(reverseSearchG) } /** * 二度正反向搜索 * * @param graph * @return Graph(id,(正正,反反,正反,反正)) */ def twoDegreeAll(graph: Graph[String, String]): RDD[(VertexId, (String, String, String, String))] = { /** * 注释掉的实现方式会重复计算第一阶段,增加计算时长,后期维护尽量不要使用这种方式 * var dd = searchByPatch("1,1",graph).vertices.mapValues(att => (att,"dd")) * var rr = searchByPatch("-1,-1",graph).vertices.mapValues(att => (att,"rr")) * var dr = searchByPatch("1,-1",graph).vertices.mapValues(att => (att,"dr")) * var rd = searchByPatch("-1,1",graph).vertices.mapValues(att =>(att,"rd")) * dd.union(rr).union(dr).union(rd).aggregateByKey(List[(String,String)]())((list,attr)=>list:+attr,(a,b)=>a:::b ).mapValues(list =>{ * val map = list.map(tt=>(tt._2,tt._1)).toMap * (map("dd"),map("rr"),map("dr"),map("rd")) * }) */ graph.vertices.cache() val d = directSearch(graph, 0) d.vertices.cache() val r = reverseSearch(graph, 0) r.vertices.cache() val dd = directSearch(d, 1).vertices.mapValues(att => (att, "dd")) val rr = reverseSearch(r, 1).vertices.mapValues(att => (att, "rr")) val dr = directSearch(r, 1).vertices.mapValues(att => (att, "dr")) val rd = reverseSearch(d, 1).vertices.mapValues(att => (att, "rd")) return dd.union(rr).union(dr).union(rd).aggregateByKey(List[(String, String)]())((list, attr) => list :+ attr, (a, b) => a ::: b).mapValues(list => { val map = list.map(tt => (tt._2, tt._1)).toMap (map("dd"), map("rr"), map("dr"), map("rd")) }) } /** * 索搜图调度 * * @param direct 1:正向搜索,2反向搜索 * @param index 0/非0 * @param graph * @return */ def searchScheduler(direct: String, index: Int, graph: Graph[String, String]): Graph[String, String] = { if ("1".equals(direct)) { return directSearch(graph, index) } else if ("-1".equals(direct)) { return reverseSearch(graph, index) } else { throw new Exception(s"索引方向 ${direct} 不正确,只接受1/-1") } } /** * 路径搜索图 * * @param path 搜索路径,eg:"1,-1" * @param graph * @return */ def searchByPatch(path: String, graph: Graph[String, String]): Graph[String, String] = { /*判定搜索路径正确性*/ if (!path.matches("-?1[,-?1]*")) { throw new Exception("索引路径不正确,eg:'-1,1-1'") } var graphTemp: Graph[String, String] = graph /*路径索搜*/ val pathArr = path.split(",") for (index <- 0 until pathArr.length) { graphTemp = searchScheduler(pathArr(index), index, graphTemp) } return graphTemp } def main(args: Array[String]): Unit = { val edgeFilePath = "D:\\graph.txt" val graph: Graph[String, String] = loadGraph(edgeFilePath) // oneDegreeAll(graph) twoDegreeAll(graph).foreach(println(_)) } }
相关文章推荐
- spark graphx实现共同好友的聚合
- Spark生态之Spark Graphx介绍、实现分析和实例
- Spark GraphX实现Bron–Kerbosch算法-极大团问题
- 第95讲:使用Scala开发集群运行的Spark来实现在线热搜索词获取
- WinEdt 6+SumatraPDF--实现TeX和PDF文件正反向搜索!
- 正则表达式中的反向预搜索实现
- SparkGraphX加权最短路径算法实现
- spark(breeze)L-BFGS使用的线搜索实现
- Spark GraphX实现PageRank
- Spark Graphx 实现图中极大团挖掘, 伪并行化算法
- WinEdt6+SumatraPDF--实现TeX和PDF文件正反向搜索!(转载)
- 正则表达式中的反向预搜索实现
- WinEdt 6+SumatraPDF -- 实现TeX文档和PDF文件正反向搜索
- nginx反向代理tomcat集群实现动静分离
- 在Spark中通过Scala + Mongodb实现连接池
- 三种方法实现Spark计算WordCount
- Yii框架结合sphinx,Ajax实现搜索分页功能示例
- 三种方法实现Spark计算WordCount
- 详细探究Spark的shuffle实现
- 利用chosen.js插件实现下拉可搜索多选,控制选择个数,选项框复位,修改下拉选项的功能(附代码)