
Movie Recommendations and More via MapReduce and Scalding

2016-03-25 18:24
Scalding is an in-house MapReduce framework that Twitter recently open-sourced. Like Pig, it provides an abstraction on top of MapReduce that makes it easy to write big data jobs in a syntax that’s simple and concise. Unlike Pig, Scalding is written in pure Scala – which means all the power of Scala and the JVM is already built in. No more UDFs, folks!

This is going to be an in-your-face introduction to Scalding, Twitter’s (Scala + Cascading) MapReduce framework.

In 140: instead of forcing you to write raw map and reduce functions, Scalding allows you to write natural code like

// Create a histogram of tweet lengths.
tweets.map('tweet -> 'length) { tweet : String => tweet.size }.groupBy('length) { _.size }

Not much different from the Ruby you’d write to compute tweet distributions over small data? Exactly.
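
For comparison, here is a rough plain-Scala version of the same histogram over an in-memory collection, with no Scalding involved. The object name `TweetHistogram` and the sample tweets are made up for illustration; it is only a sketch of the "small data" code the Scalding one-liner resembles.

```scala
object TweetHistogram {
  // Hypothetical sample data; a real job would read tweets from a source.
  val sampleTweets: Seq[String] = Seq("hello", "world", "hi", "scalding rocks")

  // Group tweets by length, then count how many tweets share each length.
  def lengthHistogram(tweets: Seq[String]): Map[Int, Int] =
    tweets.groupBy(_.length).map { case (length, group) => (length, group.size) }
}
```

Calling `TweetHistogram.lengthHistogram(TweetHistogram.sampleTweets)` gives a map from tweet length to count, which is the same shape as the grouped Scalding output.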

Two notes before we begin:

This Github repository contains all the code used.

For a gentler introduction to Scalding, see this Getting Started guide on the Scalding wiki.


Movie Similarities

Imagine you run an online movie business, and you want to generate movie recommendations. You have a rating system (people can rate movies with 1 to 5 stars), and we’ll assume for simplicity that all of the ratings are stored in a TSV file somewhere.

Let’s start by reading the ratings into a Scalding job.
Input MovieSimilarities.scala

/**
 * The input is a TSV file with three columns: (user, movie, rating).
 */
val INPUT_FILENAME = "data/ratings.tsv"

/**
 * Read in the input and give each field a type and name.
 */
val ratings = Tsv(INPUT_FILENAME, ('user, 'movie, 'rating))

/**
 * Let's also keep track of the total number of people who rated each movie.
 */
val numRaters =
  ratings
    // Put the number of people who rated each movie into a field called "numRaters".
    .groupBy('movie) { _.size }.rename('size -> 'numRaters)

// Merge `ratings` with `numRaters`, by joining on their movie fields.
val ratingsWithSize =
  ratings.joinWithSmaller('movie -> 'movie, numRaters)

// ratingsWithSize now contains the following fields: (user, movie, rating, numRaters).
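
To make the group-and-join step concrete, here is a minimal in-memory sketch using ordinary Scala collections instead of Scalding pipes. The object name and the three sample ratings are invented; it only mirrors the shape of the job above and is not how Scalding executes it.

```scala
object RatingsWithSizeSketch {
  // (user, movie, rating) rows; the values are made up for illustration.
  val ratings: Seq[(String, String, Double)] = Seq(
    ("alice", "Lion King", 5.0),
    ("bob",   "Lion King", 4.0),
    ("alice", "Toy Story", 4.0)
  )

  // Count the raters of each movie (the groupBy + size step).
  val numRaters: Map[String, Int] =
    ratings.groupBy(_._2).map { case (movie, rows) => (movie, rows.size) }

  // Attach the count to every rating row (the join step).
  val ratingsWithSize: Seq[(String, String, Double, Int)] =
    ratings.map { case (user, movie, rating) => (user, movie, rating, numRaters(movie)) }
}
```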

You want to calculate how similar pairs of movies are, so that if someone watches The Lion King, you can recommend films like Toy Story. So how should you define the similarity between two movies?

One way is to use their correlation:

For every pair of movies A and B, find all the people who rated both A and B.

Use these ratings to form a Movie A vector and a Movie B vector.

Calculate the correlation between these two vectors.

Whenever someone watches a movie, you can then recommend the movies most correlated with it.

Let’s start with the first two steps.
Find rating pairs MovieSimilarities.scala

/**
 * To get all pairs of co-rated movies, we'll join `ratings` against itself.
 * So first make a dummy copy of the ratings that we can join against.
 */
val ratings2 =
  ratingsWithSize
    .rename(('user, 'movie, 'rating, 'numRaters) -> ('user2, 'movie2, 'rating2, 'numRaters2))

/**
 * Now find all pairs of co-rated movies (pairs of movies that a user has rated) by
 * joining the duplicate rating streams on their user fields.
 */
val ratingPairs =
  ratingsWithSize
    .joinWithSmaller('user -> 'user2, ratings2)
    // De-dupe so that we don't calculate similarity of both (A, B) and (B, A).
    .filter('movie, 'movie2) { movies : (String, String) => movies._1 < movies._2 }
    .project('movie, 'rating, 'numRaters, 'movie2, 'rating2, 'numRaters2)

// By grouping on ('movie, 'movie2), we can now get all the people who rated any pair of movies.
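
Here is a small in-memory sketch of that self-join, again with plain Scala collections rather than Scalding. The sample ratings and the object name are hypothetical; the point is just to show that filtering on `movie < movie2` keeps each unordered pair once per user.

```scala
object RatingPairsSketch {
  // (user, movie, rating) rows, invented for illustration.
  val ratings: Seq[(String, String, Double)] = Seq(
    ("alice", "A", 5.0), ("alice", "B", 3.0), ("alice", "C", 1.0),
    ("bob",   "A", 4.0), ("bob",   "B", 2.0)
  )

  // Join the ratings against themselves on user, keeping only movie < movie2
  // so that (A, B) is emitted but (B, A) and (A, A) are not.
  val ratingPairs: Seq[(String, Double, String, Double)] =
    for {
      (user, movie, rating)    <- ratings
      (user2, movie2, rating2) <- ratings
      if user == user2 && movie < movie2
    } yield (movie, rating, movie2, rating2)
}
```

Alice rated three movies, so she contributes three pairs; Bob rated two, so he contributes one.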

Before using these rating pairs to calculate correlation, let’s stop for a bit.

Since we’re explicitly thinking of movies as vectors of ratings, it’s natural to compute some very vector-y things like
norms and dot products, as well as the length of each vector and the sum over all elements in each vector. So let’s compute these:
Vector calculations MovieSimilarities.scala

/**
 * Compute dot products, norms, sums, and sizes of the rating vectors.
 */
val vectorCalcs =
  ratingPairs
    // Compute (x*y, x^2, y^2), which we need for dot products and norms.
    .map(('rating, 'rating2) -> ('ratingProd, 'ratingSq, 'rating2Sq)) {
      ratings : (Double, Double) =>
        (ratings._1 * ratings._2, math.pow(ratings._1, 2), math.pow(ratings._2, 2))
    }
    .groupBy('movie, 'movie2) { group =>
      group.size // length of each vector
        .sum('ratingProd -> 'dotProduct)
        .sum('rating -> 'ratingSum)
        .sum('rating2 -> 'rating2Sum)
        .sum('ratingSq -> 'ratingNormSq)
        .sum('rating2Sq -> 'rating2NormSq)
        .max('numRaters) // Just an easy way to make sure the numRaters field stays.
        .max('numRaters2)
      // All of these operations chain together like in a builder object.
    }

To summarize, each row in vectorCalcs now contains the following fields:

movie, movie2

numRaters, numRaters2: the total number of people who rated each movie

size: the number of people who rated both movie and movie2

dotProduct: dot product between the movie vector (a vector of ratings) and the movie2 vector (also a vector of ratings)

ratingSum, rating2Sum: sum over all elements in each ratings vector

ratingNormSq, rating2NormSq: squared norm of each vector
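
As a worked example of these fields, here is a plain-Scala sketch that computes them for one pair of movies rated by the same three users. The ratings and the `Stats` case class are made up for illustration; the field names follow the list above.

```scala
object VectorCalcsSketch {
  // Hypothetical container for the per-pair summary statistics.
  final case class Stats(size: Int, dotProduct: Double, ratingSum: Double, rating2Sum: Double,
                         ratingNormSq: Double, rating2NormSq: Double)

  // Each element is (rating of movie, rating of movie2) from one user who rated both.
  def stats(pairs: Seq[(Double, Double)]): Stats = Stats(
    size = pairs.size,
    dotProduct = pairs.map { case (x, y) => x * y }.sum,
    ratingSum = pairs.map(_._1).sum,
    rating2Sum = pairs.map(_._2).sum,
    ratingNormSq = pairs.map { case (x, _) => x * x }.sum,
    rating2NormSq = pairs.map { case (_, y) => y * y }.sum
  )

  // Three users rated the first movie 5, 4, 1 and the second movie 3, 2, 1.
  val example: Stats = stats(Seq((5.0, 3.0), (4.0, 2.0), (1.0, 1.0)))
}
```

For this pair, size is 3, dotProduct is 15 + 8 + 1 = 24, the sums are 10 and 6, and the squared norms are 42 and 14.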

So let’s go back to calculating the correlation between movie and movie2. We could, of course, calculate correlation in the standard way: find the covariance between the movie and movie2 ratings, and divide by their standard deviations.

But recall that we can also write correlation in the following form:

\[
\mathrm{Corr}(X, Y) = \frac{n \sum xy - \sum x \sum y}{\sqrt{n \sum x^2 - (\sum x)^2} \, \sqrt{n \sum y^2 - (\sum y)^2}}
\]

(See the Wikipedia page on correlation.)
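
As a quick sanity check on this form, start from the usual definition and multiply the numerator and denominator by n². This is a sketch using the population covariance and standard deviations; the sample versions give the same ratio.

```latex
\begin{align*}
\mathrm{Corr}(X, Y)
  &= \frac{\mathrm{Cov}(X, Y)}{\sigma_X \, \sigma_Y}
   = \frac{\frac{1}{n} \sum xy - \frac{1}{n^2} \sum x \sum y}
          {\sqrt{\frac{1}{n} \sum x^2 - \frac{1}{n^2} (\sum x)^2} \,
           \sqrt{\frac{1}{n} \sum y^2 - \frac{1}{n^2} (\sum y)^2}} \\
  &= \frac{n \sum xy - \sum x \sum y}
          {\sqrt{n \sum x^2 - (\sum x)^2} \, \sqrt{n \sum y^2 - (\sum y)^2}}
\end{align*}
```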

Notice that every one of the elements in this formula is a field in vectorCalcs! So instead of using the standard calculation, we can use this form:
Correlation MovieSimilarities.scala

val correlations =
  vectorCalcs
    .map(('size, 'dotProduct, 'ratingSum, 'rating2Sum, 'ratingNormSq, 'rating2NormSq) -> 'correlation) {
      fields : (Double, Double, Double, Double, Double, Double) =>
        correlation(fields._1, fields._2, fields._3, fields._4, fields._5, fields._6)
    }

def correlation(size : Double, dotProduct : Double, ratingSum : Double,
                rating2Sum : Double, ratingNormSq : Double, rating2NormSq : Double) = {

  val numerator = size * dotProduct - ratingSum * rating2Sum
  val denominator =
    math.sqrt(size * ratingNormSq - ratingSum * ratingSum) *
      math.sqrt(size * rating2NormSq - rating2Sum * rating2Sum)

  numerator / denominator
}

And that’s it! To see the full code, check out the Github repository here.
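
If you want to convince yourself that the summary-statistics form agrees with the textbook definition, here is a small standalone check in plain Scala. The `standardCorrelation` helper and the sample ratings are my own illustration, not part of the job; `correlation` is copied from the function above.

```scala
object CorrelationCheck {
  // Same summary-statistics form as the `correlation` function in the post.
  def correlation(size: Double, dotProduct: Double, ratingSum: Double,
                  rating2Sum: Double, ratingNormSq: Double, rating2NormSq: Double): Double = {
    val numerator = size * dotProduct - ratingSum * rating2Sum
    val denominator =
      math.sqrt(size * ratingNormSq - ratingSum * ratingSum) *
        math.sqrt(size * rating2NormSq - rating2Sum * rating2Sum)
    numerator / denominator
  }

  // Textbook form: covariance divided by the product of standard deviations.
  def standardCorrelation(xs: Seq[Double], ys: Seq[Double]): Double = {
    val n = xs.size.toDouble
    val meanX = xs.sum / n
    val meanY = ys.sum / n
    val covariance = xs.zip(ys).map { case (x, y) => (x - meanX) * (y - meanY) }.sum / n
    val sdX = math.sqrt(xs.map(x => math.pow(x - meanX, 2)).sum / n)
    val sdY = math.sqrt(ys.map(y => math.pow(y - meanY, 2)).sum / n)
    covariance / (sdX * sdY)
  }

  // Made-up ratings for two movies from the same three users.
  val xs: Seq[Double] = Seq(5.0, 4.0, 1.0)
  val ys: Seq[Double] = Seq(3.0, 2.0, 1.0)

  val viaSums: Double = correlation(
    xs.size.toDouble,
    xs.zip(ys).map { case (x, y) => x * y }.sum,
    xs.sum, ys.sum,
    xs.map(x => x * x).sum, ys.map(y => y * y).sum)

  val viaDefinition: Double = standardCorrelation(xs, ys)
}
```

Both values come out to 12 / sqrt(156), roughly 0.96, up to floating-point error.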


Book Similarities

Let’s run this code over some real data. Unfortunately, I didn’t have a clean source of movie ratings available, so instead I used this dataset of 1 million book ratings.

I ran a quick command, using the handy scald.rb script that Scalding provides…

# Send the job off to a Hadoop cluster
scald.rb MovieSimilarities.scala --input ratings.tsv --output similarities.tsv

…and here’s a sample of the top output I got:





As we’d expect, we see that

Harry Potter books are similar to other Harry Potter books

Lord of the Rings books are similar to other Lord of the Rings books

Tom Clancy is similar to John Grisham

Chick lit (Summer Sisters, by Judy Blume) is similar to chick lit (Bridget Jones)

Just for fun, let’s also look at books similar to The Great Gatsby:





(Schoolboy memories, exactly.)


More Similarity Measures

Of course, there are lots of other similarity measures we could use besides correlation.


Cosine Similarity

Cosine similarity is another common vector-based similarity measure.
Cosine Similarity MovieSimilarities.scala

def cosineSimilarity(dotProduct : Double, ratingNorm : Double, rating2Norm : Double) = {
  dotProduct / (ratingNorm * rating2Norm)
}


Correlation, Take II

We can also add a regularized correlation, by (say) adding N virtual movie pairs that have zero correlation. This helps avoid noise if some movie pairs have very few raters in common (for example, The Great Gatsby had an unlikely raw correlation of 1 with many other books, due simply to the fact that those book pairs had very few ratings).
Regularized Correlation MovieSimilarities.scala

def regularizedCorrelation(size : Double, dotProduct : Double, ratingSum : Double,
                           rating2Sum : Double, ratingNormSq : Double, rating2NormSq : Double,
                           virtualCount : Double, priorCorrelation : Double) = {

  val unregularizedCorrelation = correlation(size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq)
  val w = size / (size + virtualCount)

  w * unregularizedCorrelation + (1 - w) * priorCorrelation
}
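
To see how much the weighting matters, here is a tiny plain-Scala sketch of just the shrinkage step. The `shrink` helper and the numbers are hypothetical; it takes a raw correlation directly instead of recomputing it from the vector fields.

```scala
object ShrinkageSketch {
  // Weighted average of the raw correlation and a prior, as in regularizedCorrelation.
  def shrink(size: Double, rawCorrelation: Double,
             virtualCount: Double, priorCorrelation: Double): Double = {
    val w = size / (size + virtualCount)
    w * rawCorrelation + (1 - w) * priorCorrelation
  }

  // Two co-raters with a perfect raw correlation: w = 2/12, so the result is about 0.17.
  val fewRaters: Double = shrink(2.0, 1.0, 10.0, 0.0)

  // A hundred co-raters with a raw correlation of 0.8: w = 100/110, so about 0.73.
  val manyRaters: Double = shrink(100.0, 0.8, 10.0, 0.0)
}
```

With 10 virtual pairs and a prior of 0, a "perfect" correlation backed by two raters ends up well below a 0.8 correlation backed by a hundred.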


Jaccard Similarity

Recall that one of the lessons of the Netflix prize was that implicit data can be quite useful – the mere fact that you rate a James Bond movie, even if you rate it quite horribly, suggests that you’d probably be interested in similar action films. So we can also ignore the value itself of each rating and use a set-based similarity measure like Jaccard similarity.
Jaccard Similarity MovieSimilarities.scala

def jaccardSimilarity(usersInCommon : Double, totalUsers1 : Double, totalUsers2 : Double) = {
  val union = totalUsers1 + totalUsers2 - usersInCommon
  usersInCommon / union
}
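
The count-based formula is just intersection over union written with sizes. Here is a small plain-Scala sketch that checks it against an explicit set computation; the `jaccardFromSets` helper and the user IDs are made up for illustration.

```scala
object JaccardSketch {
  // Count-based form, as in the post: |A ∩ B| / (|A| + |B| - |A ∩ B|).
  def jaccardSimilarity(usersInCommon: Double, totalUsers1: Double, totalUsers2: Double): Double = {
    val union = totalUsers1 + totalUsers2 - usersInCommon
    usersInCommon / union
  }

  // Set-based form, for comparison only.
  def jaccardFromSets[A](a: Set[A], b: Set[A]): Double =
    (a intersect b).size.toDouble / (a union b).size

  // Hypothetical user IDs who rated each of two movies.
  val raters1: Set[Int] = Set(1, 2, 3)
  val raters2: Set[Int] = Set(2, 3, 4, 5)

  val viaCounts: Double = jaccardSimilarity(2.0, raters1.size.toDouble, raters2.size.toDouble)
  val viaSets: Double = jaccardFromSets(raters1, raters2)
}
```

Two users in common out of five distinct users gives 0.4 either way.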


Incorporation

Finally, let’s add all these similarity measures to our output.
Similarity Measures MovieSimilarities.scala

val PRIOR_COUNT = 10
val PRIOR_CORRELATION = 0

val similarities =
  vectorCalcs
    .map(('size, 'dotProduct, 'ratingSum, 'rating2Sum, 'ratingNormSq, 'rating2NormSq, 'numRaters, 'numRaters2) ->
      ('correlation, 'regularizedCorrelation, 'cosineSimilarity, 'jaccardSimilarity)) {

      fields : (Double, Double, Double, Double, Double, Double, Double, Double) =>

        val (size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq, numRaters, numRaters2) = fields

        val corr = correlation(size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq)
        val regCorr = regularizedCorrelation(size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq, PRIOR_COUNT, PRIOR_CORRELATION)
        val cosSim = cosineSimilarity(dotProduct, math.sqrt(ratingNormSq), math.sqrt(rating2NormSq))
        val jaccard = jaccardSimilarity(size, numRaters, numRaters2)

        (corr, regCorr, cosSim, jaccard)
    }


Book Similarities Revisited

Let’s take another look at the book similarities above, now that we have these new fields.

Here are some of the top Book-Crossing pairs, sorted by their shrunk correlation:





Notice how regularization affects things: the Dark Tower pair has a pretty high raw correlation, but relatively few ratings (reducing our
confidence in the raw correlation), so it ends up below the others.

And here are books similar to The Great Gatsby, this time ordered by cosine similarity:





Input Abstraction

So our code right now is tied to our specific ratings.tsv input. But what if we change the way we store our ratings, or what if we want to generate similarities for something entirely different?

Let’s abstract away our input. We’ll create a VectorSimilarities
class that represents input data in the following format:
Input abstraction VectorSimilarities.scala

// This is an abstract method that returns a Pipe (aka, a stream of rating tuples).
// It takes in three symbols that name the user, item, and rating fields.
def input(userField : Symbol, itemField : Symbol, ratingField : Symbol) : Pipe

val ratings = input('user, 'item, 'rating)
// ...
// The rest of the code remains essentially the same.

Whenever we want to define a new input format, we simply subclass VectorSimilarities and provide a concrete implementation of the input method.
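
Stripped of Scalding, the pattern is an ordinary abstract method. Here is a minimal sketch under loud assumptions: `Similarities` and `InMemoryRatings` are hypothetical stand-ins, and a plain `Seq` of tuples takes the place of a `Pipe`. It is not the real `VectorSimilarities` class.

```scala
object InputAbstractionSketch {
  // (user, item, rating); a stand-in for the tuples flowing through a Pipe.
  type Rating = (String, String, Double)

  abstract class Similarities {
    // Subclasses decide where the ratings come from.
    def input: Seq[Rating]

    // Shared logic is written once against the abstract input.
    def numRaters: Map[String, Int] =
      input.groupBy(_._2).map { case (item, rows) => (item, rows.size) }
  }

  // One concrete source: ratings that are already in memory.
  class InMemoryRatings(rows: Seq[Rating]) extends Similarities {
    override def input: Seq[Rating] = rows
  }
}
```

Any new source only has to supply `input`; everything computed from it comes along for free.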


Book-Crossings

For example, here’s a class I could have used to generate the book recommendations above:
BookCrossing similarities BookCrossing.scala

class BookCrossing(args : Args) extends VectorSimilarities(args) {
  override def input(userField : Symbol, itemField : Symbol, ratingField : Symbol) : Pipe = {
    val bookCrossingRatings =
      Tsv("book-crossing-ratings.tsv")
        .read
        .mapTo((0, 1, 2) -> (userField, itemField, ratingField)) { fields : (String, String, Double) => fields }

    bookCrossingRatings
  }
}

The input method simply reads from a TSV file and lets the VectorSimilarities superclass do all the work. Instant recommendations, BOOM.


Song Similarities with Twitter + iTunes

But why limit ourselves to books? We do, after all, have Twitter at our fingertips…

rated Born This Way by Lady GaGa 5 stars itun.es/iSg92N #iTunes
— gggf (@GalMusic92) February
8, 2012

Since iTunes lets you send a tweet whenever you rate a song, we can use these to generate music recommendations!

Again, we create a new class that overrides the abstract input defined in VectorSimilarities…

Song similarities with Twitter + iTunes ITunes.scala

class ITunes(args : Args) extends VectorSimilarities(args) {
  // Example tweet:
  // rated New Kids On the Block: Super Hits by New Kids On the Block 5 stars http://itun.es/iSg3Fc #iTunes
  val ITUNES_REGEX = """rated (.+?) by (.+?) (\d) stars .*? #iTunes""".r

  override def input(userField : Symbol, itemField : Symbol, ratingField : Symbol) : Pipe = {
    val itunesRatings =
      // This is a Twitter-internal tweet source, but you could just as easily scrape
      // Twitter yourself and provide your own source of tweets: https://dev.twitter.com/docs
      TweetSource()
        .mapTo('userId, 'text) { s => (s.getUserId, s.getText) }
        .filter('text) { text : String => text.contains("#iTunes") }
        .flatMap('text -> ('song, 'artist, 'rating)) {
          text : String =>
            ITUNES_REGEX.findFirstMatchIn(text).map { _.subgroups }.map { l => (l(0), l(1), l(2)) }
        }
        .rename(('userId, 'song, 'rating) -> (userField, itemField, ratingField))
        .project(userField, itemField, ratingField)

    itunesRatings
  }
}
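
To see what the regex pulls out of a tweet, here is a standalone sketch that applies the same pattern to the example tweet from the comment above, with no tweet source needed. The `parse` helper and the object name are mine, for illustration only.

```scala
object ITunesRegexSketch {
  // Same pattern as ITUNES_REGEX in the class above.
  val ITUNES_REGEX = """rated (.+?) by (.+?) (\d) stars .*? #iTunes""".r

  // Returns (song, artist, rating) when the tweet looks like an iTunes rating.
  def parse(text: String): Option[(String, String, String)] =
    ITUNES_REGEX.findFirstMatchIn(text).map { m => (m.group(1), m.group(2), m.group(3)) }

  val exampleTweet: String =
    "rated New Kids On the Block: Super Hits by New Kids On the Block 5 stars http://itun.es/iSg3Fc #iTunes"
}
```

Because `parse` returns an `Option`, tweets that don't match simply produce nothing, which is what lets `flatMap` drop them in the job.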

…and snap! Here are some songs you might like if you recently listened to Beyoncé:





And some recommended songs if you like Lady Gaga:





GG Pandora.


Location Similarities with Foursquare Check-ins

But what if we don’t have explicit ratings? For example, we could be a news site that wants to generate article recommendations, and maybe we only have user visits on
each story.

Or what if we want to generate restaurant or tourist recommendations, when all we know is who visits each location?

I’m at Empire State Building (350 5th Ave., btwn 33rd & 34th St., New York) 4sq.com/zZ5xGd
— Simon Ackerman (@SimonAckerman) February
8, 2012

Let’s finally make Foursquare check-ins useful. (I kid, I kid.)

Instead of using an explicit rating given to us, we can simply generate a dummy rating of 1 for each check-in. Correlation doesn’t make sense any more, but we can still pay attention to a measure like Jaccard similarity.
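
To see why correlation breaks down here: if every rating is 1, then for n users in common the dot product, both sums, and both squared norms all equal n, so the numerator and the denominator of the correlation formula are both zero. A small plain-Scala sketch, with made-up counts and a hypothetical object name:

```scala
object DummyRatingsSketch {
  // Same formulas as the correlation and jaccardSimilarity functions in the post.
  def correlation(size: Double, dotProduct: Double, ratingSum: Double,
                  rating2Sum: Double, ratingNormSq: Double, rating2NormSq: Double): Double = {
    val numerator = size * dotProduct - ratingSum * rating2Sum
    val denominator =
      math.sqrt(size * ratingNormSq - ratingSum * ratingSum) *
        math.sqrt(size * rating2NormSq - rating2Sum * rating2Sum)
    numerator / denominator
  }

  def jaccardSimilarity(usersInCommon: Double, totalUsers1: Double, totalUsers2: Double): Double =
    usersInCommon / (totalUsers1 + totalUsers2 - usersInCommon)

  // Three users checked in at both places, and every "rating" is 1.
  val n = 3.0
  val corr: Double = correlation(n, n, n, n, n, n) // 0.0 / 0.0

  // Jaccard still works: 3 in common, 5 and 4 visitors in total.
  val jaccard: Double = jaccardSimilarity(n, 5.0, 4.0)
}
```

The correlation comes out as NaN, while Jaccard gives 3 / (5 + 4 - 3) = 0.5.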

So we simply create a new class that scrapes tweets for Foursquare check-in information…
Location similarities with Foursquare Foursquare.scala

class Foursquare(args : Args) extends VectorSimilarities(args) {
  // Example tweet: I'm at The Ambassador (673 Geary St, btw Leavenworth & Jones, San Francisco) w/ 2 others http://4sq.com/xok3rI
  // Let's limit to New York for simplicity.
  val FOURSQUARE_REGEX = """I'm at (.+?) \(.*? New York""".r

  override def input(userField : Symbol, itemField : Symbol, ratingField : Symbol) : Pipe = {
    val foursquareCheckins =
      TweetSource()
        .mapTo('userId, 'text) { s => (s.getUserId.toLong, s.getText) }
        .flatMap('text -> ('location, 'rating)) {
          text : String =>
            FOURSQUARE_REGEX.findFirstMatchIn(text).map { _.subgroups }.map { l => (l(0), 1) }
        }
        .rename(('userId, 'location, 'rating) -> (userField, itemField, ratingField))
        .unique(userField, itemField, ratingField)

    foursquareCheckins
  }
}

…and bam! Here are locations similar to the Empire State Building:





Here are places you might want to check out, if you check in at Bergdorf Goodman:





And here’s where to go after the Statue of Liberty:





Power of Twitter, yo.


RottenTomatoes Similarities

UPDATE: I found some movie data after all…

My review for ‘How to Train Your Dragon’ on Rotten Tomatoes: 4 1/2 stars >bit.ly/xtw3d3
— Benjamin West (@BenTheWest) February
21, 2012

So let’s use RottenTomatoes tweets to recommend movies! Here’s the code for a class that searches for RottenTomatoes tweets:
Movie similarities with RottenTomatoes RottenTomatoes.scala

class RottenTomatoes(args : Args) extends VectorSimilarities(args) {
  /**
   * Example tweets:
   * My review for 'Hop' on Rotten Tomatoes: 1 star > http://bit.ly/AB7Tl4
   * My review for 'The Bothersome Man (Den Brysomme mannen)' on Rotten Tomatoes: 3 stars-A muddled Playtime in Paris,... http://tmto.es/AvPoO2
   */
  val ROTTENTOMATOES_REGEX = """My review for '(.+?)' on Rotten Tomatoes: (\d) star""".r

  override val MIN_NUM_RATERS = 2
  override val MAX_NUM_RATERS = 1000
  override val MIN_INTERSECTION = 2

  override def input(userField : Symbol, itemField : Symbol, ratingField : Symbol) : Pipe = {
    val rottenTomatoesRatings =
      TweetSource()
        .mapTo('userId, 'text) { s => (s.getUserId.toLong, s.getText) }
        .flatMap('text -> ('movie, 'rating)) {
          text : String =>
            ROTTENTOMATOES_REGEX.findFirstMatchIn(text).map { _.subgroups }.map { x => (x(0), x(1).toInt) }
        }
        .rename(('userId, 'movie, 'rating) -> (userField, itemField, ratingField))
        .unique(userField, itemField, ratingField)

    rottenTomatoesRatings
  }
}
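
Here is a standalone sketch of what this regex extracts, using plain Scala and tweet texts typed with straight quotes. The `parse` helper is hypothetical. One thing it shows: as written, the pattern expects a single digit followed directly by " star", so a half-star rating like "4 1/2 stars" would not match and would be dropped by the `flatMap`.

```scala
object RottenTomatoesRegexSketch {
  // Same pattern as ROTTENTOMATOES_REGEX in the class above.
  val ROTTENTOMATOES_REGEX = """My review for '(.+?)' on Rotten Tomatoes: (\d) star""".r

  // Returns (movie, rating) when the tweet matches the pattern.
  def parse(text: String): Option[(String, Int)] =
    ROTTENTOMATOES_REGEX.findFirstMatchIn(text).map { m => (m.group(1), m.group(2).toInt) }

  val wholeStar: String =
    "My review for 'Hop' on Rotten Tomatoes: 1 star > http://bit.ly/AB7Tl4"

  // Half-star rating: after the digit comes " 1/2", not " star", so there is no match.
  val halfStar: String =
    "My review for 'How to Train Your Dragon' on Rotten Tomatoes: 4 1/2 stars >bit.ly/xtw3d3"
}
```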

And here are the most similar movies discovered:





We see that

Lord of the Rings, Harry Potter, and Star Wars movies are similar to other Lord of the Rings, Harry Potter, and Star Wars movies

Big science fiction blockbusters (Avatar) are similar to big science fiction blockbusters (Inception)

People who like one Justin Timberlake movie (Bad Teacher) also like other Justin Timberlake movies (In Time). Similarly with Michael Fassbender (A Dangerous Method, Shame)

Art house movies (The Tree of Life) stick together (Tinker Tailor Soldier Spy)

Let’s also look at the movies with the most negative correlation:





(The more you like loud and dirty popcorn movies (Thor) and vamp romance (Twilight), the less you like arthouse? SGTM.)


Next Steps

Hopefully I gave you a taste of the awesomeness of Scalding. To learn even more:

Check out Scalding on Github.

Read this Getting Started Guide on the Scalding wiki.

Run through this code-based introduction, complete with Scalding jobs that you can run in local mode.

Browse the API reference, which also contains many code snippets illustrating different Scalding functions (e.g., map, filter, flatMap, groupBy, count, join).

And all the code for this post is here.

Watch out for more documentation soon, and you should most definitely follow @Scalding on Twitter for updates or to ask any questions.


Mad Props

And finally, a huge shoutout to Argyris Zymnis, Avi Bryant, and Oscar Boykin, the mastermind hackers who have spent (and continue spending!) unimaginable hours making Scalding a joy to use.

@argyris, @avibryant, @posco: Thanks for it all. #awesomejobguys #loveit

from: http://blog.echen.me/2012/02/09/movie-recommendations-and-more-via-mapreduce-and-scalding/