
Spark GraphX: first-degree and second-degree forward and reverse graph search

2018-01-20 15:51
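Contact-list data has long been an important data source for app platforms. Analyzing the relationship graph formed by contact lists is a standard task for a big-data team, because it gives recommendation-algorithm engineers better data to work with.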

There are several ways to store graph data:

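The most basic approach is to store the graph in a relational data warehouse. The warehouse model is simple and easy to implement, and first-degree relationship queries are convenient. A second-degree query, however, needs one self-join of the edge table, a third-degree friend analysis needs two, and multi-degree analysis becomes very expensive.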
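To make the self-join cost concrete, here is a minimal sketch that uses plain Scala collections in place of a warehouse table. The object name `SelfJoinSketch` and the three sample edges are illustrative assumptions, not part of the original job.

```scala
object SelfJoinSketch {
  // Each pair (a, b) means user a has stored user b's contact.
  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (2L, 1L), (3L, 1L))

  // Second-degree forward paths: join edges(a, b) with edges(b, c) on b.
  // This is one self-join; every extra degree adds another pass over the edge table.
  def twoDegree(es: Seq[(Long, Long)]): Seq[(Long, Long, Long)] =
    for {
      (a, b) <- es
      (b2, c) <- es
      if b == b2
    } yield (a, b, c)

  def main(args: Array[String]): Unit =
    twoDegree(edges).foreach { case (a, b, c) => println(s"$a -> $b -> $c") }
}
```

The nested loop compares every edge with every other edge, which is why the same idea on a large table needs a real join engine and grows costly as the degree increases.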

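Another approach is to store the graph in an open-source graph database such as neo4j, which not only supports fast multi-degree queries but can also render the graph. However, the neo4j community edition only supports graphs on the order of tens of millions of nodes, and the commercial edition is too expensive.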

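If the data volume is large, you do not want to pay for a commercial graph database, and you still need fast lookups over the graph, here is one option: a graph precomputation platform built on Spark GraphX + hbase + hive. The raw data is stored incrementally in hive. In each batch, GraphX precomputes the first-degree, second-degree, ... n-degree relationships from the graph data in hive, and the results are stored in hbase/solr so that the graph can be queried in real time.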

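Below is a Spark GraphX-based scheme that computes the first-degree index in both directions and the second-degree indexes (forward-forward, reverse-reverse, forward-reverse, reverse-forward).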

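[Note]:
The searchByPatch method searches along a path. For example, passing 1 is a first-degree forward search; -1,1 is a second-degree reverse-then-forward search; -1,1,1 is a third-degree reverse, forward, forward search.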
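As a rough illustration of the path notation, the sketch below validates a path string and decodes each step into a direction name. `PathSketch` and `describe` are hypothetical helpers written for this note; they are not part of the GraphX job.

```scala
object PathSketch {
  // One or more steps separated by commas; each step is "1" (forward) or "-1" (reverse).
  private val PathPattern = "-?1(,-?1)*"

  // Decode a path string into direction names; rejects anything that is not a valid path.
  def describe(path: String): List[String] = {
    require(path.matches(PathPattern), s"invalid path: $path")
    path.split(",").toList.map {
      case "1"  => "forward"
      case "-1" => "reverse"
    }
  }

  def main(args: Array[String]): Unit =
    println(describe("-1,1,1")) // List(reverse, forward, forward)
}
```

Note the grouping `(,-?1)*` in the pattern: writing it with square brackets would create a character class and let strings such as `1,2` through.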

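Data format:

1 2
2 1
3 1

Each line is a "source destination" pair. The first line means that user 1 has stored user 2's contact information.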
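To show what these three lines encode, here is a small sketch that parses them and builds the first-degree forward and reverse neighbor lists with plain Scala collections. The object name `SampleDataSketch` is an assumption for illustration, and the parser assumes both fields are numeric.

```scala
object SampleDataSketch {
  val lines: Seq[String] = Seq("1 2", "2 1", "3 1")

  // Parse "src dst" into a pair; malformed lines and self-loops are skipped.
  def parse(line: String): Option[(Long, Long)] =
    line.trim.split("\\s+") match {
      case Array(a, b) if a != b => Some((a.toLong, b.toLong))
      case _                     => None
    }

  val edges: Seq[(Long, Long)] = lines.flatMap(line => parse(line))

  // Forward: whose contacts does each user store?
  val forward: Map[Long, Seq[Long]] =
    edges.groupBy(_._1).map { case (k, v) => k -> v.map(_._2) }

  // Reverse: who has stored this user's contact?
  val reverse: Map[Long, Seq[Long]] =
    edges.groupBy(_._2).map { case (k, v) => k -> v.map(_._1) }

  def main(args: Array[String]): Unit = {
    println(s"forward: $forward")
    println(s"reverse: $reverse")
  }
}
```

User 3 appears only in the forward map, since nobody in the sample has stored user 3's contact.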

package mob

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object OperatorFun {
  /** Parse one "src dst" line into an edge; malformed lines and self-loops yield no edge. */
  def transforEdge(edgeString: String): List[Edge[String]] = {
    val vertexs = edgeString.trim.split(" +")
    if (vertexs.length == 2 && vertexs(0) != vertexs(1)) {
      List(Edge(vertexs(0).toLong, vertexs(1).toLong, ""))
    } else {
      List.empty[Edge[String]]
    }
  }
}

object FriendSearch {
  /**
   * Build the graph from an edge file.
   *
   * @param edgeFilePath path to a text file with one "src dst" pair per line
   * @return graph with placeholder string vertex attributes and empty edge attributes
   */
  def loadGraph(edgeFilePath: String): Graph[String, String] = {
    val conf = new SparkConf().setAppName("graph").setMaster("local")
    val sc = new SparkContext(conf)
    // Duplicate edges are removed so that each relation is counted once.
    val edgeRDD = sc.textFile(edgeFilePath).flatMap(OperatorFun.transforEdge(_)).distinct()
    val graph: Graph[Long, String] = Graph.fromEdges(edgeRDD, 0L, StorageLevel.MEMORY_AND_DISK, StorageLevel.MEMORY_AND_DISK)
    graph.mapVertices((id, att) => att.toString)
  }

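  /**
   * Reverse search: each vertex collects the paths that arrive over its incoming edges.
   *
   * @param graph graph whose vertex attributes hold the "|"-separated paths found so far
   * @param index 0 for the first step of a path, non-zero for later steps
   * @return graph whose vertex attributes are the extended paths (null where none exist)
   */
  def reverseSearch(graph: Graph[String, String], index: Int): Graph[String, String] = {
    val vertexRdd = graph.aggregateMessages[String](
      triplet => {
        if (index == 0) {
          // First step: the destination learns the id of its source.
          triplet.sendToDst(triplet.srcId.toString)
        } else {
          val srcId = triplet.srcId
          if (triplet.srcAttr != null) {
            // Later steps: prefix every path known at the source with the source id.
            val path = triplet.srcAttr.split("\\|").map(node => s"$srcId->$node").mkString("|")
            triplet.sendToDst(path)
          }
        }
      },
      // Messages are never null, so merging is a plain "|" concatenation.
      (a, b) => s"$a|$b"
    )
    Graph(vertexRdd, graph.edges)
  }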

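  /**
   * Forward search: each vertex collects the paths that leave over its outgoing edges.
   *
   * @param graph graph whose vertex attributes hold the "|"-separated paths found so far
   * @param index 0 for the first step of a path, non-zero for later steps
   * @return graph whose vertex attributes are the extended paths (null where none exist)
   */
  def directSearch(graph: Graph[String, String], index: Int): Graph[String, String] = {
    val vertexRdd = graph.aggregateMessages[String](
      triplet => {
        if (index == 0) {
          // First step: the source learns the id of its destination.
          triplet.sendToSrc(triplet.dstId.toString)
        } else {
          val dstId = triplet.dstId
          if (triplet.dstAttr != null) {
            // Later steps: prefix every path known at the destination with the destination id.
            val path = triplet.dstAttr.split("\\|").map(node => s"$dstId->$node").mkString("|")
            triplet.sendToSrc(path)
          }
        }
      },
      // Messages are never null, so merging is a plain "|" concatenation.
      (a, b) => s"$a|$b"
    )
    Graph(vertexRdd, graph.edges)
  }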

  /**
   * First-degree search in both directions.
   *
   * @param graph graph built by loadGraph
   * @return RDD of (id, (forward, reverse)); a side with no neighbors is null
   */
  def oneDegreeAll(graph: Graph[String, String]): RDD[(VertexId, (String, String))] = {
    val directSearchG = directSearch(graph, 0).vertices
    val reverseSearchG = reverseSearch(graph, 0).vertices
    directSearchG.join(reverseSearchG)
  }

  /**
   * Second-degree search in all four direction combinations.
   *
   * @param graph graph built by loadGraph
   * @return RDD of (id, (forward-forward, reverse-reverse, forward-reverse, reverse-forward));
   *         a combination with no paths is null
   */
  def twoDegreeAll(graph: Graph[String, String]): RDD[(VertexId, (String, String, String, String))] = {
    /**
     * The commented-out implementation below recomputes the first step for every path,
     * which increases run time; avoid it in later maintenance.
     * var dd = searchByPatch("1,1",graph).vertices.mapValues(att => (att,"dd"))
     * var rr = searchByPatch("-1,-1",graph).vertices.mapValues(att => (att,"rr"))
     * var dr = searchByPatch("1,-1",graph).vertices.mapValues(att => (att,"dr"))
     * var rd = searchByPatch("-1,1",graph).vertices.mapValues(att =>(att,"rd"))
     * dd.union(rr).union(dr).union(rd).aggregateByKey(List[(String,String)]())((list,attr)=>list:+attr,(a,b)=>a:::b ).mapValues(list =>{
     * val map = list.map(tt=>(tt._2,tt._1)).toMap
     * (map("dd"),map("rr"),map("dr"),map("rd"))
     * })
     */
    graph.vertices.cache()
    // Compute each first step once and reuse it for two second steps.
    val d = directSearch(graph, 0)
    d.vertices.cache()
    val r = reverseSearch(graph, 0)
    r.vertices.cache()
    // Tag every result with its direction pair so the four sets can be merged by vertex id.
    val dd = directSearch(d, 1).vertices.mapValues(att => (att, "dd"))
    val rr = reverseSearch(r, 1).vertices.mapValues(att => (att, "rr"))
    val dr = directSearch(r, 1).vertices.mapValues(att => (att, "dr"))
    val rd = reverseSearch(d, 1).vertices.mapValues(att => (att, "rd"))
    dd.union(rr).union(dr).union(rd)
      .aggregateByKey(List[(String, String)]())((list, attr) => list :+ attr, (a, b) => a ::: b)
      .mapValues(list => {
        val map = list.map(tt => (tt._2, tt._1)).toMap
        (map("dd"), map("rr"), map("dr"), map("rd"))
      })
  }

  /**
   * Dispatch one search step.
   *
   * @param direct "1": forward search, "-1": reverse search
   * @param index  0 for the first step, non-zero otherwise
   * @param graph  graph produced by the previous step
   * @return graph after this step
   */
  def searchScheduler(direct: String, index: Int, graph: Graph[String, String]): Graph[String, String] = {
    if ("1".equals(direct)) {
      directSearch(graph, index)
    } else if ("-1".equals(direct)) {
      reverseSearch(graph, index)
    } else {
      throw new IllegalArgumentException(s"invalid search direction ${direct}; only 1 / -1 are accepted")
    }
  }

  /**
   * Search the graph along a path.
   *
   * @param path search path, eg: "1,-1"
   * @param graph graph built by loadGraph
   * @return graph whose vertex attributes are the paths found
   */
  def searchByPatch(path: String, graph: Graph[String, String]): Graph[String, String] = {
    /* Validate the path: comma-separated steps, each 1 or -1. */
    if (!path.matches("-?1(,-?1)*")) {
      throw new IllegalArgumentException("invalid search path, eg: '-1,1,-1'")
    }
    /* Apply the steps in order, feeding each step the graph from the previous one. */
    path.split(",").zipWithIndex.foldLeft(graph) {
      case (graphTemp, (direct, index)) => searchScheduler(direct, index, graphTemp)
    }
  }

  def main(args: Array[String]): Unit = {
    val edgeFilePath = "D:\\graph.txt"
    val graph: Graph[String, String] = loadGraph(edgeFilePath)
    //    oneDegreeAll(graph)
    twoDegreeAll(graph).foreach(println(_))
  }
}
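One way to sanity-check the job on the three-line sample is to recompute the four second-degree indexes with plain Scala collections and compare them with the printed result. The sketch below is such a cross-check under stated assumptions: `TwoDegreeCheck` and `twoStep` are hypothetical names, a vertex with no paths gets an empty list here where the GraphX job leaves the attribute null, and the order of paths within one vertex may differ from the job's output.

```scala
object TwoDegreeCheck {
  type Adj = Map[Long, Seq[Long]]

  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (2L, 1L), (3L, 1L))

  // First-degree neighbor lists in each direction.
  val fwd: Adj = edges.groupBy(_._1).map { case (k, v) => k -> v.map(_._2) }
  val rev: Adj = edges.groupBy(_._2).map { case (k, v) => k -> v.map(_._1) }

  // Take one step with `first`, then one step with `second`,
  // and format each path as "middle->end" like the job does.
  def twoStep(first: Adj, second: Adj)(v: Long): Seq[String] =
    for {
      m <- first.getOrElse(v, Seq.empty)
      n <- second.getOrElse(m, Seq.empty)
    } yield s"$m->$n"

  def main(args: Array[String]): Unit =
    for (v <- Seq(1L, 2L, 3L)) {
      val dd = twoStep(fwd, fwd)(v) // forward-forward
      val rr = twoStep(rev, rev)(v) // reverse-reverse
      val dr = twoStep(fwd, rev)(v) // forward-reverse
      val rd = twoStep(rev, fwd)(v) // reverse-forward
      println(s"$v: dd=$dd rr=$rr dr=$dr rd=$rd")
    }
}
```

Like the GraphX code, this keeps paths that lead back to the starting vertex (for example user 1 reaching itself through user 2); filter them out afterwards if they are not wanted.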