Spark高级数据分析· 3推荐引擎
2016-03-25 20:54
507 查看
![](http://img3.douban.com/lpic/s28323514.jpg)
推荐算法流程
推荐算法预备
# Download the Audioscrobbler profile data set.
# NOTE: the later steps read files from a profiledata_06-May-2005 directory,
# so the archive has to be extracted before pointing Spark at it.
wget http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz

# Start a local Spark shell with 6 GB of driver memory.
cd /Users/erichan/garden/spark-1.6.0-bin-hadoop2.6/bin
./spark-shell --master local --driver-memory 6g
1 准备数据
// Directory holding the three data files read below.
val data = "/Users/erichan/AliDrive/ml_spark/data/profiledata_06-May-2005"

// One record per line, space-separated; read with 10 partitions.
val rawUserArtistData = sc.textFile(data + "/user_artist_data.txt", 10)

// ALS requires the IDs to be numeric.
rawUserArtistData.first
//res3: String = 1092764 1000311
//rawUserArtistData.map(_.split(' ')(0).toDouble).stats()
//res10: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1947573.265353, stdev: 496000.544975, max: 2443548.000000, min: 90.000000)
//rawUserArtistData.map(_.split(' ')(1).toDouble).stats()
//res11: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1718704.093757, stdev: 2539389.040171, max: 10794401.000000, min: 1.000000)

val rawArtistData = sc.textFile(data + "/artist_data.txt")
//rawArtistData.first
//res12: String = 1134999 06Crazy Life

// (artist ID, artist name) pairs. Lines without a tab-separated name, or whose
// ID is not a valid Int, are skipped instead of failing the job.
val artistByID = rawArtistData.flatMap { line =>
  // span splits at the first tab: everything before it is the ID, the rest is the name
  val (id, name) = line.span(_ != '\t')
  if (name.isEmpty) {
    None
  } else {
    try {
      Some((id.toInt, name.trim))
    } catch {
      case _: NumberFormatException => None
    }
  }
}

val rawArtistAlias = sc.textFile(data + "/artist_alias.txt")

// Map from an alias artist ID to the ID it should be replaced with, collected to
// the driver. Lines whose first field is empty are skipped.
val artistAlias = rawArtistAlias.flatMap { line =>
  val tokens = line.split('\t')
  if (tokens(0).isEmpty) {
    None
  } else {
    Some((tokens(0).toInt, tokens(1).toInt))
  }
}.collectAsMap()
//artistByID.lookup(1000010).head
//res14: String = Aerosmith
2 建模
import org.apache.spark.mllib.recommendation._

// Ship the alias map to the executors once instead of with every task closure.
val bArtistAlias = sc.broadcast(artistAlias)

// Parse every "user artist count" record into a Rating, replacing alias artist
// IDs with the ID the alias map points to (or keeping the ID when no alias exists).
val trainData = rawUserArtistData.map { record =>
  val Array(user, artist, plays) = record.split(' ').map(_.toInt)
  val canonicalArtist = bArtistAlias.value.getOrElse(artist, artist)
  Rating(user, canonicalArtist, plays)
}.cache()

// Arguments after the data are, in order: rank, iterations, lambda, alpha.
val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)
3 检验
// User whose listening history and recommendations are inspected below.
val checkUserID = 2093760

// All raw records belonging to that user.
val rawArtistsForUser = rawUserArtistData.map(_.split(' ')).filter {
  case Array(user, _, _) => user.toInt == checkUserID
}

// Distinct artist IDs the user has already listened to.
val existingProducts = rawArtistsForUser.map {
  case Array(_, artist, _) => artist.toInt
}.collect().toSet

// Print the names of the artists the user already knows.
artistByID.filter {
  case (id, name) => existingProducts.contains(id)
}.values.collect().foreach(println)

// Top 5 recommendations for the same user, first as raw Rating objects ...
val recommendations = model.recommendProducts(checkUserID, 5)
recommendations.foreach(println)

