您的位置:首页 > 其它

Spark 共同好友解决方案:求大量集合的两两交集

2017-11-16 16:12 363 查看
Spark 共同好友解决方案:求大量集合的两两交集(与 Hadoop/MapReduce 版本解决同一问题)

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import scala.collection.mutable.ListBuffer
object FindCommonFriends {

  /**
   * Computes the common friends of every pair of persons, i.e. the pairwise
   * intersection of a large number of friend sets.
   *
   * Input format (one person per line): "<person>,<friend1> <friend2> ..."
   * e.g. "100,200 300 400 500 600".
   *
   * @param args optional: args(0) = input path, args(1) = output path.
   *             When omitted, the original hard-coded local paths are used.
   */
  def main(args: Array[String]): Unit = {
    // Paths may be supplied on the command line; fall back to the original defaults.
    val input  = if (args.length > 0) args(0) else "file:///media/chenjie/0009418200012FF3/ubuntu/friends.txt"
    val output = if (args.length > 1) args(1) else "file:///media/chenjie/0009418200012FF3/ubuntu/friends"

    // setIfMissing keeps local runs working unchanged while allowing
    // `spark-submit --master ...` to override the master for cluster runs.
    val sparkConf = new SparkConf().setAppName("FindCommonFriends")
    sparkConf.setIfMissing("spark.master", "local")
    val sc = new SparkContext(sparkConf)

    val records = sc.textFile(input)

    // For every edge (person, friend) emit ((smaller, larger), person's friend list)
    // so that both endpoints' friend lists meet under the same order-independent key.
    // e.g. "100,200 300" -> ((100,200), List(200,300)), ((100,300), List(200,300))
    val pairs = records
      .filter(_.trim.nonEmpty) // skip blank lines instead of crashing on toLong
      .flatMap { line =>
        val tokens  = line.split(",")
        val person  = tokens(0).toLong
        val friends = tokens(1).split("\\s+").map(_.toLong).toList
        friends.map { friend =>
          val key = if (person < friend) (person, friend) else (friend, person)
          (key, friends)
        }
      }

    // Gather, per pair, the friend lists contributed by each endpoint, e.g.
    // ((300,400), [List(100,200,400,500), List(100,200,300)])
    val grouped = pairs.groupByKey()

    // A friend common to the pair is one that occurs in more than one of the
    // grouped lists. Counting occurrences (rather than a set intersection)
    // preserves the original semantics: a key seen in only one list yields
    // nothing, e.g. ((100,600), List()).
    val commonFriends = grouped.mapValues { lists =>
      lists.flatten
        .groupBy(identity)
        .collect { case (friend, occurrences) if occurrences.size > 1 => friend }
    }

    commonFriends.saveAsTextFile(output)

    // collect() brings the results to the driver before printing; a bare
    // RDD.foreach(println) would print on the executors in cluster mode.
    commonFriends
      .map { case ((a, b), cf) => s"($a, $b)\t${cf.mkString("[", ", ", "]")}" }
      .collect()
      .foreach(println)

    sc.stop()
  }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark 共同好友 交集