
Spark GraphX graph computation demo, with results

2017-01-04 17:27
Exercises based on the examples in the official Spark GraphX programming guide:
http://spark.apache.org/docs/latest/graphx-programming-guide.html
import org.apache.spark._

import org.apache.spark.graphx._

// To make some of the examples work we will also need RDD

import org.apache.spark.rdd.RDD

import org.apache.spark.graphx.GraphLoader

val graph = GraphLoader.edgeListFile(sc, "/data/graphx/followers.txt")

val temp = graph.mapVertices((id,attr) => attr.toInt  * 2)

temp.vertices.take(10)

Or, with an explicit result type:

val temp : Graph[Int,Int] =  graph.mapVertices((_,attr) => attr  * 2)

temp.vertices.take(10)

// With a 140M dataset:

val graph = GraphLoader.edgeListFile(sc, "hdfs://server1:9000/data/graphx/followers.txt",numEdgePartitions=4)

graph.vertices.count

graph.edges.count

------properties  operators---end====================================

structural operators-----start

reverse-----flips the direction of every edge
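A minimal sketch of reverse, assuming it is pasted into spark-shell where `sc` is predefined (as in the rest of these notes). The three-edge graph and the names `tiny` and `reversedPairs` are made up for illustration.

```scala
import org.apache.spark.graphx._

// Hypothetical toy graph: 1 -> 2, 2 -> 3, 3 -> 1 (every vertex attribute defaults to 0)
val tiny = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1))), 0)

// reverse swaps srcId and dstId on every edge; attributes are left unchanged
val reversedPairs = tiny.reverse.edges.map(e => (e.srcId, e.dstId)).collect().toSet
println(reversedPairs)
```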

subgraph-----filters edges and/or vertices with predicates

val subGraph = graph.subgraph(epred = e =>e.srcId > e.dstId)

val subGraph = graph.subgraph(epred = e =>e.srcId > e.dstId,vpred=(id,_) => id>500000)

mask---restricts a graph to the vertices and edges that also appear in another graph
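A small sketch of mask, again assuming spark-shell with `sc` available. The graphs `full` and `part` and the attribute value 99 are hypothetical. The point is that the result keeps the structure common to both graphs but takes its attributes from the graph mask is called on.

```scala
import org.apache.spark.graphx._

// Hypothetical toy graph: 1 -> 2, 2 -> 3, 3 -> 4, all vertex attributes 0
val full = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 4L, 1))), 0)

// A second graph that covers only vertices 1..3 and carries different attributes
val part = full.subgraph(vpred = (id, _) => id <= 3L).mapVertices((_, _) => 99)

// mask keeps what exists in both graphs, with the attributes of `full`
val masked = full.mask(part)
val maskedVertices = masked.vertices.collect().toMap
val maskedEdges = masked.edges.map(e => (e.srcId, e.dstId)).collect().toSet
```

This is typically used to compute something on the whole graph and then narrow the answer down to a filtered subgraph.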

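groupEdges----merges parallel edges (several edges between the same pair of vertices) into one; the graph must first be partitioned with partitionBy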
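A sketch of groupEdges on a hand-built multigraph, assuming spark-shell with `sc` available. The graph `multi` and its weights are made up. Summing the weights is just one possible merge function.

```scala
import org.apache.spark.graphx._

// Hypothetical multigraph: two parallel edges 1 -> 2 (weights 3 and 4) plus 2 -> 3 (weight 5)
val multi = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 3), Edge(1L, 2L, 4), Edge(2L, 3L, 5))), 0)

// groupEdges only merges edges that sit in the same partition,
// so repartition first to co-locate parallel edges
val grouped = multi.partitionBy(PartitionStrategy.RandomVertexCut).groupEdges(_ + _)
val groupedEdges = grouped.edges.map(e => (e.srcId, e.dstId, e.attr)).collect().toSet
```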

structural operators-----end=============

computing degree-----start=============

val tmp = graph.degrees

val temp = graph.inDegrees // in-degree: number of edges for which the vertex is the destination

temp.take(10)

val temp = graph.outDegrees // out-degree: number of edges for which the vertex is the source

temp.take(10)

val tmp = graph.degrees

// Reduce helper: of two (vertexId, degree) pairs, keep the one with the larger degree
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = if (a._2 > b._2) a else b

graph.degrees.reduce(max)

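// Business meaning: which page has the most links overall. A degree of 500 suggests it may be a "123"-style navigation (link directory) site.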

computing degree-----end========================

collecting neighbors-------start=============

// Signatures from GraphOps (method bodies omitted):
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]

def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
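A usage sketch for collectNeighborIds, assuming spark-shell with `sc` available. The toy graph `nbrGraph` is hypothetical. Neighbor arrays are sorted here only to make the printed result deterministic.

```scala
import org.apache.spark.graphx._

// Hypothetical toy graph: 1 -> 2, 1 -> 3, 3 -> 2
val nbrGraph = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 3L, 1), Edge(3L, 2L, 1))), 0)

// EdgeDirection.Out: neighbors reached by following outgoing edges
val outNbrs = nbrGraph.collectNeighborIds(EdgeDirection.Out)
  .collect().map { case (id, nbrs) => id -> nbrs.sorted.toSeq }.toMap

// EdgeDirection.In: neighbors that point at this vertex
val inNbrs = nbrGraph.collectNeighborIds(EdgeDirection.In)
  .collect().map { case (id, nbrs) => id -> nbrs.sorted.toSeq }.toMap

println(outNbrs)
println(inNbrs)
```

Both calls materialize a full neighbor array per vertex, so on a graph with very high-degree vertices this can be expensive.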

collecting neighbors-------end=============

join operators-------start=============

// Signatures from the GraphX API (method bodies omitted):
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
    : Graph[VD, ED]

def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null)
    : Graph[VD2, ED]

val rawGraph = graph.mapVertices((id, attr) => 0) // set every vertex attribute to 0

val outDeg = rawGraph.outDegrees

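// joinVertices: only vertices present in outDeg are updated; deg is a plain Int here, not an Option
val tmp = rawGraph.joinVertices[Int](outDeg) { (_, _, deg) => deg }

// outerJoinVertices: every vertex is visited; vertices missing from outDeg receive None
val tmp = rawGraph.outerJoinVertices[Int, Int](outDeg) { (_, _, optDeg) => optDeg.getOrElse(0) }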

join operators-------end=============

Graphs----a data mining tool!

map reduce triplets-------start=============

MapReduce-style operations on a graph (the code below uses aggregateMessages):
http://spark.apache.org/docs/latest/graphx-programming-guide.html
import org.apache.spark.graphx.util.GraphGenerators

// Create a graph with "age" as the vertex property.

// Here we use a random graph for simplicity.

val graph: Graph[Double, Int] =

  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )

// Compute the number of older followers and their total age

val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](

  triplet => { // Map Function

    if (triplet.srcAttr > triplet.dstAttr) {

      // Send message to destination vertex containing counter and age

      triplet.sendToDst((1, triplet.srcAttr))

    }

  },

  // Add counter and age

  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function

)

// Divide total age by number of older followers to get average age of older followers

val avgAgeOfOlderFollowers: VertexRDD[Double] =

  olderFollowers.mapValues( (id, value) =>

    value match { case (count, totalAge) => totalAge / count } )

// Display the results

avgAgeOfOlderFollowers.collect.foreach(println(_))

map reduce triplets-------end=============

pregel api-------start=============

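Why does GraphX provide a Pregel API?

To make iterative computations easier to write. With the basic GraphX operators you have to cache intermediate graphs by hand, which is hard to control because vertices and edges are cached separately.

The Pregel API handles this caching automatically.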

  def pregel[A: ClassTag](

      initialMsg: A,

      maxIterations: Int = Int.MaxValue,

      activeDirection: EdgeDirection = EdgeDirection.Either)(

      vprog: (VertexId, VD, A) => VD,

      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],

      mergeMsg: (A, A) => A)

    : Graph[VD, ED] = {

    Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)

  }

  Computation proceeds one superstep at a time, which suits algorithms that need many iterations.
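To make the three functions concrete, here is a sketch of single-source shortest paths with pregel, following the pattern of the example in the official guide. It assumes spark-shell with `sc` available. The weighted graph `weighted` and the choice of vertex 1 as the source are made up.

```scala
import org.apache.spark.graphx._

// Hypothetical weighted graph: 1 -> 2 (1.0), 2 -> 3 (2.0), 1 -> 3 (5.0)
val weighted = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0))), 0.0)

val sourceId: VertexId = 1L
// Distance 0 at the source, infinity everywhere else
val init = weighted.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = init.pregel(Double.PositiveInfinity)(
  // vprog: keep the shorter of the current and the received distance
  (id, dist, newDist) => math.min(dist, newDist),
  // sendMsg: notify the destination only if this edge yields a shorter path
  triplet =>
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else
      Iterator.empty,
  // mergeMsg: of several incoming distances, keep the smallest
  (a, b) => math.min(a, b)
)
val distances = sssp.vertices.collect().toMap
println(distances)
```

The loop stops on its own once no vertex receives a message, so maxIterations can be left at its default here.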

pregel api--------end=============

GraphX design--------start=============

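edge cut: one way to split a graph for distributed processing

vertex cut: the other way to split a graph for distributed processing; this is the one GraphX uses

The PartitionStrategy class offers 4 built-in strategies.

Because GraphX uses vertex cut, a vertex may be present in several partitions, while each edge lives in exactly one partition.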
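A rough way to observe this, assuming spark-shell with `sc` available: partition a toy graph and count in how many edge partitions each vertex shows up. The graph `cutGraph`, the choice of EdgePartition2D and the partition count 4 are arbitrary picks for illustration. The exact replication numbers depend on the strategy, so none are shown.

```scala
import org.apache.spark.graphx._

// Hypothetical toy graph in which vertex 1 touches three of the four edges
val cutGraph = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 3L, 1), Edge(1L, 4L, 1), Edge(2L, 3L, 1))), 0)

val parted = cutGraph.partitionBy(PartitionStrategy.EdgePartition2D, 4)

// Each edge is stored once, so the edge count is unchanged by partitioning
val edgeCount = parted.edges.count()

// For every vertex, the number of distinct edge partitions it appears in
val replication = parted.edges
  .mapPartitionsWithIndex((pid, it) =>
    it.flatMap(e => Iterator((e.srcId, pid), (e.dstId, pid))))
  .distinct()
  .map(_._1)
  .countByValue()

println(replication)
```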

GraphX design--------end=============

pagerank   triangleCount--------start=============

PageRank needs little explanation; it is the simplest one.

val ranks = graph.pageRank(0.01).vertices // 0.01 is the convergence tolerance: smaller is more precise; use a larger value on big datasets. The algorithm is built in.

ranks.take(10).mkString("\n")

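Convenient and practical, for example for recommendations in a social network. A tolerance of 0.01 is not too large. Sort the result by rank to find the top vertices.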
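A sketch of the sorting step, assuming spark-shell with `sc` available. The star-shaped graph `star`, in which every other vertex links to vertex 1, is made up so that the winner is obvious.

```scala
import org.apache.spark.graphx._

// Hypothetical toy graph: vertices 2, 3 and 4 all link to vertex 1
val star = Graph.fromEdges(
  sc.parallelize(Seq(Edge(2L, 1L, 1), Edge(3L, 1L, 1), Edge(4L, 1L, 1))), 0)

val starRanks = star.pageRank(0.01).vertices

// Highest rank first; take only as many rows as you want to look at
val top = starRanks.sortBy(_._2, ascending = false).take(3)
top.foreach { case (id, rank) => println(s"$id\t$rank") }
```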

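triangleCount: measures how tightly connected the graph is around a vertex by counting the triangles that pass through it. Edges must be in canonical orientation (srcId < dstId), hence the `true` passed to edgeListFile below.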

val graph = GraphLoader.edgeListFile(sc, "hdfs://server1:9000/data/graphx/followers.txt", canonicalOrientation = true)

val c = graph.triangleCount().vertices

c.take(10)
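To check the result by hand, here is a sketch on a graph small enough to count by eye, assuming spark-shell with `sc` available. The graph `tri` is hypothetical: vertices 1, 2 and 3 form one triangle and vertex 4 hangs off vertex 3. The partitionBy call follows the official guide's example.

```scala
import org.apache.spark.graphx._

// Hypothetical toy graph, edges already written with srcId < dstId
val tri = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(1L, 3L, 1), Edge(3L, 4L, 1))), 0)
  .partitionBy(PartitionStrategy.RandomVertexCut)

// Number of triangles passing through each vertex
val triCounts = tri.triangleCount().vertices.collect().toMap
println(triCounts)
```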

pagerank   triangleCount--------end=============