Spark 共同好友解决方案:求大量集合的两两交集
2017-11-16 16:12
363 查看
Hadoop/MapReduce 共同好友解决方案:求大量集合的两两交集
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import scala.collection.mutable.ListBuffer object FindCommonFriends { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("FindCommonFriends").setMaster("local") val sc = new SparkContext(sparkConf) val input = "file:///media/chenjie/0009418200012FF3/ubuntu/friends.txt" val output = "file:///media/chenjie/0009418200012FF3/ubuntu/friends" val records = sc.textFile(input) /* * 100,200 300 400 500 600 200,100 300 400 300,100 200 400 500 400,100 200 300 500,100 300 600,100 * */ val pairs = records.flatMap(s => { //100,200 300 400 500 600 val tokens = s.split(",") val person = tokens(0).toLong//100 val friends = tokens(1).split("\\s+").map(_.toLong).toList//200 300 400 500 600 val result = for { i <- 0 until friends.size friend = friends(i) } yield { if (person < friend) ((person, friend), friends) else ((friend, person), friends) } //以上代码等价于以下代码 /*val result = ListBuffer.empty[Tuple2[Tuple2[Long,Long],List[Long]]] for(friend <- friends){ if(person < friend) result += new Tuple2(new Tuple2(person, friend), friends) else result += new Tuple2(new Tuple2(friend, person), friends) }*/ result }) /* * ((100,200),List(200, 300, 400, 500, 600)) ((100,300),List(200, 300, 400, 500, 600)) ((100,400),List(200, 300, 400, 500, 600)) ((100,500),List(200, 300, 400, 500, 600)) ((100,600),List(200, 300, 400, 500, 600)) ((100,200),List(100, 300, 400)) ((200,300),List(100, 300, 400)) ((200,400),List(100, 300, 400)) ((100,300),List(100, 200, 400, 500)) ((200,300),List(100, 200, 400, 500)) ((300,400),List(100, 200, 400, 500)) ((300,500),List(100, 200, 400, 500)) ((100,400),List(100, 200, 300)) ((200,400),List(100, 200, 300)) ((300,400),List(100, 200, 300)) ((100,500),List(100, 300)) ((300,500),List(100, 300)) ((100,600),List(100)) * */ val grouped = pairs.groupByKey() /* * ((300,400),CompactBuffer(List(100, 200, 400, 500), List(100, 200, 300))) ((100,200),CompactBuffer(List(200, 300, 400, 500, 600), List(100, 300, 400))) ((300,500),CompactBuffer(List(100, 200, 400, 500), List(100, 300))) ((100,500),CompactBuffer(List(200, 300, 400, 500, 600), List(100, 300))) ((200,300),CompactBuffer(List(100, 300, 400), List(100, 200, 400, 500))) ((100,600),CompactBuffer(List(200, 300, 400, 500, 600), List(100))) ((100,300),CompactBuffer(List(200, 300, 400, 500, 600), List(100, 200, 400, 500))) ((200,400),CompactBuffer(List(100, 300, 400), List(100, 200, 300))) ((100,400),CompactBuffer(List(200, 300, 400, 500, 600), List(100, 200, 300))) * */ val commonFriends = grouped.mapValues(iter => { //CompactBuffer(List(100, 200, 400, 500), List(100, 200, 300)) /*val friendCount = for { list <- iter if !list.isEmpty friend <- list } yield ((friend, 1)) friendCount.groupBy(_._1).mapValues(_.unzip._2.sum).filter(_._2 > 1).map(_._1)*/ //以下代码等价于以上代码 val friendCount = ListBuffer.empty[Tuple2[Long,Int]] for(list <- iter){ //List(100, 200, 400, 500) //List(100, 200, 300)) if(!list.isEmpty){ for(friend <- list){ friendCount.+=(Tuple2(friend,1)) //(100,1) (200,1) (400,1) (500,1) //(100,1) (200,1) (300,1) } } } //ListBuffer((100,1), (200,1), (400,1), (500,1), (100,1), (200,1), (300,1)) //friendCount.groupBy(_._1) //Map(500 -> ListBuffer((500,1)), // 300 -> ListBuffer((300,1)), // 400 -> ListBuffer((400,1)), // 200 -> ListBuffer((200,1), (200,1)), // 100 -> ListBuffer((100,1), (100,1))) //friendCount.groupBy(_._1).mapValues(_.unzip) //Map(500 -> (ListBuffer(500),ListBuffer(1)), // 300 -> (ListBuffer(300),ListBuffer(1)), // 400 -> (ListBuffer(400),ListBuffer(1)), // 200 -> (ListBuffer(200, 200),ListBuffer(1, 1)), // 100 -> (ListBuffer(100, 100),ListBuffer(1, 1))) //friendCount.groupBy(_._1).mapValues(_.unzip._2) //Map(500 -> ListBuffer(1), // 300 -> ListBuffer(1), // 400 -> ListBuffer(1), // 200 -> ListBuffer(1, 1), // 100 -> ListBuffer(1, 1)) //friendCount.groupBy(_._1).mapValues(_.unzip._2.sum) //Map(500 -> 1, 300 -> 1, 400 -> 1, 200 -> 2, 100 -> 2) //friendCount.groupBy(_._1).mapValues(_.unzip._2.sum).filter(_._2 > 1) //Map(200 -> 2, 100 -> 2) //friendCount.groupBy(_._1).mapValues(_.unzip._2.sum).filter(_._2 > 1).map(_._1) //List(200, 100) friendCount.groupBy(_._1).mapValues(_.unzip._2.sum).filter(_._2 > 1).map(_._1) }) /* ((300,400),List(200, 100)) ((100,200),List(300, 400)) ((300,500),List(100)) ((100,500),List(300)) ((200,300),List(400, 100)) ((100,600),List()) ((100,300),List(500, 400, 200)) ((200,400),List(100, 300)) ((100,400),List(300, 200)) * */ commonFriends.saveAsTextFile(output) val formatedResult = commonFriends.map( f => s"(${f._1._1}, ${f._1._2})\t${f._2.mkString("[", ", ", "]")}" ) formatedResult.foreach(println) sc.stop() } }
相关文章推荐
- Hadoop/MapReduce 共同好友解决方案:求大量集合的两两交集
- 对于多个集合求两两交集(共同关注的用户、共同转载的微薄等)
- MapReduce案例4——求两两共同好友
- spark graphx实现共同好友的聚合
- Spark实现之 查找共同好友
- 寻找共同好友(hadoop解决方案)
- spark解决方案系列--------1.spark-streaming实时Join存储在HDFS大量数据的解决方案
- Spark 好友推荐解决方案
- spark 集合交集差集运算
- MapReduce—案例(五)求两两共同好友
- Spark任务提交jar包依赖解决方案
- Spark性能优化的10大问题及其解决方案
- 大量POI点展示的一种解决方案
- Xcode8控制台输出大量无用信息的解决方案
- 计算集合的交集和并集
- A、B两个整数集合,设计一个算法求他们的交集,尽可能的高效
- 无法打开文件夹的集合解决方案
- 【Spark】SparkStreaming-流处理-规则动态更新-解决方案
- SQL 数据库疑难解决方案集合
- 【Java】求多个集合的交集