// ... then as artist names.
val recommendedProductIDs = recommendations.map(_.product).toSet
artistByID.filter {
  case (id, name) => recommendedProductIDs.contains(id)
}.values.collect().foreach(println)
4 评价
:load /Users/erichan/sourcecode/book/aas/ch03-recommender/src/main/scala/RunAUC.scala

// Rebuild the alias broadcast and the full rating set through the RunAUC helpers.
val bArtistAlias = sc.broadcast(RunAUC.buildArtistAlias(rawArtistAlias))
val allData = RunAUC.buildRatings(rawUserArtistData, bArtistAlias)

// Hold out roughly 10% of the ratings for cross-validation.
val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1))
trainData.cache()
cvData.cache()

// Every distinct item ID, broadcast so negative examples can be sampled on the executors.
val allItemIDs = allData.map(_.product).distinct().collect()
val bAllItemIDs = sc.broadcast(allItemIDs)

// Baseline: score each item by its total rating (listen count) in the training split.
val mostListenedAUC = RunAUC.areaUnderCurve(cvData, bAllItemIDs, RunAUC.predictMostListened(sc, trainData))
println(mostListenedAUC)
//0.9395286660878177

trainData.unpersist()
cvData.unpersist()
5 推荐
// Take 100 distinct user IDs and ask the model for 5 recommendations each.
val someUsers = allData.map(_.user).distinct().take(100)
val someRecommendations = someUsers.map(userID => model.recommendProducts(userID, 5))

// Print "user -> item, item, ...". Each result is paired with the user ID it was
// requested for rather than read from recs.head, so an empty recommendation
// array prints an empty list instead of throwing NoSuchElementException.
someUsers.zip(someRecommendations).map { case (userID, recs) =>
  s"$userID -> ${recs.map(_.product).mkString(", ")}"
}.foreach(println)
附录
// RunAUC.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.recommendation._
import org.apache.spark.rdd.RDD

import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

/**
 * Created by erichan
 * on 16/1/26.
 *
 * Helpers for evaluating a recommender with a per-user ("mean") AUC metric.
 */
object RunAUC {

  /**
   * Computes the mean, over users, of the per-user AUC.
   *
   * @param positiveData    held-out ratings treated as the "positive" examples
   * @param bAllItemIDs     broadcast array of every item ID, used to sample negatives
   * @param predictFunction scores an RDD of (user, product) pairs
   * @return the mean over users of the fraction of positive/negative pairs in which
   *         the positive item scores strictly higher than the negative one
   */
  def areaUnderCurve(
      positiveData: RDD[Rating],
      bAllItemIDs: Broadcast[Array[Int]],
      predictFunction: (RDD[(Int,Int)] => RDD[Rating])): Double = {
    // What this actually computes is AUC, per user. The result is actually something
    // that might be called "mean AUC".

    // Take held-out data as the "positive", and map to tuples
    val positiveUserProducts = positiveData.map(r => (r.user, r.product))
    // Make predictions for each of them, including a numeric score, and gather by user
    val positivePredictions = predictFunction(positiveUserProducts).groupBy(_.user)

    // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
    // small AUC problems, and it would be inefficient, when a direct computation is available.

    // Create a set of "negative" products for each user. These are randomly chosen
    // from among all of the other items, excluding those that are "positive" for the user.
    val negativeUserProducts = positiveUserProducts.groupByKey().mapPartitions {
      // mapPartitions operates on many (user,positive-items) pairs at once
      userIDAndPosItemIDs => {
        // Init an RNG and the item IDs set once for partition
        val random = new Random()
        val allItemIDs = bAllItemIDs.value
        // Loop-invariant: read the array length once instead of on every iteration
        val itemCount = allItemIDs.length
        userIDAndPosItemIDs.map { case (userID, posItemIDs) =>
          val posItemIDSet = posItemIDs.toSet
          val positiveCount = posItemIDSet.size
          val negative = new ArrayBuffer[Int]()
          var i = 0
          // Keep about as many negative examples per user as positive.
          // Duplicates are OK. The number of draws is capped at itemCount so the
          // loop ends even if few or no non-positive items exist.
          while (i < itemCount && negative.size < positiveCount) {
            val itemID = allItemIDs(random.nextInt(itemCount))
            if (!posItemIDSet.contains(itemID)) {
              negative += itemID
            }
            i += 1
          }
          // Result is a collection of (user,negative-item) tuples
          negative.map(itemID => (userID, itemID))
        }
      }
    }.flatMap(t => t)
    // flatMap breaks the collections above down into one big set of tuples

    // Make predictions on the rest:
    val negativePredictions = predictFunction(negativeUserProducts).groupBy(_.user)

    // Join positive and negative by user
    positivePredictions.join(negativePredictions).values.map {
      case (positiveRatings, negativeRatings) =>
        // AUC may be viewed as the probability that a random positive item scores
        // higher than a random negative one. Here the proportion of all positive-negative
        // pairs that are correctly ranked is computed. The result is equal to the AUC metric.
        var correct = 0L
        var total = 0L
        // For each pairing,
        for (positive <- positiveRatings;
             negative <- negativeRatings) {
          // Count the correctly-ranked pairs
          if (positive.rating > negative.rating) {
            correct += 1
          }
          total += 1
        }
        // Return AUC: fraction of pairs ranked correctly
        correct.toDouble / total
    }.mean() // Return mean AUC over users
  }

