
join and cogroup operations in Spark

2016-06-04 15:42
package com.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

/**
 * Scala test of join and cogroup: join emits one output tuple for every
 * combination of matching values under a key, while cogroup collects all
 * of a key's values into grouped iterables.
 */
object JoinAndCogroup {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("joinAndCogroup").setMaster("local[1]")
    // create the SparkContext
    val sc = new SparkContext(conf)

    // build the sample key-value lists: (studentId, name) and (studentId, score)
    val stuList = List((1, "tom"), (2, "jim"), (3, "cassie"))
    val scoreList = List((1, 20), (1, 90), (1, 30), (2, 23), (2, 23), (2, 80), (3, 90), (3, 100), (3, 100))

    // turn the local collections into pair RDDs
    val stuRDD = sc.parallelize(stuList)
    val scoreRDD = sc.parallelize(scoreList)

    /* // join: one output tuple per combination of matching values for a key
    val joinRDD = stuRDD.join(scoreRDD)
    // collect() brings the result to the driver so the println output is visible
    for (join2 <- joinRDD.collect()) {
      println("===========")
      println("id is " + join2._1)
      println("name is " + join2._2._1)
      println("score is " + join2._2._2)
    } */
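    /*
     * Expected join output for the sample data above (note added for clarity;
     * ordering may vary across runs): each student pairs with every one of
     * their scores, e.g. (1,(tom,20)), (1,(tom,90)), (1,(tom,30)) for id 1,
     * nine tuples in total.
     */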

    

    // cogroup: all names and all scores for a key are grouped into iterables
    val cogroupRDD = stuRDD.cogroup(scoreRDD)
    for (group2 <- cogroupRDD.collect()) {
      println("===========")
      println("id is " + group2._1)
      println("names are " + group2._2._1)
      println("scores are " + group2._2._2)
    }
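    /*
     * Expected cogroup output for the sample data (note added for clarity):
     * one tuple per key, e.g. id 1 -> names CompactBuffer(tom),
     * scores CompactBuffer(20, 90, 30).
     */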

    // release resources
    sc.stop()
  }

}
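As a follow-up sketch (an addition, not part of the original post), the cogroup result can be reduced to a per-student average score. This snippet assumes the same stuRDD and scoreRDD defined inside main above:

// Sketch (assumption, not in the original post): average score per student
// derived from the cogroup result
val avgRDD = stuRDD.cogroup(scoreRDD).map { case (id, (names, scores)) =>
  val name = names.headOption.getOrElse("unknown")
  val avg = if (scores.isEmpty) 0.0 else scores.sum.toDouble / scores.size
  (id, name, avg)
}
avgRDD.collect().foreach(println)
// prints (1,tom,46.66...), (2,jim,42.0), (3,cassie,96.66...) in some order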