spark mllib 之音乐推荐
2017-06-27 23:56
246 查看
首先该例子取自spark高级数据分析第二章的样例原始数据集来自http://www-etud.iro.umontreal.ca/~bergstrj/audioscrobbler_data.html 主要包括三个文件:
主要的数据集在文件user_artist_data.txt中,它包含141 000个用 户和 160 万个艺术家,记录了约 2420 万条用户播放艺术家歌曲的信息,其中包括播放次数信息。
数据集在 artist_data.txt 文件中给出了每个艺术家的 ID 和对应的名字。请注意,记录播放 信息时,客户端应用提交的是艺术家的名字。名字如果有拼写错误,或使用了非标准的名 称,事后才能被发现。为了将拼写错误的艺术家 ID 或 ID 变体对应到该艺术家的规范 ID,数据集提供了 artist_alias.txt 文件
针对这部分数据根据两个用户播放许多相似的歌曲来判断它们可能都喜欢某些歌曲,因为在构建矩阵的过程中矩阵特别稀疏,选择最小二乘法针对矩阵进行分解,具体见如下代码
主要的数据集在文件user_artist_data.txt中,它包含141 000个用 户和 160 万个艺术家,记录了约 2420 万条用户播放艺术家歌曲的信息,其中包括播放次数信息。
数据集在 artist_data.txt 文件中给出了每个艺术家的 ID 和对应的名字。请注意,记录播放 信息时,客户端应用提交的是艺术家的名字。名字如果有拼写错误,或使用了非标准的名 称,事后才能被发现。为了将拼写错误的艺术家 ID 或 ID 变体对应到该艺术家的规范 ID,数据集提供了 artist_alias.txt 文件
针对这部分数据根据两个用户播放许多相似的歌曲来判断它们可能都喜欢某些歌曲,因为在构建矩阵的过程中矩阵特别稀疏,选择最小二乘法针对矩阵进行分解,具体见如下代码
package com.demo.recommender import scala.collection.Map import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.mllib.recommendation._ import org.apache.spark.rdd.RDD /** * Created by leslie on 16/11/30. */ object Recommend { def main(args: Array[String]) { val conf = new SparkConf().setMaster("").setAppName("") val sc = new SparkContext(conf) val baseDir = "/*/music/" val rawUserArtistData = sc.textFile(baseDir + "user_artist_data.txt") val rawArtistData = sc.textFile(baseDir + "artist_data.txt") val rawArtistAlias = sc.textFile(baseDir + "artist_alias.txt") prepare(rawUserArtistData, rawArtistData, rawArtistAlias) model(sc,rawUserArtistData,rawArtistData,rawArtistAlias) } //艺术家和艺术家名字 def buildArtistById(rawArtistData:RDD[String])={ rawArtistData.flatMap{ line=> val (id,name) = line.span(_!='\t') if(name.isEmpty){ None }else{ try{ Some(id.toInt,name.trim.toString) }catch { case e:Exception =>None } } } } //将错误的艺术家ID和非标准艺术家ID映射维艺术家正规名ID def buildArtistAlias(rawArtistAlias:RDD[String]):Map[Int,Int]= rawArtistAlias.flatMap { line => val tokens = line.split("\t") if (tokens(0).isEmpty) { None } else { Some((tokens(0).toInt, tokens(1).toInt)) } }.collectAsMap() def prepare(rawUserArtistData:RDD[String],rawArtistData:RDD[String],rawArtistAlias:RDD[String])={ val userIdStats = rawUserArtistData.map(_.split(' ')(0).toDouble).stats() val itemIdStats = rawUserArtistData.map(_.split(' ')(1).toDouble).stats() println(userIdStats+" --> "+itemIdStats) val artistById = buildArtistById(rawArtistData) val artistAlias = buildArtistAlias(rawArtistAlias) val (badId,goodId) = artistAlias.head println("badId=>"+artistById.lookup(badId)+"\n"+"goodId=>"+artistById.lookup(goodId)) } //第一,将艺术家ID转为正规ID;第二,把数据转换成rating对象,它是ALS算法对“用户-产品-值”的抽象。其中产品指“向人们推荐的物品" def buildRating(rawUserArtistData:RDD[String],bArtistAlias:Broadcast[Map[Int,Int]])={ rawUserArtistData.map{line=> val Array(userId,artistId,count) = line.split(' ').map(_.toInt) val finalArtistId = bArtistAlias.value.getOrElse(artistId,artistId) Rating(userId,finalArtistId,count) } } def unpersist(model: MatrixFactorizationModel): Unit = { model.userFeatures.unpersist() model.productFeatures.unpersist() } /** * 第一个model * @param sc * @param rawUserArtistData * @param rawArtistData * @param rawArtistAlias */ def model(sc:SparkContext,rawUserArtistData:RDD[String],rawArtistData:RDD[String],rawArtistAlias: RDD[String])={ val bArtistAlias = sc.broadcast(buildArtistAlias(rawArtistAlias)) val trainData = buildRating (rawUserArtistData,bArtistAlias).cache() val model = ALS.trainImplicit(trainData,10,5,0.01,1.0) trainData.unpersist() println(model.userFeatures.mapValues(_.mkString(", ")).first()) val userId = 2093760 val recommendations = model.recommendProducts(userId,10) recommendations.foreach(println) val recommendationIds = recommendations.map(_.product).toSet val rawArtistForUser = rawUserArtistData.map(_.split(' ')).filter{case Array(user,_,_) => user.toInt==userId} val existingProduct = rawArtistForUser.map{case Array(_,artist,_) => artist.toInt}.collect().toSet val artistById = buildArtistById(rawArtistData) artistById.filter{case (id,name) => existingProduct.contains(id)}.values.collect().foreach(println) // artistById.filter{case (id,name) =>recommendationIds.contains(id)}.values.collect().foreach(println) unpersist(model) } /** * 模型评估AUC * @param positiveData * @param bAllItemIDS * @param predicitFunction * @return */ def areaUnderCurve(positiveData:RDD[Rating],bAllItemIDS:Broadcast[Array[Int]],predicitFunction:(RDD[(Int,Int)]) => RDD[Rating])={ val positiveUserProduct = positiveData.map(r=>(r.user,r.product)) val positivePredicition = predicitFunction(positiveUserProduct).groupBy(_.user) val negativeProducts = positiveUserProduct.groupByKey().mapPartitions{ userIDAndPosItemIDs=>{ val rand = new Random() val allItemIds = bAllItemIDS.value userIDAndPosItemIDs.map{case (userId,posItemIds)=> val posItemIdSets = posItemIds.toSet val negative = new ArrayBuffer[Int]() var i=0 while(i<allItemIds.size && negative.size<posItemIdSets.size){ val itemId = allItemIds(rand.nextInt(allItemIds.size)) if(!posItemIdSets.contains(itemId)){ negative+=itemId } i+=1 } negative.map(itemId=>(userId,itemId)) } } }.flatMap(t=>t) /** * */ val negativePredicition = predicitFunction(negativeProducts).groupBy(_.user) positivePredicition.join(negativePredicition).values.map{ case (positiveRating,negativerating) => var correct = 0 var total = 0 for(positive<-positiveRating;negative<-negativerating){ if(positive.rating>negative.rating){ correct+=1 } total+=1 } correct.toDouble/total }.mean() } /** * 预测收听最多的 * @param sc * @param train * @param allData * @return */ def predictMostListened(sc:SparkContext,train:RDD[Rating])(allData:RDD[(Int,Int)])={ val bListenCount = sc.broadcast(train.map(r=>(r.product,r.rating)).reduceByKey(_+_).collectAsMap()) allData.map{case (user,product) => Rating(user,product,bListenCount.value.getOrElse(product,0.0)) } } /** * 评估模型 * @param sc * @param rawUserArtistdata * @param rawArtistAlias */ def evaluate(sc:SparkContext,rawUserArtistdata:RDD[String],rawArtistAlias:RDD[String])={ val bArtistAlias = sc.broadcast(buildArtistAlias(rawArtistAlias)) val allData = buildRating(rawUserArtistdata,bArtistAlias) val Array(traindata,cvData) = allData.randomSplit(Array(0.8,0.1)) traindata.cache() cvData.cache() val allItemIds = allData.map(_.product).distinct().collect() val bAllItemIds = sc.broadcast(allItemIds) val mostListenedAUC = areaUnderCurve(cvData,bAllItemIds,predictMostListened(sc,traindata)) println("MostListenedAUC : "+mostListenedAUC) // val evaluations = // for (rank <- Array(10, 50); // lambda <- Array(1.0, 0.0001); // alpha <- Array(1.0, 40.0)) // yield { // val model = ALS.trainImplicit(trainData, rank, 10, lambda, alpha) // val auc = areaUnderCurve(cvData, bAllItemIDs, model.predict) // unpersist(model) // ((rank, lambda, alpha), auc) // } // evaluactions.sortBy(_._2).reverse.foreach(println) val evaluations = for (rank <- Array(10, 50); lambda <- Array(1.0, 0.0001); alpha <- Array(1.0, 40.0)) yield { val model = ALS.trainImplicit(traindata, rank, 10, lambda, alpha) val auc = areaUnderCurve(cvData, bAllItemIds, model.predict) unpersist(model) ((rank, lambda, alpha), auc) } evaluations.sortBy(_._2).reverse.foreach(println) traindata.unpersist() cvData.unpersist() } /** * 多用户推荐 * @param sc * @param rawUserArtistData * @param rawArtistData * @param rawArtistAlias */ def recommend(sc:SparkContext,rawUserArtistData:RDD[String],rawArtistData:RDD[String],rawArtistAlias:RDD[String])={ val bArtistAlias = sc.broadcast(buildArtistAlias(rawArtistAlias)) val allData = buildRating(rawUserArtistData,bArtistAlias) val model = ALS.trainImplicit(allData,50,10,0.1,40) allData.unpersist() val userId = 2093760 val recommendations = model.recommendProducts(userId,10) val recommendProductIds = recommendations.map(_.product).toSet val artistById = buildArtistById(rawArtistData) artistById.filter{case (user,name)=>recommendProductIds.contains(user)}.values.collect().foreach(println) val somUsers = allData.map(_.user).distinct().take(100) val someRecommendations = somUsers.map(userId => model.recommendProducts(userId,10)) someRecommendations.map(res => res.head.user +" -> "+res.map(_.product).mkString(",")).foreach(println) } }
相关文章推荐
- EMI音乐推荐竞赛
- 经典伤感配乐鉴听-人生的旅途|最佳音乐推荐网站-MP3火力网|音乐试听|唱片导购|
- 推荐一个Linux下非常棒的音乐播放管理工具-Songbird !
- 给各位推荐个音乐网
- 基于Spark MLlib平台的协同过滤算法---电影推荐系统
- 推荐我一个很喜欢的音乐软件 多米音乐盒
- 推荐算法(一)——音乐歌单智能推荐
- Recommending music on Spotify with deep learning 采用深度学习算法为Spotify做基于内容的音乐推荐
- 在Ubuntu上安装使用深度影音&深度音乐(推荐)
- python利用网易云音乐接口搭建的音乐推荐,根据单曲歌名推荐相关用户喜爱的歌曲
- 采用深度学习算法为Spotify做基于内容的音乐推荐
- CSDN特约专稿:音乐八宝盒的推荐引擎模式
- 推荐的静心放松音乐
- 音乐推荐&Audioscrobbler数据集
- 基于Spark Mllib,SparkSQL的电影推荐系统
- 音乐推荐数据集Million Song Dataset
- 强烈推荐好的音乐--来自广东本土歌手梁紫丹的音乐
- 聆听音乐-推荐好听的歌曲
- Android播放器推荐:可以播放本地音乐、视频、在线播放音乐、视频、网络收音机等