
Spark MLlib: Music Recommendation

2017-06-27 23:56
This example comes from the sample in Chapter 2 of Advanced Analytics with Spark. The raw dataset is available at http://www-etud.iro.umontreal.ca/~bergstrj/audioscrobbler_data.html and consists mainly of three files:

The main dataset, user_artist_data.txt, covers 141,000 users and 1.6 million artists and records about 24.2 million instances of users playing artists' songs, including the play counts.

The artist_data.txt file gives each artist's ID and corresponding name. Note that when plays are logged, the client application submits the artist's name; misspellings and non-standard names only surface after the fact. To map misspelled artist IDs and ID variants to each artist's canonical ID, the dataset provides the artist_alias.txt file.
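Concretely, all three files are plain line-oriented text: user_artist_data.txt is space-separated, while artist_data.txt and artist_alias.txt are tab-separated, which is exactly what the parsing code below assumes. One illustrative line from each (the values here are made up, not taken from the real data):

user_artist_data.txt:  1000002 1000006 55     (userID artistID playCount)
artist_data.txt:       1000006\tAerosmith      (artistID, artist name)
artist_alias.txt:      1027859\t1000006        (variant ID, canonical ID)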

The premise is that when two users play many of the same songs, they likely share tastes, so each may enjoy other songs the other listens to. Because the resulting user-artist matrix is extremely sparse, we factorize it with alternating least squares (ALS), using the implicit-feedback variant since play counts are implicit preferences rather than explicit ratings. The full code follows.
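For reference, ALS.trainImplicit implements the implicit-feedback formulation of Hu, Koren, and Volinsky: rather than fitting the raw play counts r_{ui} directly, it fits binary preferences p_{ui} weighted by a confidence c_{ui} that grows with the play count:

\min_{X,Y} \sum_{u,i} c_{ui} \left( p_{ui} - x_u^\top y_i \right)^2 + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right), \quad c_{ui} = 1 + \alpha\, r_{ui}, \quad p_{ui} = \mathbb{1}[r_{ui} > 0]

Here x_u and y_i are the rank-k user and artist factor vectors, and the rank, lambda, and alpha arguments passed to trainImplicit in the code correspond to k, \lambda, and \alpha.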

package com.demo.recommender

import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.recommendation._
import org.apache.spark.rdd.RDD
/**
* Created by leslie on 16/11/30.
*/
object Recommend {

  def main(args: Array[String]) {
    // Fill in a real master and app name here (or drop setMaster and pass
    // --master via spark-submit); empty strings will fail at runtime.
    val conf = new SparkConf().setMaster("").setAppName("")
    val sc = new SparkContext(conf)
    // Data directory (placeholder path)
    val baseDir = "/*/music/"
    val rawUserArtistData = sc.textFile(baseDir + "user_artist_data.txt")
    val rawArtistData = sc.textFile(baseDir + "artist_data.txt")
    val rawArtistAlias = sc.textFile(baseDir + "artist_alias.txt")

    prepare(rawUserArtistData, rawArtistData, rawArtistAlias)
    model(sc, rawUserArtistData, rawArtistData, rawArtistAlias)
  }
  // Parse artist_data.txt into (artistID, artistName) pairs
  def buildArtistById(rawArtistData: RDD[String]): RDD[(Int, String)] = {
    rawArtistData.flatMap { line =>
      // span keeps the tab at the head of `name`, hence the trim below
      val (id, name) = line.span(_ != '\t')
      if (name.isEmpty) {
        None
      } else {
        try {
          Some((id.toInt, name.trim))
        } catch {
          case _: NumberFormatException => None
        }
      }
    }
  }
  // Map misspelled and non-standard artist IDs to each artist's canonical ID
  def buildArtistAlias(rawArtistAlias: RDD[String]): Map[Int, Int] =
    rawArtistAlias.flatMap { line =>
      val tokens = line.split("\t")
      if (tokens(0).isEmpty) {
        None
      } else {
        Some((tokens(0).toInt, tokens(1).toInt))
      }
    }.collectAsMap()

  def prepare(rawUserArtistData: RDD[String], rawArtistData: RDD[String], rawArtistAlias: RDD[String]) = {
    // Sanity-check that user and artist IDs fit into an Int (ALS requires Ints)
    val userIdStats = rawUserArtistData.map(_.split(' ')(0).toDouble).stats()
    val itemIdStats = rawUserArtistData.map(_.split(' ')(1).toDouble).stats()
    println(userIdStats + " --> " + itemIdStats)
    val artistById = buildArtistById(rawArtistData)
    val artistAlias = buildArtistAlias(rawArtistAlias)
    // Spot-check one alias mapping by printing both artists' names
    val (badId, goodId) = artistAlias.head
    println("badId=>" + artistById.lookup(badId) + "\n" + "goodId=>" + artistById.lookup(goodId))
  }
  // First, map each artist ID to its canonical ID; second, convert the data into
  // Rating objects, ALS's abstraction of (user, product, value), where "product"
  // is the item being recommended
  def buildRating(rawUserArtistData: RDD[String], bArtistAlias: Broadcast[Map[Int, Int]]) = {
    rawUserArtistData.map { line =>
      val Array(userId, artistId, count) = line.split(' ').map(_.toInt)
      val finalArtistId = bArtistAlias.value.getOrElse(artistId, artistId)
      Rating(userId, finalArtistId, count)
    }
  }

  def unpersist(model: MatrixFactorizationModel): Unit = {
    model.userFeatures.unpersist()
    model.productFeatures.unpersist()
  }

  /**
   * Train a first model and eyeball its recommendations for one user.
   * @param sc the SparkContext
   * @param rawUserArtistData raw "userID artistID playCount" lines
   * @param rawArtistData raw "artistID\tname" lines
   * @param rawArtistAlias raw "badID\tgoodID" lines
   */
  def model(sc: SparkContext, rawUserArtistData: RDD[String], rawArtistData: RDD[String], rawArtistAlias: RDD[String]) = {
    val bArtistAlias = sc.broadcast(buildArtistAlias(rawArtistAlias))
    val trainData = buildRating(rawUserArtistData, bArtistAlias).cache()
    // rank = 10, iterations = 5, lambda = 0.01, alpha = 1.0
    val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)
    trainData.unpersist()
    println(model.userFeatures.mapValues(_.mkString(", ")).first())
    val userId = 2093760
    val recommendations = model.recommendProducts(userId, 10)
    recommendations.foreach(println)
    val recommendationIds = recommendations.map(_.product).toSet
    val rawArtistForUser = rawUserArtistData.map(_.split(' ')).filter { case Array(user, _, _) => user.toInt == userId }
    val existingProducts = rawArtistForUser.map { case Array(_, artist, _) => artist.toInt }.collect().toSet
    val artistById = buildArtistById(rawArtistData)
    // Artists the user has already played...
    artistById.filter { case (id, name) => existingProducts.contains(id) }.values.collect().foreach(println)
    // ...versus the artists recommended to them
    artistById.filter { case (id, name) => recommendationIds.contains(id) }.values.collect().foreach(println)
    unpersist(model)
  }

  /**
   * Model evaluation: mean AUC over users. For every user, held-out "positive"
   * items are compared against randomly sampled "negative" items (items the user
   * never played); AUC is the fraction of positive/negative pairs ranked correctly.
   * @param positiveData held-out Ratings treated as positives
   * @param bAllItemIDs all item IDs, broadcast to every executor
   * @param predictFunction scores a set of (user, item) pairs
   * @return mean AUC across users
   */
  def areaUnderCurve(positiveData: RDD[Rating], bAllItemIDs: Broadcast[Array[Int]], predictFunction: (RDD[(Int, Int)]) => RDD[Rating]) = {
    val positiveUserProducts = positiveData.map(r => (r.user, r.product))
    val positivePredictions = predictFunction(positiveUserProducts).groupBy(_.user)
    // For each user, sample as many negative items as there are positives,
    // rejecting any item the user actually played
    val negativeUserProducts = positiveUserProducts.groupByKey().mapPartitions {
      userIDAndPosItemIDs => {
        val rand = new Random()
        val allItemIds = bAllItemIDs.value
        userIDAndPosItemIDs.map { case (userId, posItemIds) =>
          val posItemIdSet = posItemIds.toSet
          val negative = new ArrayBuffer[Int]()
          var i = 0
          while (i < allItemIds.length && negative.size < posItemIdSet.size) {
            val itemId = allItemIds(rand.nextInt(allItemIds.length))
            if (!posItemIdSet.contains(itemId)) {
              negative += itemId
            }
            i += 1
          }
          negative.map(itemId => (userId, itemId))
        }
      }
    }.flatMap(t => t)
    val negativePredictions = predictFunction(negativeUserProducts).groupBy(_.user)
    // AUC per user: fraction of (positive, negative) pairs the model ranks correctly
    positivePredictions.join(negativePredictions).values.map {
      case (positiveRatings, negativeRatings) =>
        var correct = 0L
        var total = 0L
        for (positive <- positiveRatings; negative <- negativeRatings) {
          if (positive.rating > negative.rating) {
            correct += 1
          }
          total += 1
        }
        correct.toDouble / total
    }.mean()
  }

  /**
   * Baseline predictor: score every item by its global play count,
   * i.e. always "predict" whatever is listened to most.
   * @param sc the SparkContext
   * @param train training Ratings used to compute global play counts
   * @param allData the (user, item) pairs to score
   * @return Ratings scored by global popularity, matching predictFunction's shape
   */
  def predictMostListened(sc: SparkContext, train: RDD[Rating])(allData: RDD[(Int, Int)]) = {
    val bListenCount =
      sc.broadcast(train.map(r => (r.product, r.rating)).reduceByKey(_ + _).collectAsMap())
    allData.map { case (user, product) =>
      Rating(user, product, bListenCount.value.getOrElse(product, 0.0))
    }
  }

  /**
   * Evaluate the popularity baseline and a grid of ALS hyperparameters by AUC.
   * @param sc the SparkContext
   * @param rawUserArtistData raw "userID artistID playCount" lines
   * @param rawArtistAlias raw "badID\tgoodID" lines
   */
  def evaluate(sc: SparkContext, rawUserArtistData: RDD[String], rawArtistAlias: RDD[String]) = {
    val bArtistAlias = sc.broadcast(buildArtistAlias(rawArtistAlias))
    val allData = buildRating(rawUserArtistData, bArtistAlias)
    // Hold out 10% of the data for cross-validation
    val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1))
    trainData.cache()
    cvData.cache()
    val allItemIds = allData.map(_.product).distinct().collect()
    val bAllItemIds = sc.broadcast(allItemIds)
    // The most-listened baseline sets the bar any real model must beat
    val mostListenedAUC = areaUnderCurve(cvData, bAllItemIds, predictMostListened(sc, trainData))
    println("MostListenedAUC : " + mostListenedAUC)
    // Grid search over rank, lambda, and alpha; print results by AUC, best first
    val evaluations =
      for (rank   <- Array(10,  50);
           lambda <- Array(1.0, 0.0001);
           alpha  <- Array(1.0, 40.0))
        yield {
          val model = ALS.trainImplicit(trainData, rank, 10, lambda, alpha)
          val auc = areaUnderCurve(cvData, bAllItemIds, model.predict)
          unpersist(model)
          ((rank, lambda, alpha), auc)
        }
    evaluations.sortBy(_._2).reverse.foreach(println)

    trainData.unpersist()
    cvData.unpersist()
  }

  /**
   * Recommend for one user, then for a sample of 100 users.
   * @param sc the SparkContext
   * @param rawUserArtistData raw "userID artistID playCount" lines
   * @param rawArtistData raw "artistID\tname" lines
   * @param rawArtistAlias raw "badID\tgoodID" lines
   */
  def recommend(sc: SparkContext, rawUserArtistData: RDD[String], rawArtistData: RDD[String], rawArtistAlias: RDD[String]) = {
    val bArtistAlias = sc.broadcast(buildArtistAlias(rawArtistAlias))
    // Cache before training so ALS's iterations don't re-read the raw data
    val allData = buildRating(rawUserArtistData, bArtistAlias).cache()
    val model = ALS.trainImplicit(allData, 50, 10, 0.1, 40.0)
    allData.unpersist()
    val userId = 2093760
    val recommendations = model.recommendProducts(userId, 10)
    val recommendedProductIds = recommendations.map(_.product).toSet
    val artistById = buildArtistById(rawArtistData)
    artistById.filter { case (id, name) => recommendedProductIds.contains(id) }.values.collect().foreach(println)
    // Recommend for 100 distinct users and print each user's top-10 artist IDs
    val someUsers = allData.map(_.user).distinct().take(100)
    val someRecommendations = someUsers.map(userId => model.recommendProducts(userId, 10))
    someRecommendations.map(res => res.head.user + " -> " + res.map(_.product).mkString(",")).foreach(println)
  }
}
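To run this end to end, package the object into a jar and submit it; a minimal sketch (the jar name and master here are placeholders, not part of the original code):

spark-submit --class com.demo.recommender.Recommend \
  --master local[*] \
  recommender.jar

Note that main currently wires up only prepare and model; evaluate and recommend can be called the same way once the data directory is filled in.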