您的位置:首页 > 其它

SparkGraphXTest.scala

2015-09-09 17:32 · 411 次查看
/**
* Created by root on 9/8/15.
*/
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object SparkGraphXTest {

  /**
   * Entry point. Runs two small GraphX property-graph examples on a single
   * SparkContext and always stops the context afterwards, even if a job fails.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // setIfMissing keeps "local" as the default for IDE / plain `java` runs,
    // while still letting spark-submit --master (or -Dspark.master) choose
    // the cluster; setMaster would silently override those.
    val conf = new SparkConf()
      .setAppName("graphx app")
      .setIfMissing("spark.master", "local")
    val sc = new SparkContext(conf)
    try {
      runNameRoleExample(sc)
      runThreeFieldExample(sc)
    } finally {
      // Release the context's executors/ports/threads deterministically.
      sc.stop()
    }
  }

  /** Builds the four example relationship edges (src, dst, label) used by both graphs. */
  private def exampleRelationships(sc: SparkContext): RDD[Edge[String]] =
    sc.parallelize(Array(
      Edge(3L, 7L, "collab"),
      Edge(5L, 3L, "advisor"),
      Edge(2L, 5L, "colleague"),
      Edge(5L, 7L, "pi")))

  /**
   * Graph whose vertex attribute is (name, role). Prints the number of
   * "postdoc" vertices, the number of edges with srcId > dstId (computed two
   * equivalent ways), then one "<name> is the <relation> of <name>" line per
   * edge triplet.
   */
  private def runNameRoleExample(sc: SparkContext): Unit = {
    val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array(
      (3L, ("rxin", "student")),
      (7L, ("jgonzal", "postdoc")),
      (5L, ("franklin", "prof")),
      (2L, ("istoica", "prof"))))
    // Attribute given to any vertex referenced by an edge but absent from `users`.
    val defaultUser = ("John Doe", "Missing")
    val graph = Graph(users, exampleRelationships(sc), defaultUser)

    val postdocCount =
      graph.vertices.filter { case (_, (_, pos)) => pos == "postdoc" }.count()
    val descendingCount =
      graph.edges.filter(e => e.srcId > e.dstId).count()
    // Same predicate as above, written with an Edge pattern match.
    val descendingCountMatched =
      graph.edges.filter { case Edge(src, dst, _) => src > dst }.count()
    println(postdocCount)
    println(descendingCount)
    println(descendingCountMatched)

    // Use the name (_1), not the role (_2), so lines read
    // "rxin is the collab of jgonzal" as in the GraphX programming guide.
    val facts: RDD[String] = graph.triplets.map { t =>
      s"${t.srcAttr._1} is the ${t.attr} of ${t.dstAttr._1}"
    }
    facts.collect().foreach(println)
  }

  /**
   * Graph whose vertex attribute is a (name, role, third-field) string triple
   * (the third field looks like an age — unconfirmed). Prints one line per
   * edge triplet using the tuples' default toString, e.g. "(rxin,student,20)".
   */
  private def runThreeFieldExample(sc: SparkContext): Unit = {
    val users: RDD[(VertexId, (String, String, String))] = sc.parallelize(Array(
      (3L, ("rxin", "student", "20")),
      (7L, ("jgonzal", "postdoc", "22")),
      (5L, ("franklin", "prof", "24")),
      (2L, ("istoica", "prof", "26"))))
    val defaultUser = ("Amy Sun", "aaa", "18")
    val graph = Graph(users, exampleRelationships(sc), defaultUser)

    val facts: RDD[String] = graph.triplets.map { t =>
      s"${t.srcAttr} is the ${t.attr} of ${t.dstAttr}"
    }
    facts.collect().foreach(println)
  }
}
内容来自用户分享和网络整理，不保证内容的准确性。如有侵权内容，可联系管理员处理（点击这里给我发消息）。
标签: