
Simple Data Analysis of a Movie Dataset with Spark RDD Operators and Spark SQL

2017-06-22 00:01

Dataset

MovieLens 1M Dataset

users.dat

UserID::Gender::Age::Occupation::Zip-code



movies.dat

MovieID::Title::Genres



ratings.dat

UserID::MovieID::Rating::Timestamp



Implementation with RDD Operators

1. Which 10 movies do young men in the "18-24" age bracket watch most?

val root = work.getClass.getResource("/")
val movieRdd = sc.textFile(root + "movies.dat", 3).map(_.split("::"))
val ratingRdd = sc.textFile(root + "ratings.dat", 3).map(_.split("::"))
val userRdd = sc.textFile(root + "users.dat", 3).map(_.split("::"))

// MovieLens 1M encodes age as a bracket code: 18 means "18-24", 25 means "25-34",
// so the "18-24" group is exactly age == 18
val youngMale = userRdd.filter(a => a(1) == "M" && a(2).toInt == 18).map(a => (a(0), 1))
val rating = ratingRdd.map(a => (a(0), a(1)))
val movieID = rating.join(youngMale).map(_._2).reduceByKey(_ + _).sortBy(_._2, false).take(10)
// sortBy(..., false) sorts descending; without the flag it sorts ascending
val movieName = movieRdd.map(x => (x(0), x(1))).collect().toMap
val ans = movieID.map(x => movieName.get(x._1))
ans foreach println
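
In this snippet the title lookup happens on the driver (after take(10)), so a plain local Map works; if the lookup were pushed inside an RDD transformation instead, a broadcast variable would avoid shipping the table with every task closure. A sketch under the same variable names:

val movieNameBc = sc.broadcast(movieName)   // movieName is the driver-side Map built above
val ans2 = rating.join(youngMale).map(_._2).reduceByKey(_ + _)
  .map { case (mid, cnt) => (movieNameBc.value.getOrElse(mid, mid), cnt) }   // lookup inside the task
  .sortBy(_._2, false).take(10)
ans2 foreach println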




2. The 10 highest-rated movies

val ratMovie = ratingRdd.map(x => (x(1), (x(2).toInt, 1)))
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))   // (ratingSum, ratingCount) per movie
  .map(x => (x._1, x._2._1.toDouble / x._2._2))        // average rating
  .sortBy(_._2, false).take(10)
val ratMovName = ratMovie.map(x => movieName.get(x._1))
ratMovName foreach println


(Output: the top 10 movie titles)
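
A plain average lets a movie with a handful of perfect ratings beat widely watched ones; a common refinement (not in the original post) is to require a minimum rating count first. A sketch with a hypothetical cutoff of 100 ratings:

val topRated = ratingRdd.map(x => (x(1), (x(2).toInt, 1)))
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
  .filter(_._2._2 >= 100)                               // hypothetical cutoff: at least 100 ratings
  .map { case (mid, (sum, cnt)) => (mid, sum.toDouble / cnt) }
  .sortBy(_._2, false).take(10)
topRated.map(x => movieName.get(x._1)) foreach println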



3. The top 10 users who watched the most movies

val userMovNum = ratingRdd.map(x => (x(0), x(1)))
  .groupBy(_._1).map(x => (x._1, x._2.size))   // movies rated per user
  .sortBy(_._2, false).take(10)
userMovNum foreach println


(Output: user IDs and the number of movies each watched)
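
groupBy materializes every user's full list of movies just to take its size; counting with reduceByKey combines partial counts map-side and shuffles far less data (this is the same groupBy-vs-reduceByKey gap noted at the end of the post). An equivalent sketch:

val userMovNum2 = ratingRdd.map(x => (x(0), 1))
  .reduceByKey(_ + _)                 // per-user count, combined map-side before the shuffle
  .sortBy(_._2, false).take(10)
userMovNum2 foreach println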



4 & 5. The 10 movies most watched by men, and the 10 most watched by women

val male = userRdd.filter(x => x(1) == "M").map(x => (x(0), 1))
val female = userRdd.filter(x => x(1) == "F").map(x => (x(0), 1))
val maleMovie = rating.join(male).map(_._2).reduceByKey(_ + _).sortBy(_._2, false).take(10)
val femaleMovie = rating.join(female).map(_._2).reduceByKey(_ + _).sortBy(_._2, false).take(10)
val maleMovName = maleMovie.map(x => movieName.get(x._1))
maleMovName foreach println
println
val femaleMovName = femaleMovie.map(x => movieName.get(x._1))
femaleMovName foreach println
println


(Output: the top 10 movies among male users)



(Output: the top 10 movies among female users)



Dataset: SogouQ2012.mini.tar.gz

Access time\tUser ID\t[Query]\tRank of the URL in the results\tUser's click sequence number\tURL clicked by the user



1. Sort the data by visit count and find the top 10 most-visited sites

val urlRdd = sc.textFile(root + "SogouQ.mini").map(_.split("\t"))
  .map(x => (x(5), 1)).reduceByKey(_ + _)   // count visits per URL
  .sortBy(_._2, false).take(10)
urlRdd foreach println
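
Since only the top 10 rows are needed, takeOrdered can replace the full sortBy-then-take; an equivalent sketch:

val top10Urls = sc.textFile(root + "SogouQ.mini").map(_.split("\t"))
  .map(x => (x(5), 1)).reduceByKey(_ + _)
  .takeOrdered(10)(Ordering.by[(String, Int), Int](-_._2))   // top 10 by count, no full sort
top10Urls foreach println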




SparkSQL Implementation

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setMaster("local[*]").setAppName("sparksql")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val root = work02.getClass.getResource("/")


Data import
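
The case classes behind toDF never appear in the original post; here is a minimal sketch inferred from the column names used in the queries below (define them at top level, outside the enclosing method, so toDF can derive the schema by reflection):

case class user(uid: String, gender: String, age: Int)
case class movie(mid: String, title: String)
case class rating(uid: String, mid: String, rat: Double)
case class urlclass(url: String)   // used in the SogouQ query at the end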

import sqlContext.implicits._
val userRdd = sc.textFile(root + "users.dat").map(_.split("::"))
val userdf = userRdd.map(x => user(x(0), x(1), x(2).toInt)).toDF

val movieRdd = sc.textFile(root + "movies.dat").map(_.split("::"))
val moviedf = movieRdd.map(x => movie(x(0), x(1))).toDF

val ratingRdd = sc.textFile(root + "ratings.dat").map(_.split("::"))
val ratingdf = ratingRdd.map(x => rating(x(0), x(1), x(2).toDouble)).toDF


Which 10 movies do young men in the "18-24" age bracket watch most?

// age = 18 is the "18-24" bracket in MovieLens' age encoding
val youngMale = userdf.filter("age = 18 and gender = 'M'")
val youngRating = ratingdf.select("uid", "mid")
val youngMovies = youngMale.join(youngRating, "uid").groupBy("mid").count.sort(-$"count").limit(10)
youngMovies.join(moviedf, "mid").select("title").show(false)




The 10 highest-rated movies

val top = ratingdf.groupBy("mid").agg("uid" -> "count", "rat" -> "sum")
  .withColumnRenamed("count(uid)", "count").withColumnRenamed("sum(rat)", "sum")
// the generated names count(uid) and sum(rat) contain parentheses, so they can't be
// referenced in SQL without renaming them (or quoting with backticks)
top.registerTempTable("top")
val top10 = sqlContext.sql("select mid, sum/count as avg_rat from top").sort(-$"avg_rat").limit(10)
top10.join(moviedf, "mid").select("title").show(false)
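
The renaming step can be skipped by aliasing inside agg with the Column API; a sketch of the same query, assuming the ratingdf defined above:

import org.apache.spark.sql.functions.{count, sum}
val top10b = ratingdf.groupBy("mid")
  .agg((sum($"rat") / count($"uid")).as("avg_rat"))   // alias directly, no withColumnRenamed
  .sort(-$"avg_rat").limit(10)
top10b.join(moviedf, "mid").select("title").show(false)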




The top 10 users who watched the most movies

ratingdf.groupBy("uid").count().sort(-$"count").show(10)




The 10 movies most watched by men

val male = userdf.filter("gender = 'M'").select("uid")
val allMovies = ratingdf.select("uid", "mid")
val maleId = male.join(allMovies, "uid").groupBy("mid").count.sort(-$"count").limit(10)
// tried .repartition($"count") here; it made no difference
maleId.join(moviedf, "mid").select("title").show(false)




The 10 movies most watched by women

val female = userdf.filter("gender = 'F'").select("uid")
val femaleId = female.join(allMovies, "uid").groupBy("mid").count.sort(-$"count").limit(10)
femaleId.join(moviedf, "mid").select("title").show(false)

// SQL variant: query the registered temp table, then join back on mid for the titles
maleId.registerTempTable("maleid")
sqlContext.sql("select * from maleid").join(moviedf, "mid").select("title").show(false)




The top 10 most-visited sites

val urlRdd = sc.textFile(root + "SogouQ.mini").map(_.split("\t"))
val urldf = urlRdd.map(w => urlclass(w(5))).toDF
urldf.groupBy("url").count.sort(-$"count").limit(10).show(false)




Open issue: the DataFrame groupBy runs much slower than RDD reduceByKey here; writing the same GROUP BY in SQL performs identically, and an attempt to call reduceByKey through a transform on the DataFrame threw an error. Still investigating.
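
On that transform error: a DataFrame has no reduceByKey; the usual route is to drop to the underlying RDD[Row] and convert back afterwards. A sketch, assuming ratingdf's first column is uid:

val countsRdd = ratingdf.rdd
  .map(row => (row.getString(0), 1))   // column 0 = uid
  .reduceByKey(_ + _)
val countsDf = countsRdd.toDF("uid", "count")
countsDf.sort(-$"count").show(10)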