您的位置:首页 > 其它

基于spark的DataFrame实战

2016-07-27 21:24 453 查看

Spark 中的另一核心功能是DataFrame,方便处理结构化数据。实例中还是以上一篇博客中的数据为基础。

我们要求以下数据:

1、查看338用户的评分记录;

2、将结果保存成csv格式;

3、评论电影最多的用户id;

4、被用户评论最多的电影id、title;

5、评论电影年龄最小者、最大者;

6、25至30岁的用户最喜欢的电影;

7、最受用户喜爱的电影;

代码如下:

 

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
* 更多内容请参考:http://www.iteblog.com/archives/1566#DataFrame-4
*
*/
object MoviesDataStatistics {

case class Ratings(userId: Int, movieId: Int, rating: Double)

case class Movies(id: Int, movieTitle: String, releaseDate: String)

case class Users(id: Int, age: Int, gender: String)

def main(args: Array[String]) {
val conf = new SparkConf().setAppName("MoviesDataStatistics")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val ratingsDF: DataFrame = sc.textFile("/data/ratings.data").map(x => x.split("::")).map(line => Ratings(line(0).toInt, line(1).toInt, line(2).toDouble)).toDF()
ratingsDF.registerTempTable("ratings")
//查看338评分记录条数
println("sql for 338 rateing info is : ")
sqlContext.sql("select * from ratings where userId = 338").show()
println("dataframe 338 rateing info is : ")
ratingsDF.filter(ratingsDF("userId").equalTo(338)).show()

val userDataDF = sc.textFile("/data/user.data").map(x => x.split("[|]")).map(line => Users(line(0).toInt, line(1).toInt, line(2))).toDF()
userDataDF.registerTempTable("users")
sqlContext.sql("select * from users where id = 338").show()
userDataDF.filter(userDataDF("id").equalTo(338)).show()

val movieDF = sc.textFile("/data/movies.data").map(x => x.split("::")).map(line => Movies(line(0).toInt, line(1), line(2))).toDF()
movieDF.registerTempTable("movies")
movieDF.collect()
sqlContext.sql("select * from movies where id = 1").show()
movieDF.filter(movieDF("id").equalTo(1)).show()

sqlContext.sql("select r.userId,m.movieTitle,r.rating from movies m inner join ratings r on m.id = r.movieId and r.userId = 338 order by r.rating desc ").show()
val resultDF = movieDF.join(ratingsDF.filter(ratingsDF("userId").equalTo(338)), movieDF("id").equalTo(ratingsDF("movieId")))
.sort(ratingsDF("rating").desc).select("userId", "movieTitle", "rating")

resultDF.collect().foreach(println)
import org.apache.spark.sql.functions._
//将结果保存至csv格式
//val saveOptions = Map("header" -> "true", "path" -> "/data/rat_movie.csv")
//resultDF.write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).options(saveOptions).save()
// 评论电影最多的用户id
sqlContext.sql("select userId,count(*) as count from ratings group by userId order by count desc ").show(1)
val userIdCountDF = ratingsDF.groupBy("userId").count()
userIdCountDF.join(userIdCountDF.agg(max("count").alias("max_count")), $"count".equalTo($"max_count")).select("userId").show(1)

// 被用户评论最多的电影id、title
val movieIDGroupDF = ratingsDF.groupBy("movieId").count()
val movieCountDF = movieIDGroupDF.join(movieIDGroupDF.agg(max("count").alias("max_count"))).filter($"count".equalTo($"max_count"))
//星球大战是被用户评论最多的电影
movieCountDF.join(movieDF).filter($"movieId".equalTo($"id")).select("movieId", "movieTitle", "releaseDate").show()

// 评论电影年龄最小者、最大者
// 年龄最大的73岁,最小的7岁
ratingsDF.join(userDataDF, ratingsDF("userId").equalTo(userDataDF("id")))
.agg(min($"age").alias("min_age"), max($"age").alias("max_age"))
.join(userDataDF, $"age".isin($"min_age", $"max_age"))
.select("id", "age", "gender").show(2)
// https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/GroupedData.html
// 25至30岁的用户欢迎的电影
userDataDF.filter($"age".between(25, 30)).join(ratingsDF, $"id".equalTo($"userId"))
.select("userId", "movieId", "rating").join(movieDF, $"rating".equalTo(5)).select("movieId", "movieTitle").show(10)
// 最受用户喜爱的电影
ratingsDF.groupBy("movieId").agg(avg("rating").alias("avg_rate"))
.sort($"avg_rate".desc).limit(10)
.join(movieDF, $"movieId".equalTo($"id"))
.select("movieTitle").show(false)
}
}

 总结:

 

1、创建DF时需要引入import sqlContext.implicits._

2、使用DF函数时,需要import org.apache.spark.sql.functions._

3、DF的函数功能非常强大,基本的函数功能一定要掌握;

4、个人认为DF的功能比Sql的功能强大

参考:

https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice3/

https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/GroupedData.html

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