Hadoop-Based Recommendation Algorithms: The Mahout Version
2015-09-10 15:50
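This post looks at recommendation on Hadoop, specifically the item-based recommender that Mahout implements.
The algorithm has four steps:
1. Build the user-item matrix (the "user matrix")
The input is every user's ratings of, or associations with, items.
Map output: key = user, value = item + preference
Reduce output: key = user, value = multiple items + preferences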
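2. Build the item-item matrix (the "item matrix")
The input is the user matrix; the items in each user's row are combined as a Cartesian product to produce item-item associations.
Map output: key = item, value = association strength
Reduce output: key = item, value = association strengths with multiple items
(The item similarity matrix can be built under various rules; the algorithm is only sketched here.)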
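Revision:
Computing the item similarity matrix is the core of item-based collaborative filtering.
There are many formulas; the core idea is the ratio of the intersection to the union of the users associated with item i and item j.
The formula analyzed here, which matches a Euclidean-distance similarity (Pi(u) is user u's preference for item i), is:
1. dot(i,j) = sum(Pi(u)*Pj(u))
2. norms(i) = sum(Pi(u)^2)
3. simi(i,j) = 1/(1+(norms(i)-2*dot(i,j)+norms(j))^(1/2))
Mahout implements it as follows:
Job 1: from the item-user matrix, compute norms, i.e. the sum of squared user preferences for each item; the output is item -> norms.
Job 2, Map: from the user-item matrix, compute Pi(u)*Pj(u), i.e. the product of the ratings the same user gave to two items; the output is item -> Pi(u)*Pj(u) for multiple paired items.
Job 2, Reduce: from item -> paired-item products and item -> norms, compute the item similarity matrix (at this point all the dot values for the items related to this item can be summed).
Job 3: complete the item similarity matrix.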
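3. Build the user-item similarity matrix
The input is the user matrix (user-item) and the item matrix (item-item).
Map output: key = item, value = a VectorOrPrefWritable, which holds either one user's preference for the item or the item's similarities to other items
Reduce output: key = item, value = a VectorAndPrefsWritable, which gathers, for a single item, the preferences of all users and the similarities to all other items
4. Build the user recommendation matrix
The input is the VectorAndPrefsWritable records.
Map output: key = user, value = item + coefficient (for each single item the map side emits that item's contribution, i.e. the user's preference for item A * the similarity of item A to other items)
Reduce output: key = user, value = recommended items + coefficients (the reduce side combines the contributions of all single items with its own formula to estimate the user's preference for other items, and keeps the top N as that user's recommendations)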
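The three formulas from step 2 can be checked by hand with a small sketch. It is not Mahout code: the class name, the `Map<Long, Double>` representation (user ID to preference) and the sample ratings are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class EuclideanItemSimilarity {

    // dot(i,j): sum of Pi(u)*Pj(u) over the users who rated both items
    static double dot(Map<Long, Double> itemI, Map<Long, Double> itemJ) {
        double sum = 0.0;
        for (Map.Entry<Long, Double> e : itemI.entrySet()) {
            Double other = itemJ.get(e.getKey());
            if (other != null) {
                sum += e.getValue() * other;
            }
        }
        return sum;
    }

    // norms(i): sum of Pi(u)^2 over the users who rated item i
    static double norm(Map<Long, Double> item) {
        double sum = 0.0;
        for (double pref : item.values()) {
            sum += pref * pref;
        }
        return sum;
    }

    // simi(i,j) = 1 / (1 + sqrt(norms(i) - 2*dot(i,j) + norms(j)))
    static double similarity(Map<Long, Double> itemI, Map<Long, Double> itemJ) {
        // clamp at 0 so rounding can never make the square-root argument negative
        double squaredDistance = Math.max(0.0, norm(itemI) - 2 * dot(itemI, itemJ) + norm(itemJ));
        return 1.0 / (1.0 + Math.sqrt(squaredDistance));
    }

    public static void main(String[] args) {
        Map<Long, Double> itemI = new HashMap<>();
        itemI.put(1L, 3.0); // user 1 rated item i with 3
        itemI.put(2L, 4.0); // user 2 rated item i with 4
        Map<Long, Double> itemJ = new HashMap<>();
        itemJ.put(1L, 3.0); // user 1 rated item j with 3
        // dot = 9, norms = 25 and 9, distance = sqrt(25 - 18 + 9) = 4
        System.out.println(similarity(itemI, itemJ)); // prints 0.2
    }
}
```

Because norms(i) and dot(i,j) are plain sums, they can be accumulated in separate passes, which is why the computation splits into jobs so naturally.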
Here are a few other posts that analyze Mahout's recommendation algorithm:
http://eric-gcm.iteye.com/blog/1817822 http://eric-gcm.iteye.com/blog/1818033 http://eric-gcm.iteye.com/blog/1820060
The Mahout code follows.
RecommenderJob is the main class Mahout uses to run the recommendation engine on Hadoop; the analysis starts there.
run() is the entry point:
Java code
public final class RecommenderJob extends AbstractJob {
public static final String BOOLEAN_DATA = "booleanData";
private static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM = 100;
private static final int DEFAULT_MAX_PREFS_PER_USER = 1000;
private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
@Override
public int run(String[] args) throws Exception {
// A large block of code that loads configuration options was here; it can be ignored
// Step 1: prepare the matrix. PreparePreferenceMatrixJob converts the raw input into a matrix
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
"--input", getInputPath().toString(),
"--output", prepPath.toString(),
"--maxPrefsPerUser", String.valueOf(maxPrefsPerUserInItemSimilarity),
"--minPrefsPerUser", String.valueOf(minPrefsPerUser),
"--booleanData", String.valueOf(booleanData),
"--tempDir", getTempPath().toString()});
numberOfUsers = HadoopUtil.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());
}
// Step 2: compute the co-occurrence matrix
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
/* special behavior if phase 1 is skipped */
if (numberOfUsers == -1) {
numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
PathType.LIST, null, getConf());
}
/* Once DistributedRowMatrix uses the hadoop 0.20 API, we should refactor this call to something like
* new DistributedRowMatrix(...).rowSimilarity(...) */
//calculate the co-occurrence matrix
ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
"--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
"--output", similarityMatrixPath.toString(),
"--numberOfColumns", String.valueOf(numberOfUsers),
"--similarityClassname", similarityClassname,
"--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
"--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
"--threshold", String.valueOf(threshold),
"--tempDir", getTempPath().toString()});
}
//start the multiplication of the co-occurrence matrix by the user vectors
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
Job prePartialMultiply1 = prepareJob(
similarityMatrixPath, prePartialMultiplyPath1, SequenceFileInputFormat.class,
SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
SequenceFileOutputFormat.class);
boolean succeeded = prePartialMultiply1.waitForCompletion(true);
if (!succeeded)
return -1;
//continue the multiplication
Job prePartialMultiply2 = prepareJob(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
prePartialMultiplyPath2, SequenceFileInputFormat.class, UserVectorSplitterMapper.class, VarIntWritable.class,
VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
SequenceFileOutputFormat.class);
if (usersFile != null) {
prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);
}
prePartialMultiply2.getConfiguration().setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED,
maxPrefsPerUser);
succeeded = prePartialMultiply2.waitForCompletion(true);
if (!succeeded)
return -1;
//finish the job
Job partialMultiply = prepareJob(
new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,
SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
SequenceFileOutputFormat.class);
setS3SafeCombinedInputPath(partialMultiply, getTempPath(), prePartialMultiplyPath1, prePartialMultiplyPath2);
succeeded = partialMultiply.waitForCompletion(true);
if (!succeeded)
return -1;
}
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
//filter out any users we don't care about
/* convert the user/item pairs to filter if a filterfile has been specified */
if (filterFile != null) {
Job itemFiltering = prepareJob(new Path(filterFile), explicitFilterPath, TextInputFormat.class,
ItemFilterMapper.class, VarLongWritable.class, VarLongWritable.class,
ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
SequenceFileOutputFormat.class);
boolean succeeded = itemFiltering.waitForCompletion(true);
if (!succeeded)
return -1;
}
String aggregateAndRecommendInput = partialMultiplyPath.toString();
if (filterFile != null) {
aggregateAndRecommendInput += "," + explicitFilterPath;
}
//extract out the recommendations
Job aggregateAndRecommend = prepareJob(
new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,
PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
TextOutputFormat.class);
Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();
if (itemsFile != null) {
aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);
}
if (filterFile != null) {
setS3SafeCombinedInputPath(aggregateAndRecommend, getTempPath(), partialMultiplyPath, explicitFilterPath);
}
setIOSort(aggregateAndRecommend);
aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH,
new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);
aggregateAndRecommendConf.setBoolean(BOOLEAN_DATA, booleanData);
boolean succeeded = aggregateAndRecommend.waitForCompletion(true);
if (!succeeded)
return -1;
}
return 0;
}
Step 2, computing the co-occurrence matrix, is done mainly in the RowSimilarityJob class:
Java code
ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
"--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
"--output", similarityMatrixPath.toString(),
"--numberOfColumns", String.valueOf(numberOfUsers),
"--similarityClassname", similarityClassname,
"--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
"--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
"--threshold", String.valueOf(threshold),
"--tempDir", getTempPath().toString()});
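Note that this job's input path is the output path of the last reducer in PreparePreferenceMatrixJob, covered in the previous post.
The implementation of RowSimilarityJob is analyzed in detail below:
Java code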
public class RowSimilarityJob extends AbstractJob {
@Override
public int run(String[] args) throws Exception {
// A large block of argument-loading code, omitted
// First MapReduce job
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, VectorNormMapper.class, IntWritable.class,
VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);
normsAndTranspose.setCombinerClass(MergeVectorsCombiner.class);
Configuration normsAndTransposeConf = normsAndTranspose.getConfiguration();
normsAndTransposeConf.set(THRESHOLD, String.valueOf(threshold));
normsAndTransposeConf.set(NORMS_PATH, normsPath.toString());
normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());
normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);
boolean succeeded = normsAndTranspose.waitForCompletion(true);
if (!succeeded) {
return -1;
}
}
// Second MapReduce job
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath, CooccurrencesMapper.class,
IntWritable.class, VectorWritable.class, SimilarityReducer.class, IntWritable.class, VectorWritable.class);
pairwiseSimilarity.setCombinerClass(VectorSumReducer.class);
Configuration pairwiseConf = pairwiseSimilarity.getConfiguration();
pairwiseConf.set(THRESHOLD, String.valueOf(threshold));
pairwiseConf.set(NORMS_PATH, normsPath.toString());
pairwiseConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
pairwiseConf.set(MAXVALUES_PATH, maxValuesPath.toString());
pairwiseConf.set(SIMILARITY_CLASSNAME, similarityClassname);
pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns);
pairwiseConf.setBoolean(EXCLUDE_SELF_SIMILARITY, excludeSelfSimilarity);
boolean succeeded = pairwiseSimilarity.waitForCompletion(true);
if (!succeeded) {
return -1;
}
}
// Third MapReduce job
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(), UnsymmetrifyMapper.class,
IntWritable.class, VectorWritable.class, MergeToTopKSimilaritiesReducer.class, IntWritable.class,
VectorWritable.class);
asMatrix.setCombinerClass(MergeToTopKSimilaritiesReducer.class);
asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);
boolean succeeded = asMatrix.waitForCompletion(true);
if (!succeeded) {
return -1;
}
}
return 0;
}
As the code shows, RowSimilarityJob is itself split into three MapReduce passes:
1. Mapper: the VectorNormMapper class, which emits records of the form ( userid_index, <itemid_index, pref> )
Java code
public static class VectorNormMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
@Override
protected void map(IntWritable row, VectorWritable vectorWritable, Context ctx)
throws IOException, InterruptedException {
Vector rowVector = similarity.normalize(vectorWritable.get());
int numNonZeroEntries = 0;
double maxValue = Double.MIN_VALUE;
Iterator<Vector.Element> nonZeroElements = rowVector.iterateNonZero();
while (nonZeroElements.hasNext()) {
Vector.Element element = nonZeroElements.next();
RandomAccessSparseVector partialColumnVector = new RandomAccessSparseVector(Integer.MAX_VALUE);
partialColumnVector.setQuick(row.get(), element.get());
// emit a record of the form ( userid_index, <itemid_index, pref> )
ctx.write(new IntWritable(element.index()), new VectorWritable(partialColumnVector));
numNonZeroEntries++;
if (maxValue < element.get()) {
maxValue = element.get();
}
}
if (threshold != NO_THRESHOLD) {
nonZeroEntries.setQuick(row.get(), numNonZeroEntries);
maxValues.setQuick(row.get(), maxValue);
}
norms.setQuick(row.get(), similarity.norm(rowVector));
// count the total number of items (one input row per item)
ctx.getCounter(Counters.ROWS).increment(1);
}
}
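To make the transposition concrete, here is a minimal sketch using plain Java collections instead of Mahout vectors. The class and method names are hypothetical, and the norm is assumed to be the sum of squared preferences; in the real job the norm comes from the configured similarity class.

```java
import java.util.Map;
import java.util.TreeMap;

final class TransposeSketch {

    // item rows (item -> user -> pref) become user columns (user -> item -> pref),
    // i.e. the (userid_index, <itemid_index, pref>) records merged per user
    static Map<Integer, Map<Integer, Double>> transpose(Map<Integer, Map<Integer, Double>> itemRows) {
        Map<Integer, Map<Integer, Double>> userColumns = new TreeMap<>();
        for (Map.Entry<Integer, Map<Integer, Double>> row : itemRows.entrySet()) {
            for (Map.Entry<Integer, Double> cell : row.getValue().entrySet()) {
                userColumns.computeIfAbsent(cell.getKey(), user -> new TreeMap<>())
                        .put(row.getKey(), cell.getValue());
            }
        }
        return userColumns;
    }

    // per-item norm, assumed here to be the sum of squared preferences
    static Map<Integer, Double> norms(Map<Integer, Map<Integer, Double>> itemRows) {
        Map<Integer, Double> norms = new TreeMap<>();
        for (Map.Entry<Integer, Map<Integer, Double>> row : itemRows.entrySet()) {
            double sum = 0.0;
            for (double pref : row.getValue().values()) {
                sum += pref * pref;
            }
            norms.put(row.getKey(), sum);
        }
        return norms;
    }

    public static void main(String[] args) {
        // Map.of requires Java 9 or later
        Map<Integer, Map<Integer, Double>> itemRows = new TreeMap<>();
        itemRows.put(0, new TreeMap<>(Map.of(10, 3.0, 11, 4.0))); // item 0 rated by users 10 and 11
        itemRows.put(1, new TreeMap<>(Map.of(10, 3.0)));          // item 1 rated by user 10
        System.out.println(transpose(itemRows)); // {10={0=3.0, 1=3.0}, 11={0=4.0}}
        System.out.println(norms(itemRows));     // {0=25.0, 1=9.0}
    }
}
```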
Reducer: the MergeVectorsReducer class. Its input is (userid_index, <itemid_index, pref>); records with the same userid_index are merged here, and the output is ( userid_index, vector<itemid_index, pref> )
Java code
public static class MergeVectorsReducer extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
@Override
protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)
throws IOException, InterruptedException {
Vector partialVector = Vectors.merge(partialVectors);
if (row.get() == NORM_VECTOR_MARKER) {
Vectors.write(partialVector, normsPath, ctx.getConfiguration());
} else if (row.get() == MAXVALUE_VECTOR_MARKER) {
Vectors.write(partialVector, maxValuesPath, ctx.getConfiguration());
} else if (row.get() == NUM_NON_ZERO_ENTRIES_VECTOR_MARKER) {
Vectors.write(partialVector, numNonZeroEntriesPath, ctx.getConfiguration(), true);
} else {
ctx.write(row, new VectorWritable(partialVector));
}
}
}
}
2. Mapper: the CooccurrencesMapper class, which processes the vector<itemid_index, pref> belonging to a single userid_index,
collects the <item1, item2> pairs, and emits ( itemid_index, vector<itemid_index, value> )
Java code
public static class CooccurrencesMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
@Override
protected void map(IntWritable column, VectorWritable occurrenceVector, Context ctx)
throws IOException, InterruptedException {
Vector.Element[] occurrences = Vectors.toArray(occurrenceVector);
Arrays.sort(occurrences, BY_INDEX);
int cooccurrences = 0;
int prunedCooccurrences = 0;
for (int n = 0; n < occurrences.length; n++) {
Vector.Element occurrenceA = occurrences[n];
Vector dots = new RandomAccessSparseVector(Integer.MAX_VALUE);
for (int m = n; m < occurrences.length; m++) {
Vector.Element occurrenceB = occurrences[m];
if (threshold == NO_THRESHOLD || consider(occurrenceA, occurrenceB)) {
dots.setQuick(occurrenceB.index(), similarity.aggregate(occurrenceA.get(), occurrenceB.get()));
cooccurrences++;
} else {
prunedCooccurrences++;
}
}
ctx.write(new IntWritable(occurrenceA.index()), new VectorWritable(dots));
}
ctx.getCounter(Counters.COOCCURRENCES).increment(cooccurrences);
ctx.getCounter(Counters.PRUNED_COOCCURRENCES).increment(prunedCooccurrences);
}
}
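The nested loop is easier to follow on a small input. The sketch below replays it for one user's column with plain maps. The names are hypothetical, pruning by threshold is left out, and the aggregate step is assumed to be a plain product, as in dot(i,j).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CooccurrenceSketch {

    // for one user's column (item -> pref), emits item_n -> {item_m: pref_n * pref_m} for every m >= n
    static Map<Integer, Map<Integer, Double>> pairProducts(Map<Integer, Double> userColumn) {
        Map<Integer, Double> sorted = new TreeMap<>(userColumn); // plays the role of the sort by index
        List<Map.Entry<Integer, Double>> occurrences = new ArrayList<>(sorted.entrySet());
        Map<Integer, Map<Integer, Double>> emitted = new TreeMap<>();
        for (int n = 0; n < occurrences.size(); n++) {
            Map.Entry<Integer, Double> a = occurrences.get(n);
            Map<Integer, Double> dots = new TreeMap<>();
            // starting at m = n means each unordered pair is produced exactly once
            for (int m = n; m < occurrences.size(); m++) {
                Map.Entry<Integer, Double> b = occurrences.get(m);
                dots.put(b.getKey(), a.getValue() * b.getValue());
            }
            emitted.put(a.getKey(), dots);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // one user who rated items 0, 1 and 2 (Map.of requires Java 9 or later)
        System.out.println(pairProducts(Map.of(0, 3.0, 1, 2.0, 2, 1.0)));
        // {0={0=9.0, 1=6.0, 2=3.0}, 1={1=4.0, 2=2.0}, 2={2=1.0}}
    }
}
```

Only the upper triangle is emitted, which is why a later pass has to mirror the entries.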
Reducer: the SimilarityReducer class, which produces the co-occurrence matrix
Java code
public static class SimilarityReducer
extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
@Override
protected void reduce(IntWritable row, Iterable<VectorWritable> partialDots, Context ctx)
throws IOException, InterruptedException {
Iterator<VectorWritable> partialDotsIterator = partialDots.iterator();
// take the first vector as this item's row vector
Vector dots = partialDotsIterator.next().get();
while (partialDotsIterator.hasNext()) {
Vector toAdd = partialDotsIterator.next().get();
Iterator<Vector.Element> nonZeroElements = toAdd.iterateNonZero();
while (nonZeroElements.hasNext()) {
Vector.Element nonZeroElement = nonZeroElements.next();
// nonZeroElement.index() is an itemid; add the other vector's value for that itemid
dots.setQuick(nonZeroElement.index(), dots.getQuick(nonZeroElement.index()) + nonZeroElement.get());
}
}
// dots is now row number `row` of the matrix: the summed products of this item with each other item, turned into similarities below
Vector similarities = dots.like();
double normA = norms.getQuick(row.get());
Iterator<Vector.Element> dotsWith = dots.iterateNonZero();
while (dotsWith.hasNext()) {
Vector.Element b = dotsWith.next();
double similarityValue = similarity.similarity(b.get(), normA, norms.getQuick(b.index()), numberOfColumns);
if (similarityValue >= treshold) {
similarities.set(b.index(), similarityValue);
}
}
if (excludeSelfSimilarity) {
similarities.setQuick(row.get(), 0);
}
ctx.write(row, new VectorWritable(similarities));
}
}
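The reducer's two phases, summing the partial vectors and then turning each sum into a similarity, can be sketched as below. The names are hypothetical, and the Euclidean-distance formula from the overview is assumed for the similarity step; the real reducer delegates that to the configured similarity class.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SimilarityRowSketch {

    // phase 1: add up the partial dot-product vectors that arrive for one item row
    static Map<Integer, Double> sumPartials(List<Map<Integer, Double>> partialDots) {
        Map<Integer, Double> dots = new TreeMap<>();
        for (Map<Integer, Double> partial : partialDots) {
            for (Map.Entry<Integer, Double> e : partial.entrySet()) {
                dots.merge(e.getKey(), e.getValue(), Double::sum);
            }
        }
        return dots;
    }

    // phase 2: dot -> similarity, assuming simi = 1 / (1 + sqrt(normA - 2*dot + normB))
    static Map<Integer, Double> toSimilarities(int row, Map<Integer, Double> dots,
                                               Map<Integer, Double> norms, double threshold) {
        Map<Integer, Double> similarities = new TreeMap<>();
        double normA = norms.get(row);
        for (Map.Entry<Integer, Double> e : dots.entrySet()) {
            int other = e.getKey();
            if (other == row) {
                continue; // same effect as excludeSelfSimilarity
            }
            double squared = Math.max(0.0, normA - 2 * e.getValue() + norms.get(other));
            double sim = 1.0 / (1.0 + Math.sqrt(squared));
            if (sim >= threshold) {
                similarities.put(other, sim);
            }
        }
        return similarities;
    }

    public static void main(String[] args) {
        // partial vectors for item 0 coming from two different users (Java 9+ for List.of/Map.of)
        Map<Integer, Double> dots = sumPartials(List.of(Map.of(0, 9.0, 1, 9.0), Map.of(0, 16.0)));
        System.out.println(dots); // {0=25.0, 1=9.0}
        System.out.println(toSimilarities(0, dots, Map.of(0, 25.0, 1, 9.0), 0.0)); // {1=0.2}
    }
}
```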
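3. Mapper: the UnsymmetrifyMapper class, which is used to build the symmetric matrix. The previous step produces a non-symmetric matrix; transposing it and adding the transpose to the original matrix gives a symmetric matrix
Java code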
public static class UnsymmetrifyMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
private int maxSimilaritiesPerRow;
@Override
protected void setup(Mapper.Context ctx) throws IOException, InterruptedException {
maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);
Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");
}
@Override
protected void map(IntWritable row, VectorWritable similaritiesWritable, Context ctx)
throws IOException, InterruptedException {
Vector similarities = similaritiesWritable.get();
// For performance reasons moved transposedPartial creation out of the while loop and reusing the same vector
Vector transposedPartial = similarities.like();
TopK<Vector.Element> topKQueue = new TopK<Vector.Element>(maxSimilaritiesPerRow, Vectors.BY_VALUE);
Iterator<Vector.Element> nonZeroElements = similarities.iterateNonZero();
// this loop produces the entries of the transposed matrix; the non-symmetric matrix plus its transpose gives a symmetric matrix
while (nonZeroElements.hasNext()) {
Vector.Element nonZeroElement = nonZeroElements.next();
topKQueue.offer(new Vectors.TemporaryElement(nonZeroElement));
transposedPartial.setQuick(row.get(), nonZeroElement.get());
// one element of the transposed matrix
ctx.write(new IntWritable(nonZeroElement.index()), new VectorWritable(transposedPartial));
transposedPartial.setQuick(row.get(), 0.0);
}
Vector topKSimilarities = similarities.like();
for (Vector.Element topKSimilarity : topKQueue.retrieve()) {
topKSimilarities.setQuick(topKSimilarity.index(), topKSimilarity.get());
}
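// only the maxSimilaritiesPerRow highest-scoring items are collected here, so in the final "symmetric" matrix
// only maxSimilaritiesPerRow entries per row are actually symmetric; the other positions are ignored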
ctx.write(row, new VectorWritable(topKSimilarities));
}
}
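Reducer: the MergeToTopKSimilaritiesReducer class, which gathers the transposed elements emitted by the map above. This completes the addition of the transposed matrix and the original matrix (cut to its maxSimilaritiesPerRow highest scores per row) and yields the symmetric matrix
Java code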
public static class MergeToTopKSimilaritiesReducer
extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
private int maxSimilaritiesPerRow;
@Override
protected void setup(Context ctx) throws IOException, InterruptedException {
maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);
Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");
}
@Override
protected void reduce(IntWritable row, Iterable<VectorWritable> partials, Context ctx)
throws IOException, InterruptedException {
Vector allSimilarities = Vectors.merge(partials);
Vector topKSimilarities = Vectors.topKElements(maxSimilaritiesPerRow, allSimilarities);
ctx.write(row, new VectorWritable(topKSimilarities));
}
}
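The mirror-then-truncate behaviour of this third pass can be reproduced in memory. The sketch below is an illustration with hypothetical names, not the Mahout implementation; ties between equal similarity values are broken arbitrarily.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class SymmetrizeTopK {

    // keeps the k entries with the largest similarity values
    static Map<Integer, Double> topK(Map<Integer, Double> row, int k) {
        return row.entrySet().stream()
                .sorted(Map.Entry.<Integer, Double>comparingByValue().reversed())
                .limit(k)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b) -> a, TreeMap::new));
    }

    static Map<Integer, Map<Integer, Double>> symmetrize(Map<Integer, Map<Integer, Double>> upper, int k) {
        Map<Integer, Map<Integer, Double>> merged = new TreeMap<>();
        for (Map.Entry<Integer, Map<Integer, Double>> row : upper.entrySet()) {
            // "map" side: the original row, already cut to its top k entries...
            merged.computeIfAbsent(row.getKey(), r -> new TreeMap<>()).putAll(topK(row.getValue(), k));
            // ...plus every entry of the row mirrored to its transposed position
            for (Map.Entry<Integer, Double> cell : row.getValue().entrySet()) {
                merged.computeIfAbsent(cell.getKey(), r -> new TreeMap<>()).put(row.getKey(), cell.getValue());
            }
        }
        // "reduce" side: each merged row is cut to its top k entries again
        Map<Integer, Map<Integer, Double>> result = new TreeMap<>();
        for (Map.Entry<Integer, Map<Integer, Double>> row : merged.entrySet()) {
            result.put(row.getKey(), topK(row.getValue(), k));
        }
        return result;
    }

    public static void main(String[] args) {
        // upper-triangular similarities between items 0, 1 and 2 (Java 9+ for Map.of)
        Map<Integer, Map<Integer, Double>> upper = new TreeMap<>();
        upper.put(0, Map.of(1, 0.5, 2, 0.2));
        upper.put(1, Map.of(2, 0.9));
        System.out.println(symmetrize(upper, 2)); // {0={1=0.5, 2=0.2}, 1={0=0.5, 2=0.9}, 2={0=0.2, 1=0.9}}
        System.out.println(symmetrize(upper, 1)); // {0={1=0.5}, 1={2=0.9}, 2={1=0.9}}
    }
}
```

With k = 1 the result is no longer symmetric (row 0 keeps item 1, but row 1 keeps item 2), which is the effect the comment in UnsymmetrifyMapper describes.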
That completes the work of the RowSimilarityJob class; the final result is a symmetric matrix, which is the co-occurrence matrix.
Java code
//start the multiplication of the co-occurrence matrix by the user vectors
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
// First MapReduce job
Job prePartialMultiply1 = prepareJob(
similarityMatrixPath, prePartialMultiplyPath1, SequenceFileInputFormat.class,
SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
SequenceFileOutputFormat.class);
boolean succeeded = prePartialMultiply1.waitForCompletion(true);
if (!succeeded)
return -1;
// Second MapReduce job
//continue the multiplication
Job prePartialMultiply2 = prepareJob(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
prePartialMultiplyPath2, SequenceFileInputFormat.class, UserVectorSplitterMapper.class, VarIntWritable.class,
VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
SequenceFileOutputFormat.class);
if (usersFile != null) {
prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);
}
prePartialMultiply2.getConfiguration().setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED,
maxPrefsPerUser);
succeeded = prePartialMultiply2.waitForCompletion(true);
if (!succeeded)
return -1;
//finish the job
// Third MapReduce job
Job partialMultiply = prepareJob(
new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,
SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
SequenceFileOutputFormat.class);
setS3SafeCombinedInputPath(partialMultiply, getTempPath(), prePartialMultiplyPath1, prePartialMultiplyPath2);
succeeded = partialMultiply.waitForCompletion(true);
if (!succeeded)
return -1;
}
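The details of these three MapReduce jobs are analyzed below in the same way:
1. Mapper: the SimilarityMatrixRowWrapperMapper class, which takes one row of the co-occurrence matrix and wraps it in a VectorOrPrefWritable so that its output type matches that of UserVectorSplitterMapper
Java code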
public final class SimilarityMatrixRowWrapperMapper extends
Mapper<IntWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable> {
// take one row of the co-occurrence matrix and wrap it in a VectorOrPrefWritable,
// matching the output type of UserVectorSplitterMapper
@Override
protected void map(IntWritable key,
VectorWritable value,
Context context) throws IOException, InterruptedException {
Vector similarityMatrixRow = value.get();
/* remove self similarity */
similarityMatrixRow.set(key.get(), Double.NaN);
context.write(new VarIntWritable(key.get()), new VectorOrPrefWritable(similarityMatrixRow));
}
}
2. Mapper: the UserVectorSplitterMapper class
Java code
// input format:  theUserID:<itemid_index1,pref1>,<itemid_index2,pref2>........<itemid_indexN,prefN>
// output format: itemid1:<theUserID,pref1>
// itemid2:<theUserID,pref2>
// itemid3:<theUserID,pref3>
// ......
// itemidN:<theUserID,prefN>
Java code
public final class UserVectorSplitterMapper extends
Mapper<VarLongWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> {
@Override
protected void map(VarLongWritable key,
VectorWritable value,
Context context) throws IOException, InterruptedException {
long userID = key.get();
if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) {
return;
}
Vector userVector = maybePruneUserVector(value.get());
Iterator<Vector.Element> it = userVector.iterateNonZero();
VarIntWritable itemIndexWritable = new VarIntWritable();
VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable();
while (it.hasNext()) {
Vector.Element e = it.next();
itemIndexWritable.set(e.index());
vectorOrPref.set(userID, (float) e.get());
context.write(itemIndexWritable, vectorOrPref);
}
}
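3. Reducer: the ToVectorAndPrefReducer class, which collects the co-occurrence matrix row for an itemid together with the users who rated that item and their ratings. The final output is itemid_index, VectorAndPrefsWritable(vector, List<userid>, List<pref>)
Java code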
public final class ToVectorAndPrefReducer extends
Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable> {
// collect all values whose key is this itemid
@Override
protected void reduce(VarIntWritable key,
Iterable<VectorOrPrefWritable> values,
Context context) throws IOException, InterruptedException {
List<Long> userIDs = Lists.newArrayList();
List<Float> prefValues = Lists.newArrayList();
Vector similarityMatrixColumn = null;
for (VectorOrPrefWritable value : values) {
if (value.getVector() == null) {
// Then this is a user-pref value
userIDs.add(value.getUserID());
prefValues.add(value.getValue());
} else {
// Then this is the column vector
// one row of the co-occurrence matrix (the row whose index is itemid)
if (similarityMatrixColumn != null) {
throw new IllegalStateException("Found two similarity-matrix columns for item index " + key.get());
}
similarityMatrixColumn = value.getVector();
}
}
if (similarityMatrixColumn == null) {
return;
}
// combine the co-occurrence matrix row for this itemid with the users who rated the item and their ratings
VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(similarityMatrixColumn, userIDs, prefValues);
context.write(key, vectorAndPrefs);
}
}
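What the three jobs achieve together is a join on the item index: a similarity row and any number of (user, pref) pairs that share the same key end up in one record. A minimal sketch of that reduce-side join follows. `VectorOrPref` and `VectorAndPrefs` are hypothetical stand-ins for the Writable classes, not their real APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ItemJoinSketch {

    // hypothetical stand-in for VectorOrPrefWritable: a similarity row OR one (user, pref) pair
    static final class VectorOrPref {
        final Map<Integer, Double> vector; // null when this record carries a user preference
        final long userId;
        final float pref;

        VectorOrPref(Map<Integer, Double> vector) {
            this.vector = vector;
            this.userId = -1L;
            this.pref = Float.NaN;
        }

        VectorOrPref(long userId, float pref) {
            this.vector = null;
            this.userId = userId;
            this.pref = pref;
        }
    }

    // hypothetical stand-in for VectorAndPrefsWritable
    static final class VectorAndPrefs {
        final Map<Integer, Double> vector;
        final List<Long> userIds;
        final List<Float> prefs;

        VectorAndPrefs(Map<Integer, Double> vector, List<Long> userIds, List<Float> prefs) {
            this.vector = vector;
            this.userIds = userIds;
            this.prefs = prefs;
        }
    }

    // joins all records for one item key; returns null when no similarity row arrived
    static VectorAndPrefs reduce(List<VectorOrPref> values) {
        List<Long> userIds = new ArrayList<>();
        List<Float> prefs = new ArrayList<>();
        Map<Integer, Double> row = null;
        for (VectorOrPref value : values) {
            if (value.vector == null) {
                userIds.add(value.userId);
                prefs.add(value.pref);
            } else {
                if (row != null) {
                    throw new IllegalStateException("two similarity rows for one item");
                }
                row = value.vector;
            }
        }
        return row == null ? null : new VectorAndPrefs(row, userIds, prefs);
    }

    public static void main(String[] args) {
        // records arriving for one item: two ratings and the item's similarity row (Java 9+)
        VectorAndPrefs joined = reduce(List.of(
                new VectorOrPref(7L, 4f),
                new VectorOrPref(Map.of(1, 0.5)),
                new VectorOrPref(8L, 2f)));
        System.out.println(joined.userIds + " " + joined.prefs + " " + joined.vector);
        // [7, 8] [4.0, 2.0] {1=0.5}
    }
}
```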
Step 4: multiply the co-occurrence matrix by the user vectors to obtain the recommendations
Java code
//extract out the recommendations
Job aggregateAndRecommend = prepareJob(
new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,
PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
TextOutputFormat.class);
Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();
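Mapper: the PartialMultiplyMapper class
Java code
// input type:  ( itemid_index, <array of userids, array of prefs, co-occurrence matrix row itemid_index> )
// output type: ( userid, <the user's rating of itemid_index1, co-occurrence matrix row itemid_index1> )
// ( userid, <the user's rating of itemid_index2, co-occurrence matrix row itemid_index2> )
// .....
// .....
// ( userid, <the user's rating of itemid_indexN, co-occurrence matrix row itemid_indexN> )
Java code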
public final class PartialMultiplyMapper extends
Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,PrefAndSimilarityColumnWritable> {
@Override
protected void map(VarIntWritable key,
VectorAndPrefsWritable vectorAndPrefsWritable,
Context context) throws IOException, InterruptedException {
Vector similarityMatrixColumn = vectorAndPrefsWritable.getVector();
List<Long> userIDs = vectorAndPrefsWritable.getUserIDs();
List<Float> prefValues = vectorAndPrefsWritable.getValues();
VarLongWritable userIDWritable = new VarLongWritable();
PrefAndSimilarityColumnWritable prefAndSimilarityColumn = new PrefAndSimilarityColumnWritable();
for (int i = 0; i < userIDs.size(); i++) {
long userID = userIDs.get(i);
float prefValue = prefValues.get(i);
if (!Float.isNaN(prefValue)) {
prefAndSimilarityColumn.set(prefValue, similarityMatrixColumn);
userIDWritable.set(userID);
context.write(userIDWritable, prefAndSimilarityColumn);
}
}
}
}
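Reducer: the AggregateAndRecommendReducer class. The partial multiplication is carried out in the reducer, and the items with the largest resulting scores are taken as recommendations. For non-boolean data, items are ranked by the score obtained from multiplying the prefs with the similarity matrix.
For booleanData every pref is 1.0f, so working through the matrix multiplication adds nothing; the similarity values are simply summed.
Sorting on that sum gives the recommendations.
Java code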
public final class AggregateAndRecommendReducer extends
Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable> {
@Override
protected void reduce(VarLongWritable userID,
Iterable<PrefAndSimilarityColumnWritable> values,
Context context) throws IOException, InterruptedException {
if (booleanData) {
reduceBooleanData(userID, values, context);
} else {
reduceNonBooleanData(userID, values, context);
}
}
private void reduceBooleanData(VarLongWritable userID,
Iterable<PrefAndSimilarityColumnWritable> values,
Context context) throws IOException, InterruptedException {
/* having boolean data, each estimated preference can only be 1,
* however we can't use this to rank the recommended items,
* so we use the sum of similarities for that. */
Vector predictionVector = null;
for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
predictionVector = predictionVector == null
? prefAndSimilarityColumn.getSimilarityColumn()
: predictionVector.plus(prefAndSimilarityColumn.getSimilarityColumn());
}
writeRecommendedItems(userID, predictionVector, context);
}
private void reduceNonBooleanData(VarLongWritable userID,
Iterable<PrefAndSimilarityColumnWritable> values,
Context context) throws IOException, InterruptedException {
/* each entry here is the sum in the numerator of the prediction formula */
Vector numerators = null;
/* each entry here is the sum in the denominator of the prediction formula */
Vector denominators = null;
/* each entry here is the number of similar items used in the prediction formula */
Vector numberOfSimilarItemsUsed = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
Vector simColumn = prefAndSimilarityColumn.getSimilarityColumn();
float prefValue = prefAndSimilarityColumn.getPrefValue();
/* count the number of items used for each prediction */
Iterator<Vector.Element> usedItemsIterator = simColumn.iterateNonZero();
while (usedItemsIterator.hasNext()) {
int itemIDIndex = usedItemsIterator.next().index();
numberOfSimilarItemsUsed.setQuick(itemIDIndex, numberOfSimilarItemsUsed.getQuick(itemIDIndex) + 1);
}
// vector.times(float) multiplies a vector by a scalar, i.e. every element is multiplied by that number
// vector.plus(vector) adds two vectors element by element
// numerators is a vector; each element looks like this
/*
for example, the element at index item_1 is:
similarity(item_1, item_2)*pref(userid, item_2)
+ similarity(item_1, item_3)*pref(userid, item_3)
+ similarity(item_1, item_4)*pref(userid, item_4)
+ ……
+ similarity(item_1, item_N)*pref(userid, item_N)
*/
// Note: similarity(item_1, item_2) is the similarity between item_1 and item_2, and pref(userid, item) is the rating userid gave to item
numerators = numerators == null
? prefValue == BOOLEAN_PREF_VALUE ? simColumn.clone() : simColumn.times(prefValue)
: numerators.plus(prefValue == BOOLEAN_PREF_VALUE ? simColumn : simColumn.times(prefValue));
simColumn.assign(ABSOLUTE_VALUES);
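// denominators is a vector; each element looks like this
/*
for example, the element at index item_1 is:
|similarity(item_1, item_2)| + |similarity(item_1, item_3)| + …… + |similarity(item_1, item_N)|
*/
// Note: similarity(item_1, item_2) is the similarity between item_1 and item_2; absolute values are used because of simColumn.assign(ABSOLUTE_VALUES) above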
denominators = denominators == null ? simColumn : denominators.plus(simColumn);
}
if (numerators == null) {
return;
}
Vector recommendationVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
Iterator<Vector.Element> iterator = numerators.iterateNonZero();
while (iterator.hasNext()) {
Vector.Element element = iterator.next();
int itemIDIndex = element.index();
/* preference estimations must be based on at least 2 datapoints */
if (numberOfSimilarItemsUsed.getQuick(itemIDIndex) > 1) {
/* compute normalized prediction */
double prediction = element.get() / denominators.getQuick(itemIDIndex);
recommendationVector.setQuick(itemIDIndex, prediction);
}
}
writeRecommendedItems(userID, recommendationVector, context);
}
}
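The non-boolean branch boils down to a weighted average: for each candidate item, the sum of similarity * pref divided by the sum of |similarity|, kept only when at least two rated items contributed. The sketch below reproduces that arithmetic for a single user with plain maps. The names are hypothetical, and the NaN self-similarity marker, item filtering and the final top-N selection are left out.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PredictionSketch {

    // hypothetical stand-in for PrefAndSimilarityColumnWritable
    static final class PrefAndColumn {
        final float pref;                 // the user's rating of one item
        final Map<Integer, Double> column; // that item's similarities to other items

        PrefAndColumn(float pref, Map<Integer, Double> column) {
            this.pref = pref;
            this.column = column;
        }
    }

    static Map<Integer, Double> predict(List<PrefAndColumn> values) {
        Map<Integer, Double> numerators = new TreeMap<>();
        Map<Integer, Double> denominators = new TreeMap<>();
        Map<Integer, Integer> itemsUsed = new TreeMap<>();
        for (PrefAndColumn value : values) {
            for (Map.Entry<Integer, Double> e : value.column.entrySet()) {
                numerators.merge(e.getKey(), value.pref * e.getValue(), Double::sum);
                denominators.merge(e.getKey(), Math.abs(e.getValue()), Double::sum);
                itemsUsed.merge(e.getKey(), 1, Integer::sum);
            }
        }
        Map<Integer, Double> predictions = new TreeMap<>();
        for (Map.Entry<Integer, Double> e : numerators.entrySet()) {
            // a prediction must rest on at least two rated items
            if (itemsUsed.get(e.getKey()) > 1) {
                predictions.put(e.getKey(), e.getValue() / denominators.get(e.getKey()));
            }
        }
        return predictions;
    }

    public static void main(String[] args) {
        // the user rated one item with 4 and another with 2 (Java 9+ for List.of/Map.of)
        List<PrefAndColumn> values = List.of(
                new PrefAndColumn(4f, Map.of(2, 0.5, 3, 0.2)),
                new PrefAndColumn(2f, Map.of(2, 0.5)));
        // item 2: (4*0.5 + 2*0.5) / (0.5 + 0.5) = 3.0; item 3 has one data point and is dropped
        System.out.println(predict(values)); // {2=3.0}
    }
}
```

Dividing by the sum of absolute similarities keeps the prediction on the same scale as the original ratings.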
TextOutputFormat.class);
Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();
if (itemsFile != null) {
aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);
}
if (filterFile != null) {
setS3SafeCombinedInputPath(aggregateAndRecommend, getTempPath(), partialMultiplyPath, explicitFilterPath);
}
setIOSort(aggregateAndRecommend);
aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH,
new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);
aggregateAndRecommendConf.setBoolean(BOOLEAN_DATA, booleanData);
boolean succeeded = aggregateAndRecommend.waitForCompletion(true);
if (!succeeded)
return -1;
}
return 0;
}
Step 2, computing the co-occurrence matrix, is done mainly in the RowSimilarityJob class.
Java code
ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
"--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
"--output", similarityMatrixPath.toString(),
"--numberOfColumns", String.valueOf(numberOfUsers),
"--similarityClassname", similarityClassname,
"--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
"--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
"--threshold", String.valueOf(threshold),
"--tempDir", getTempPath().toString()});
}
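As the call shows, this job's input path is the output path of the last reducer in PreparePreferenceMatrixJob, which was covered in the previous post.
The implementation of the RowSimilarityJob class is analyzed in detail below:
Java code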
public class RowSimilarityJob extends AbstractJob {
@Override
public int run(String[] args) throws Exception {
// a large block of argument-parsing code is omitted here
// first MapReduce job
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, VectorNormMapper.class, IntWritable.class,
VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);
normsAndTranspose.setCombinerClass(MergeVectorsCombiner.class);
Configuration normsAndTransposeConf = normsAndTranspose.getConfiguration();
normsAndTransposeConf.set(THRESHOLD, String.valueOf(threshold));
normsAndTransposeConf.set(NORMS_PATH, normsPath.toString());
normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());
normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);
boolean succeeded = normsAndTranspose.waitForCompletion(true);
if (!succeeded) {
return -1;
}
}
// second MapReduce job
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath, CooccurrencesMapper.class,
IntWritable.class, VectorWritable.class, SimilarityReducer.class, IntWritable.class, VectorWritable.class);
pairwiseSimilarity.setCombinerClass(VectorSumReducer.class);
Configuration pairwiseConf = pairwiseSimilarity.getConfiguration();
pairwiseConf.set(THRESHOLD, String.valueOf(threshold));
pairwiseConf.set(NORMS_PATH, normsPath.toString());
pairwiseConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
pairwiseConf.set(MAXVALUES_PATH, maxValuesPath.toString());
pairwiseConf.set(SIMILARITY_CLASSNAME, similarityClassname);
pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns);
pairwiseConf.setBoolean(EXCLUDE_SELF_SIMILARITY, excludeSelfSimilarity);
boolean succeeded = pairwiseSimilarity.waitForCompletion(true);
if (!succeeded) {
return -1;
}
}
// third MapReduce job
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(), UnsymmetrifyMapper.class,
IntWritable.class, VectorWritable.class, MergeToTopKSimilaritiesReducer.class, IntWritable.class,
VectorWritable.class);
asMatrix.setCombinerClass(MergeToTopKSimilaritiesReducer.class);
asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);
boolean succeeded = asMatrix.waitForCompletion(true);
if (!succeeded) {
return -1;
}
}
return 0;
}
As the code shows, RowSimilarityJob is also split into three MapReduce passes:
1. Mapper: the VectorNormMapper class, which outputs records of the form ( userid_index, <itemid_index, pref> )
Java code
public static class VectorNormMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
@Override
protected void map(IntWritable row, VectorWritable vectorWritable, Context ctx)
throws IOException, InterruptedException {
Vector rowVector = similarity.normalize(vectorWritable.get());
int numNonZeroEntries = 0;
double maxValue = Double.MIN_VALUE;
Iterator<Vector.Element> nonZeroElements = rowVector.iterateNonZero();
while (nonZeroElements.hasNext()) {
Vector.Element element = nonZeroElements.next();
RandomAccessSparseVector partialColumnVector = new RandomAccessSparseVector(Integer.MAX_VALUE);
partialColumnVector.setQuick(row.get(), element.get());
// emits records of the form ( userid_index, <itemid_index, pref> )
ctx.write(new IntWritable(element.index()), new VectorWritable(partialColumnVector));
numNonZeroEntries++;
if (maxValue < element.get()) {
maxValue = element.get();
}
}
if (threshold != NO_THRESHOLD) {
nonZeroEntries.setQuick(row.get(), numNonZeroEntries);
maxValues.setQuick(row.get(), maxValue);
}
norms.setQuick(row.get(), similarity.norm(rowVector));
// count the total number of items (rows) processed
ctx.getCounter(Counters.ROWS).increment(1);
}
}
Reducer: the MergeVectorsReducer class. Its input is (userid_index, <itemid_index, pref>); entries that share a userid_index are merged here, and the output is ( userid_index, vector<itemid_index, pref> )
Java code
public static class MergeVectorsReducer extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
@Override
protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)
throws IOException, InterruptedException {
Vector partialVector = Vectors.merge(partialVectors);
if (row.get() == NORM_VECTOR_MARKER) {
Vectors.write(partialVector, normsPath, ctx.getConfiguration());
} else if (row.get() == MAXVALUE_VECTOR_MARKER) {
Vectors.write(partialVector, maxValuesPath, ctx.getConfiguration());
} else if (row.get() == NUM_NON_ZERO_ENTRIES_VECTOR_MARKER) {
Vectors.write(partialVector, numNonZeroEntriesPath, ctx.getConfiguration(), true);
} else {
ctx.write(row, new VectorWritable(partialVector));
}
}
}
}
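The transposition carried out by this first pass (item rows in, user columns out) can be illustrated without Hadoop. The following is only a minimal sketch built on plain java.util maps rather than Mahout's Vector classes; the class name TransposeSketch, its method names, and the sample indices are made up for illustration, and the squared-sum norm is just one possibility, since the real norm depends on the configured similarity class.

```java
import java.util.Map;
import java.util.TreeMap;

final class TransposeSketch {

  /** Input: itemIndex -> (userIndex -> pref). Output: userIndex -> (itemIndex -> pref). */
  static Map<Integer, Map<Integer, Double>> transpose(Map<Integer, Map<Integer, Double>> itemRows) {
    Map<Integer, Map<Integer, Double>> userColumns = new TreeMap<>();
    for (Map.Entry<Integer, Map<Integer, Double>> row : itemRows.entrySet()) {
      int itemIndex = row.getKey();
      for (Map.Entry<Integer, Double> cell : row.getValue().entrySet()) {
        // Mapper side: emit (userIndex, {itemIndex: pref}).
        // Reducer side: merge everything that shares the same userIndex.
        userColumns.computeIfAbsent(cell.getKey(), k -> new TreeMap<>())
            .put(itemIndex, cell.getValue());
      }
    }
    return userColumns;
  }

  /** Sum of squared prefs of one item row (an assumed norm, for illustration only). */
  static double squaredNorm(Map<Integer, Double> itemRow) {
    double sum = 0.0;
    for (double pref : itemRow.values()) {
      sum += pref * pref;
    }
    return sum;
  }

  public static void main(String[] args) {
    Map<Integer, Map<Integer, Double>> itemRows = new TreeMap<>();
    Map<Integer, Double> item0 = new TreeMap<>();
    item0.put(10, 5.0); // user 10 rated item 0 with 5.0
    item0.put(11, 3.0);
    Map<Integer, Double> item1 = new TreeMap<>();
    item1.put(10, 4.0);
    itemRows.put(0, item0);
    itemRows.put(1, item1);
    System.out.println(transpose(itemRows));
    System.out.println(squaredNorm(item0));
  }
}
```

In the real job the norms are collected as a side effect of the mapper and written to normsPath, while the transposed vectors go to weightsPath for the second pass.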
2. Mapper: the CooccurrencesMapper class. It processes the vector<itemid_index, pref> stored under a single userid_index,
collects the <item1, item2> pairs, and outputs ( itemid_index, vector<itemid_index, value> )
Java code
public static class CooccurrencesMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
@Override
protected void map(IntWritable column, VectorWritable occurrenceVector, Context ctx)
throws IOException, InterruptedException {
Vector.Element[] occurrences = Vectors.toArray(occurrenceVector);
Arrays.sort(occurrences, BY_INDEX);
int cooccurrences = 0;
int prunedCooccurrences = 0;
for (int n = 0; n < occurrences.length; n++) {
Vector.Element occurrenceA = occurrences[n];
Vector dots = new RandomAccessSparseVector(Integer.MAX_VALUE);
for (int m = n; m < occurrences.length; m++) {
Vector.Element occurrenceB = occurrences[m];
if (threshold == NO_THRESHOLD || consider(occurrenceA, occurrenceB)) {
dots.setQuick(occurrenceB.index(), similarity.aggregate(occurrenceA.get(), occurrenceB.get()));
cooccurrences++;
} else {
prunedCooccurrences++;
}
}
ctx.write(new IntWritable(occurrenceA.index()), new VectorWritable(dots));
}
ctx.getCounter(Counters.COOCCURRENCES).increment(cooccurrences);
ctx.getCounter(Counters.PRUNED_COOCCURRENCES).increment(prunedCooccurrences);
}
}
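The nested loop above emits, for every item a user rated, the pairwise contributions with all items of equal or higher index. A minimal sketch of that idea in plain Java follows. It assumes that similarity.aggregate is a simple product of the two prefs (true for dot-product style measures, but the real behavior depends on the similarity class), and it leaves out the threshold pruning. The class CooccurrenceSketch and its method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CooccurrenceSketch {

  /**
   * Input: one user's column, itemIndex -> pref.
   * Output: itemA -> (itemB -> prefA * prefB) for every itemB >= itemA.
   */
  static Map<Integer, Map<Integer, Double>> partialDots(Map<Integer, Double> userColumn) {
    // Sort by item index so that each unordered pair is produced exactly once.
    Map<Integer, Double> sorted = new TreeMap<>(userColumn);
    List<Map.Entry<Integer, Double>> occurrences = new ArrayList<>(sorted.entrySet());
    Map<Integer, Map<Integer, Double>> emitted = new TreeMap<>();
    for (int n = 0; n < occurrences.size(); n++) {
      Map.Entry<Integer, Double> a = occurrences.get(n);
      Map<Integer, Double> dots = new TreeMap<>();
      for (int m = n; m < occurrences.size(); m++) {
        Map.Entry<Integer, Double> b = occurrences.get(m);
        // Assumed aggregate function: product of the two preference values.
        dots.put(b.getKey(), a.getValue() * b.getValue());
      }
      emitted.put(a.getKey(), dots);
    }
    return emitted;
  }

  public static void main(String[] args) {
    Map<Integer, Double> userColumn = new TreeMap<>();
    userColumn.put(3, 2.0);
    userColumn.put(1, 4.0);
    System.out.println(partialDots(userColumn));
  }
}
```

Because the inner loop starts at n, only one half of the pair matrix is produced, which is why a later pass has to restore the other half.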
Reducer: the SimilarityReducer class, which builds the co-occurrence matrix
Java code
public static class SimilarityReducer
extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
@Override
protected void reduce(IntWritable row, Iterable<VectorWritable> partialDots, Context ctx)
throws IOException, InterruptedException {
Iterator<VectorWritable> partialDotsIterator = partialDots.iterator();
// take the first vector as the row vector for this item
Vector dots = partialDotsIterator.next().get();
while (partialDotsIterator.hasNext()) {
Vector toAdd = partialDotsIterator.next().get();
Iterator<Vector.Element> nonZeroElements = toAdd.iterateNonZero();
while (nonZeroElements.hasNext()) {
Vector.Element nonZeroElement = nonZeroElements.next();
// nonZeroElement.index() is an itemid; add the other vector's value for that itemid into dots
dots.setQuick(nonZeroElement.index(), dots.getQuick(nonZeroElement.index()) + nonZeroElement.get());
}
}
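// dots is now row number `row` of the matrix: its elements are this item's accumulated values against the other items, which are converted into similarities below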
Vector similarities = dots.like();
double normA = norms.getQuick(row.get());
Iterator<Vector.Element> dotsWith = dots.iterateNonZero();
while (dotsWith.hasNext()) {
Vector.Element b = dotsWith.next();
double similarityValue = similarity.similarity(b.get(), normA, norms.getQuick(b.index()), numberOfColumns);
if (similarityValue >= treshold) {
similarities.set(b.index(), similarityValue);
}
}
if (excludeSelfSimilarity) {
similarities.setQuick(row.get(), 0);
}
ctx.write(row, new VectorWritable(similarities));
}
}
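The reducer therefore does two things: it sums the partial vectors for one item, then turns each accumulated dot product into a similarity using the precomputed norms. The sketch below illustrates this with the Euclidean-distance formula quoted at the start of this article, simi(i,j) = 1/(1 + sqrt(norms(i) - 2*dot(i,j) + norms(j))). Which measure actually runs depends on similarityClassname, so the formula here is an assumption, and SimilarityRowSketch with its methods is a hypothetical name.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SimilarityRowSketch {

  /** Element-wise sum of all partial vectors that arrived for one item. */
  static Map<Integer, Double> sumPartialDots(List<Map<Integer, Double>> partialDots) {
    Map<Integer, Double> dots = new TreeMap<>();
    for (Map<Integer, Double> partial : partialDots) {
      for (Map.Entry<Integer, Double> e : partial.entrySet()) {
        dots.merge(e.getKey(), e.getValue(), Double::sum);
      }
    }
    return dots;
  }

  /** Assumed measure: 1 / (1 + Euclidean distance), with norms being sums of squares. */
  static double euclideanSimilarity(double dot, double normA, double normB) {
    double squaredDistance = Math.max(0.0, normA - 2.0 * dot + normB);
    return 1.0 / (1.0 + Math.sqrt(squaredDistance));
  }

  /** Builds one similarity row, dropping self-similarity and values below the threshold. */
  static Map<Integer, Double> similarityRow(int row, Map<Integer, Double> dots,
      Map<Integer, Double> norms, double threshold) {
    Map<Integer, Double> similarities = new TreeMap<>();
    double normA = norms.get(row);
    for (Map.Entry<Integer, Double> b : dots.entrySet()) {
      if (b.getKey() == row) {
        continue; // corresponds to excludeSelfSimilarity
      }
      double value = euclideanSimilarity(b.getValue(), normA, norms.get(b.getKey()));
      if (value >= threshold) {
        similarities.put(b.getKey(), value);
      }
    }
    return similarities;
  }
}
```

For two items rated (3, 4) and (4, 3) by the same two users, both norms are 25 and the dot product is 24, so the squared distance is 2 and the similarity is 1/(1 + sqrt(2)).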
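3. Mapper: the UnsymmetrifyMapper class, used to build the symmetric matrix. The previous step produces a non-symmetric matrix, because each item pair was emitted only once. The mapper first transposes that matrix; adding the transpose to the original matrix then gives a symmetric matrix.
Java code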
public static class UnsymmetrifyMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
private int maxSimilaritiesPerRow;
@Override
protected void setup(Mapper.Context ctx) throws IOException, InterruptedException {
maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);
Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");
}
@Override
protected void map(IntWritable row, VectorWritable similaritiesWritable, Context ctx)
throws IOException, InterruptedException {
Vector similarities = similaritiesWritable.get();
// For performance reasons moved transposedPartial creation out of the while loop and reusing the same vector
Vector transposedPartial = similarities.like();
TopK<Vector.Element> topKQueue = new TopK<Vector.Element>(maxSimilaritiesPerRow, Vectors.BY_VALUE);
Iterator<Vector.Element> nonZeroElements = similarities.iterateNonZero();
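// this loop builds the transposed matrix; the input is non-symmetric, and adding the transpose to the original matrix gives a symmetric matrix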
while (nonZeroElements.hasNext()) {
Vector.Element nonZeroElement = nonZeroElements.next();
topKQueue.offer(new Vectors.TemporaryElement(nonZeroElement));
transposedPartial.setQuick(row.get(), nonZeroElement.get());
// emit one element of the transposed matrix
ctx.write(new IntWritable(nonZeroElement.index()), new VectorWritable(transposedPartial));
transposedPartial.setQuick(row.get(), 0.0);
}
Vector topKSimilarities = similarities.like();
for (Vector.Element topKSimilarity : topKQueue.retrieve()) {
topKSimilarities.setQuick(topKSimilarity.index(), topKSimilarity.get());
}
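// only the maxSimilaritiesPerRow highest-scoring items are kept here, so in the final "symmetric" matrix
// only maxSimilaritiesPerRow entries per row are actually kept; the other positions are ignored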
ctx.write(row, new VectorWritable(topKSimilarities));
}
}
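Reducer: the MergeToTopKSimilaritiesReducer class. It collects the transposed elements emitted by the mapper above, which completes the addition of the transposed matrix and the original matrix (truncated to the maxSimilaritiesPerRow highest-scoring entries per row) and yields the symmetric matrix.
Java code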
public static class MergeToTopKSimilaritiesReducer
extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
private int maxSimilaritiesPerRow;
@Override
protected void setup(Context ctx) throws IOException, InterruptedException {
maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);
Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");
}
@Override
protected void reduce(IntWritable row, Iterable<VectorWritable> partials, Context ctx)
throws IOException, InterruptedException {
Vector allSimilarities = Vectors.merge(partials);
Vector topKSimilarities = Vectors.topKElements(maxSimilaritiesPerRow, allSimilarities);
ctx.write(row, new VectorWritable(topKSimilarities));
}
}
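The combined effect of UnsymmetrifyMapper and MergeToTopKSimilaritiesReducer can be sketched on in-memory maps: mirror every entry across the diagonal, then keep only the k largest entries of each row. This is a simplified illustration rather than the Mahout code path. It merges everything first and truncates once, whereas the real mapper already truncates a row's own entries before the reducer runs. SymmetrizeSketch and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

final class SymmetrizeSketch {

  /** Keeps the k entries with the largest values. */
  static Map<Integer, Double> topK(Map<Integer, Double> row, int k) {
    Map<Integer, Double> result = new TreeMap<>();
    row.entrySet().stream()
        .sorted(Map.Entry.<Integer, Double>comparingByValue().reversed())
        .limit(k)
        .forEach(e -> result.put(e.getKey(), e.getValue()));
    return result;
  }

  /** Input: one half of the similarity matrix. Output: mirrored matrix, top-k per row. */
  static Map<Integer, Map<Integer, Double>> symmetrizeTopK(
      Map<Integer, Map<Integer, Double>> half, int k) {
    Map<Integer, Map<Integer, Double>> merged = new TreeMap<>();
    for (Map.Entry<Integer, Map<Integer, Double>> row : half.entrySet()) {
      int i = row.getKey();
      for (Map.Entry<Integer, Double> cell : row.getValue().entrySet()) {
        int j = cell.getKey();
        double value = cell.getValue();
        // Original entry (i, j) and its transposed counterpart (j, i).
        merged.computeIfAbsent(i, x -> new TreeMap<>()).put(j, value);
        merged.computeIfAbsent(j, x -> new TreeMap<>()).put(i, value);
      }
    }
    Map<Integer, Map<Integer, Double>> result = new TreeMap<>();
    for (Map.Entry<Integer, Map<Integer, Double>> row : merged.entrySet()) {
      result.put(row.getKey(), topK(row.getValue(), k));
    }
    return result;
  }
}
```

With k smaller than the row length the output is no longer strictly symmetric: an entry (2, 1) may survive in row 2 while (1, 2) is cut from row 1, which matches the remark in the mapper comments above.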
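This completes the work of the RowSimilarityJob class. The final result is a symmetric matrix, namely the co-occurrence matrix.
Java code
// multiply the co-occurrence matrix by the user vectors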
//start the multiplication of the co-occurrence matrix by the user vectors
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
// first MapReduce job
Job prePartialMultiply1 = prepareJob(
similarityMatrixPath, prePartialMultiplyPath1, SequenceFileInputFormat.class,
SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
SequenceFileOutputFormat.class);
boolean succeeded = prePartialMultiply1.waitForCompletion(true);
if (!succeeded)
return -1;
// second MapReduce job
//continue the multiplication
Job prePartialMultiply2 = prepareJob(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
prePartialMultiplyPath2, SequenceFileInputFormat.class, UserVectorSplitterMapper.class, VarIntWritable.class,
VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
SequenceFileOutputFormat.class);
if (usersFile != null) {
prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);
}
prePartialMultiply2.getConfiguration().setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED,
maxPrefsPerUser);
succeeded = prePartialMultiply2.waitForCompletion(true);
if (!succeeded)
return -1;
//finish the job
// third MapReduce job
Job partialMultiply = prepareJob(
new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,
SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
SequenceFileOutputFormat.class);
setS3SafeCombinedInputPath(partialMultiply, getTempPath(), prePartialMultiplyPath1, prePartialMultiplyPath2);
succeeded = partialMultiply.waitForCompletion(true);
if (!succeeded)
return -1;
}
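The details of these three MapReduce jobs are analyzed below in the same way:
1. Mapper: the SimilarityMatrixRowWrapperMapper class. It takes one row of the co-occurrence matrix and wraps it in a VectorOrPrefWritable, so that its output type matches that of UserVectorSplitterMapper.
Java code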
public final class SimilarityMatrixRowWrapperMapper extends
Mapper<IntWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable> {
// take one row of the co-occurrence matrix and wrap it in a VectorOrPrefWritable, so that the output type
// matches that of UserVectorSplitterMapper
@Override
protected void map(IntWritable key,
VectorWritable value,
Context context) throws IOException, InterruptedException {
Vector similarityMatrixRow = value.get();
/* remove self similarity */
similarityMatrixRow.set(key.get(), Double.NaN);
context.write(new VarIntWritable(key.get()), new VectorOrPrefWritable(similarityMatrixRow));
}
}
2. Mapper: the UserVectorSplitterMapper class
Java code
// input format: theUserID:<itemid_index1,pref1>,<itemid_index2,pref2>........<itemid_indexN,prefN>
// output format: itemid1:<theUserID,pref1>
// itemid2:<theUserID,pref2>
// itemid3:<theUserID,pref3>
// ......
// itemidN:<theUserID,prefN>
Java code
public final class UserVectorSplitterMapper extends
Mapper<VarLongWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> {
@Override
protected void map(VarLongWritable key,
VectorWritable value,
Context context) throws IOException, InterruptedException {
long userID = key.get();
if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) {
return;
}
Vector userVector = maybePruneUserVector(value.get());
Iterator<Vector.Element> it = userVector.iterateNonZero();
VarIntWritable itemIndexWritable = new VarIntWritable();
VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable();
while (it.hasNext()) {
Vector.Element e = it.next();
itemIndexWritable.set(e.index());
vectorOrPref.set(userID, (float) e.get());
context.write(itemIndexWritable, vectorOrPref);
}
}
}
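3. Reducer: the ToVectorAndPrefReducer class. For each itemid it collects the corresponding row of the co-occurrence matrix together with the users who rated that item and their ratings. The final output is itemid_index, VectorAndPrefsWritable(vector, List<userid>, List<pref>)
Java code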
public final class ToVectorAndPrefReducer extends
Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable> {
// collect all values whose key is this itemid
@Override
protected void reduce(VarIntWritable key,
Iterable<VectorOrPrefWritable> values,
Context context) throws IOException, InterruptedException {
List<Long> userIDs = Lists.newArrayList();
List<Float> prefValues = Lists.newArrayList();
Vector similarityMatrixColumn = null;
for (VectorOrPrefWritable value : values) {
if (value.getVector() == null) {
// Then this is a user-pref value
userIDs.add(value.getUserID());
prefValues.add(value.getValue());
} else {
// Then this is the column vector
// one row of the co-occurrence matrix (the row whose index is itemid)
if (similarityMatrixColumn != null) {
throw new IllegalStateException("Found two similarity-matrix columns for item index " + key.get());
}
similarityMatrixColumn = value.getVector();
}
}
if (similarityMatrixColumn == null) {
return;
}
// emit the co-occurrence matrix row for itemid together with the users who rated the item and their ratings
VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(similarityMatrixColumn, userIDs, prefValues);
context.write(key, vectorAndPrefs);
}
}
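This reducer is effectively a join on the item index: values of two different kinds arrive under the same key and are combined into one record. A minimal plain-Java sketch of that join follows. The classes VectorOrPref and VectorAndPrefs below are simplified stand-ins written for this example, not the Mahout Writable classes, and JoinSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class JoinSketch {

  /** Either a similarity row (vector != null) or a single (userId, pref) pair. */
  static final class VectorOrPref {
    final Map<Integer, Double> vector;
    final long userId;
    final float pref;

    private VectorOrPref(Map<Integer, Double> vector, long userId, float pref) {
      this.vector = vector;
      this.userId = userId;
      this.pref = pref;
    }

    static VectorOrPref ofVector(Map<Integer, Double> vector) {
      return new VectorOrPref(vector, -1L, Float.NaN);
    }

    static VectorOrPref ofPref(long userId, float pref) {
      return new VectorOrPref(null, userId, pref);
    }
  }

  /** The joined record: one similarity row plus everyone who rated the item. */
  static final class VectorAndPrefs {
    final Map<Integer, Double> vector;
    final List<Long> userIds;
    final List<Float> prefs;

    VectorAndPrefs(Map<Integer, Double> vector, List<Long> userIds, List<Float> prefs) {
      this.vector = vector;
      this.userIds = userIds;
      this.prefs = prefs;
    }
  }

  /** Returns null when no similarity row arrived for the item, mirroring the early return. */
  static VectorAndPrefs join(int itemIndex, Iterable<VectorOrPref> values) {
    List<Long> userIds = new ArrayList<>();
    List<Float> prefs = new ArrayList<>();
    Map<Integer, Double> row = null;
    for (VectorOrPref value : values) {
      if (value.vector == null) {
        userIds.add(value.userId);
        prefs.add(value.pref);
      } else {
        if (row != null) {
          throw new IllegalStateException("Found two similarity rows for item index " + itemIndex);
        }
        row = value.vector;
      }
    }
    return row == null ? null : new VectorAndPrefs(row, userIds, prefs);
  }
}
```

The user IDs and prefs are kept as two parallel lists, so position i in one list belongs to position i in the other, the same layout the next mapper iterates over.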
Step 4: multiply the co-occurrence matrix by the user vectors to obtain the recommendations
Java code
//extract out the recommendations
Job aggregateAndRecommend = prepareJob(
new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,
PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
TextOutputFormat.class);
Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();
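Mapper: the PartialMultiplyMapper class
Java code
// input type: ( itemid_index, <array of userids, array of prefs, row itemid_index of the co-occurrence matrix> )
// output type: ( userid, <the user's rating of itemid_index1, row itemid_index1 of the co-occurrence matrix> )
// ( userid, <the user's rating of itemid_index2, row itemid_index2 of the co-occurrence matrix> )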
// .....
// .....
// ( userid, <the user's rating of itemid_indexN, row itemid_indexN of the co-occurrence matrix> )
Java code
public final class PartialMultiplyMapper extends
Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,PrefAndSimilarityColumnWritable> {
@Override
protected void map(VarIntWritable key,
VectorAndPrefsWritable vectorAndPrefsWritable,
Context context) throws IOException, InterruptedException {
Vector similarityMatrixColumn = vectorAndPrefsWritable.getVector();
List<Long> userIDs = vectorAndPrefsWritable.getUserIDs();
List<Float> prefValues = vectorAndPrefsWritable.getValues();
VarLongWritable userIDWritable = new VarLongWritable();
PrefAndSimilarityColumnWritable prefAndSimilarityColumn = new PrefAndSimilarityColumnWritable();
for (int i = 0; i < userIDs.size(); i++) {
long userID = userIDs.get(i);
float prefValue = prefValues.get(i);
if (!Float.isNaN(prefValue)) {
prefAndSimilarityColumn.set(prefValue, similarityMatrixColumn);
userIDWritable.set(userID);
context.write(userIDWritable, prefAndSimilarityColumn);
}
}
}
}
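Reducer: the AggregateAndRecommendReducer class. The partial multiplication is carried out in the reducer, which then picks the items with the largest resulting recommendation scores. For non-boolean data, items are ranked by the score obtained from multiplying the prefs with the similarity matrix.
For booleanData every pref value is 1.0f, so carrying out the matrix multiplication is pointless and the similarity values are simply summed.
Sorting by these scores gives the recommendations.
Java code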
public final class AggregateAndRecommendReducer extends
Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable> {
@Override
protected void reduce(VarLongWritable userID,
Iterable<PrefAndSimilarityColumnWritable> values,
Context context) throws IOException, InterruptedException {
if (booleanData) {
reduceBooleanData(userID, values, context);
} else {
reduceNonBooleanData(userID, values, context);
}
}
private void reduceBooleanData(VarLongWritable userID,
Iterable<PrefAndSimilarityColumnWritable> values,
Context context) throws IOException, InterruptedException {
/* having boolean data, each estimated preference can only be 1,
* however we can't use this to rank the recommended items,
* so we use the sum of similarities for that. */
Vector predictionVector = null;
for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
predictionVector = predictionVector == null
? prefAndSimilarityColumn.getSimilarityColumn()
: predictionVector.plus(prefAndSimilarityColumn.getSimilarityColumn());
}
writeRecommendedItems(userID, predictionVector, context);
}
private void reduceNonBooleanData(VarLongWritable userID,
Iterable<PrefAndSimilarityColumnWritable> values,
Context context) throws IOException, InterruptedException {
/* each entry here is the sum in the numerator of the prediction formula */
Vector numerators = null;
/* each entry here is the sum in the denominator of the prediction formula */
Vector denominators = null;
/* each entry here is the number of similar items used in the prediction formula */
Vector numberOfSimilarItemsUsed = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
Vector simColumn = prefAndSimilarityColumn.getSimilarityColumn();
float prefValue = prefAndSimilarityColumn.getPrefValue();
/* count the number of items used for each prediction */
Iterator<Vector.Element> usedItemsIterator = simColumn.iterateNonZero();
while (usedItemsIterator.hasNext()) {
int itemIDIndex = usedItemsIterator.next().index();
numberOfSimilarItemsUsed.setQuick(itemIDIndex, numberOfSimilarItemsUsed.getQuick(itemIDIndex) + 1);
}
// vector.times(float) multiplies a vector by a scalar, i.e. every element is multiplied by that number
// vector.plus(vector) adds two vectors element by element
// numerators is a vector whose elements look like this
/*
For example, the element at index item_1 is:
similarity(item_1, item_2)*pref(userid, item_2)
+ similarity(item_1, item_3)*pref(userid, item_3)
+ similarity(item_1, item_4)*pref(userid, item_4)
+ ...
+ similarity(item_1, item_N)*pref(userid, item_N)
*/
// Note: similarity(item_1, item_2) is the similarity between item_1 and item_2, and pref(userid, item) is the rating that user userid gave to item
numerators = numerators == null
? prefValue == BOOLEAN_PREF_VALUE ? simColumn.clone() : simColumn.times(prefValue)
: numerators.plus(prefValue == BOOLEAN_PREF_VALUE ? simColumn : simColumn.times(prefValue));
simColumn.assign(ABSOLUTE_VALUES);
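// denominators is a vector whose elements look like this
/*
For example, the element at index item_1 is:
|similarity(item_1, item_2)| + |similarity(item_1, item_3)| + ... + |similarity(item_1, item_N)|
*/
// Note: similarity(item_1, item_2) is the similarity between item_1 and item_2; the absolute values come from simColumn.assign(ABSOLUTE_VALUES) above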
denominators = denominators == null ? simColumn : denominators.plus(simColumn);
}
if (numerators == null) {
return;
}
Vector recommendationVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
Iterator<Vector.Element> iterator = numerators.iterateNonZero();
while (iterator.hasNext()) {
Vector.Element element = iterator.next();
int itemIDIndex = element.index();
/* preference estimations must be based on at least 2 datapoints */
if (numberOfSimilarItemsUsed.getQuick(itemIDIndex) > 1) {
/* compute normalized prediction */
// compute the normalized predicted value
double prediction = element.get() / denominators.getQuick(itemIDIndex);
recommendationVector.setQuick(itemIDIndex, prediction);
}
}
writeRecommendedItems(userID, recommendationVector, context);
}
}
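The non-boolean branch computes, for each candidate item, a similarity-weighted average of the user's ratings: sum(similarity * pref) / sum(|similarity|), using only items that have at least two contributing similarities. A minimal sketch of that arithmetic on plain maps follows. PredictionSketch and PrefAndSimilarityColumn are hypothetical stand-ins for this example. The sketch also assumes that NaN predictions, which arise from the NaN self-similarity written by SimilarityMatrixRowWrapperMapper, are simply dropped at the end; writeRecommendedItems is not shown in this article, so that part is a guess at the intended behavior.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PredictionSketch {

  /** One rated item: the user's pref plus that item's similarity row. */
  static final class PrefAndSimilarityColumn {
    final float pref;
    final Map<Integer, Double> similarityColumn;

    PrefAndSimilarityColumn(float pref, Map<Integer, Double> similarityColumn) {
      this.pref = pref;
      this.similarityColumn = similarityColumn;
    }
  }

  /** Returns itemIndex -> predicted preference for one user. */
  static Map<Integer, Double> predict(List<PrefAndSimilarityColumn> values) {
    Map<Integer, Double> numerators = new TreeMap<>();
    Map<Integer, Double> denominators = new TreeMap<>();
    Map<Integer, Integer> similarItemsUsed = new TreeMap<>();
    for (PrefAndSimilarityColumn value : values) {
      for (Map.Entry<Integer, Double> e : value.similarityColumn.entrySet()) {
        int item = e.getKey();
        double sim = e.getValue();
        similarItemsUsed.merge(item, 1, Integer::sum);
        numerators.merge(item, sim * value.pref, Double::sum);
        denominators.merge(item, Math.abs(sim), Double::sum);
      }
    }
    Map<Integer, Double> predictions = new TreeMap<>();
    for (Map.Entry<Integer, Double> e : numerators.entrySet()) {
      int item = e.getKey();
      // A prediction must be based on at least two data points.
      if (similarItemsUsed.get(item) > 1) {
        double prediction = e.getValue() / denominators.get(item);
        // Assumption: NaN marks items the user already rated, so they are skipped.
        if (!Double.isNaN(prediction)) {
          predictions.put(item, prediction);
        }
      }
    }
    return predictions;
  }
}
```

For a user who rated item 0 with 4 and item 1 with 2, where item 2 has similarity 0.5 to item 0 and 0.25 to item 1, the prediction for item 2 is (4*0.5 + 2*0.25) / (0.5 + 0.25) = 2.5 / 0.75. Taking the top N of these predictions then gives the recommendation list.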