  /**
   * Baseline predictor: scores every (user, product) pair with the sum of all
   * ratings that product received in `train`, or 0.0 for products absent from `train`.
   *
   * @param sc      context used to broadcast the per-product totals
   * @param train   ratings from which the per-product totals are computed
   * @param allData (user, product) pairs to score
   * @return one Rating per input pair, carrying the product's total as the score
   */
  def predictMostListened(sc: SparkContext, train: RDD[Rating])(allData: RDD[(Int,Int)]): RDD[Rating] = {
    val bListenCount =
      sc.broadcast(train.map(r => (r.product, r.rating)).reduceByKey(_ + _).collectAsMap())
    allData.map { case (user, product) =>
      Rating(user, product, bListenCount.value.getOrElse(product, 0.0))
    }
  }

  /**
   * Parses tab-separated lines of two integer IDs into a map from the first ID
   * to the second, collected to the driver. Lines whose first field is empty
   * are skipped.
   */
  def buildArtistAlias(rawArtistAlias: RDD[String]): Map[Int,Int] =
    rawArtistAlias.flatMap { line =>
      val tokens = line.split('\t')
      if (tokens(0).isEmpty) {
        None
      } else {
        Some((tokens(0).toInt, tokens(1).toInt))
      }
    }.collectAsMap()

  /**
   * Parses space-separated "user artist count" lines into Ratings, replacing each
   * artist ID with its entry in the broadcast alias map when one exists.
   */
  def buildRatings(
      rawUserArtistData: RDD[String],
      bArtistAlias: Broadcast[Map[Int,Int]]): RDD[Rating] = {
    rawUserArtistData.map { line =>
      val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
      val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
      Rating(userID, finalArtistID, count)
    }
  }
}
相关文章推荐
- WEB API异常处理
- 商业领域中的IT技术应用之二-POS收款机及收款系统介绍
- <机器学习实战>配套资料
- python 使用 passlib 库在 windows 平台实现 crypt
- Java实现二叉树的查询
- STL——STL中string的写时拷贝机制
- 懒加载
- StoryBoard
- 【bzoj1012】[JSOI2008]最大数maxnumber
- bzoj2843 极地旅行社
- Spark高级数据分析· 2数据分析
- Spark机器学习9· 实时机器学习(scala with sbt)
- MySQL左连接、右连接、笛卡尔积的表现形式
- char* 之间的赋值
- 我的Android进阶之旅------>Android采用AES+RSA的加密机制对http请求进行加密
- java语言程序设计第十版(Introduce to java 10th) 课后习题 chapter7-31
- BZOJ_P3944 Sum(数论+杜教筛)
- Spark 性能相关参数配置详解-shuffle篇
- Ipad UISplitViewController
- 3.25