
Hadoop item-based collaborative filtering use case

2014-06-24 11:37
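This post shows how to run Mahout's item-based collaborative filtering as a MapReduce job on a Hadoop/YARN cluster, driven entirely from Java. The driver class below uploads a local CSV of user-item interactions to HDFS, builds the command-line arguments for org.apache.mahout.cf.taste.hadoop.item.RecommenderJob, ships the Mahout jars to the cluster through the distributed cache, runs the job, and finally prints the recommendations from part-r-00000.

RecommenderJob expects plain-text input with one interaction per line in the form userID,itemID[,preference]; because the job is run with --booleanData true, the preference column is omitted. The actual item.csv is not included in the post, but a hypothetical file in the expected format would look like:

    1,101
    1,102
    1,103
    2,101
    2,104
    3,102
    3,104

Each line of the resulting part-r-00000 pairs a user ID with a bracketed list of itemID:score recommendations, e.g. 1 [104:1.0,...].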
package org.mymahout.recommendation.hadoop;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;

public class ItemCFHadoop1 {

    // Hadoop HDFS address
    private static final String HDFS = "hdfs://*********:9000";

    public static void main(String[] args) throws Exception {
        String localFile = "datafile/item.csv";
        String inPath = HDFS + "/user/hdfs/userCF";
        String inFile = inPath + "/item.csv";
        String outPath = HDFS + "/user/hdfs/userCF/result/" + System.currentTimeMillis();
        String outFile = outPath + "/part-r-00000";
        String tmpPath = HDFS + "/tmp/rec001/" + System.currentTimeMillis();

        Configuration conf = config();

        // Prepare the input directory on HDFS and upload the local CSV
        HdfsUtils hdfs = new HdfsUtils(HDFS, conf);
        hdfs.rmr(inPath);                  // remove any previous input
        hdfs.mkdirs(inPath);
        hdfs.copyFile(localFile, inPath);
        hdfs.ls(inPath);
        hdfs.cat(inFile);

        // Build the RecommenderJob command-line arguments
        StringBuilder sb = new StringBuilder();
        sb.append("--input ").append(inPath);        // input path
        sb.append(" --output ").append(outPath);     // output path
        sb.append(" --booleanData true");
        // Euclidean distance similarity
        sb.append(" --similarityClassname org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.EuclideanDistanceSimilarity");
        sb.append(" --tempDir ").append(tmpPath);
        // also write out the item-item similarity matrix
        sb.append(" --outputPathForSimilarityMatrix ").append(outPath);
        args = sb.toString().split(" ");

        // Add 3rd-party libraries: ship the Mahout jars via the distributed cache
        String[] mahoutJars = {
                "/home/chenhuimin002/workspace/mahout-lib/mahout-math-1.0-SNAPSHOT.jar",
                "/home/chenhuimin002/workspace/mahout-lib/mahout-integration-1.0-SNAPSHOT.jar",
                "/home/chenhuimin002/workspace/mahout-lib/mahout-mrlegacy-1.0-SNAPSHOT.jar",
                "/home/chenhuimin002/workspace/mahout-lib/mahout-mrlegacy-1.0-SNAPSHOT-job.jar" };
        addJarToDistributedCache(Arrays.asList(mahoutJars), conf);
        // addJarToDistributedCache(MySecondClass.class, conf);

        RecommenderJob job = new RecommenderJob();
        job.setConf(conf);
        job.run(args);

        // Print the recommendation result
        hdfs.cat(outFile);
    }

    public static Configuration config() {
        Configuration conf = new YarnConfiguration();
        conf.set("fs.defaultFS", "hdfs://c0004649.itcs.hp.com:9000");
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.scheduler.address", "c0004650.itcs.hp.com:8030");
        conf.set("yarn.resourcemanager.address", "c0004650.itcs.hp.com:8032");
        return conf;
    }

    private static void addJarToDistributedCache(Class<?> classToAdd, Configuration conf)
            throws IOException {
        // Locate the jar file that contains classToAdd
        String jar = classToAdd.getProtectionDomain().getCodeSource()
                .getLocation().getPath();
        System.out.println("jar=" + jar);
        File jarFile = new File(jar);

        // Target location on HDFS
        Path hdfsJar = new Path("/user/hadoop/lib/mahout/" + jarFile.getName());

        // Copy (overwrite) the jar file to HDFS
        FileSystem hdfs = FileSystem.get(conf);
        hdfs.copyFromLocalFile(false, true, new Path(jar), hdfsJar);

        // Add the jar to the distributed classpath
        DistributedCache.addFileToClassPath(hdfsJar, conf);
    }

    private static void addJarToDistributedCache(List<String> jarPaths, Configuration conf)
            throws IOException {
        FileSystem hdfs = FileSystem.get(conf);
        for (String jar : jarPaths) {
            File jarFile = new File(jar);

            // Target location on HDFS
            Path hdfsJar = new Path("/user/hadoop/lib/mahout/" + jarFile.getName());

            // Copy the jar to HDFS unless it is already there
            if (!hdfs.exists(hdfsJar)) {
                hdfs.copyFromLocalFile(false, true, new Path(jar), hdfsJar);
            }

            // Add the jar to the distributed classpath
            DistributedCache.addFileToClassPath(hdfsJar, conf);
        }
    }
}
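The HdfsUtils helper used in main() is not included in the original post. Below is a minimal sketch of what it plausibly looks like, assuming it is a thin wrapper around org.apache.hadoop.fs.FileSystem; the method names (rmr, mkdirs, copyFile, ls, cat) and their behavior are inferred from the calls above.

    package org.mymahout.recommendation.hadoop;

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    // Hypothetical reconstruction: the original HdfsUtils was not published.
    public class HdfsUtils {

        private final String hdfsUri;
        private final Configuration conf;

        public HdfsUtils(String hdfsUri, Configuration conf) {
            this.hdfsUri = hdfsUri;
            this.conf = conf;
        }

        private FileSystem fs() throws IOException {
            return FileSystem.get(URI.create(hdfsUri), conf);
        }

        // Recursive delete, like "hadoop fs -rmr"
        public void rmr(String path) throws IOException {
            fs().delete(new Path(path), true);
            System.out.println("Delete: " + path);
        }

        public void mkdirs(String path) throws IOException {
            fs().mkdirs(new Path(path));
            System.out.println("Create: " + path);
        }

        // Upload a local file into an HDFS directory
        public void copyFile(String local, String remote) throws IOException {
            fs().copyFromLocalFile(new Path(local), new Path(remote));
            System.out.println("Copy from " + local + " to " + remote);
        }

        // List a directory, like "hadoop fs -ls"
        public void ls(String path) throws IOException {
            for (FileStatus status : fs().listStatus(new Path(path))) {
                System.out.println(status.getPath() + "\t" + status.getLen());
            }
        }

        // Print a file's contents, like "hadoop fs -cat"
        public void cat(String path) throws IOException {
            FSDataInputStream in = fs().open(new Path(path));
            try {
                IOUtils.copyBytes(in, System.out, 4096, false);
            } finally {
                IOUtils.closeStream(in);
            }
        }
    }

With a helper like this in place the driver compiles standalone; only the Hadoop client and Mahout jars need to be on the local classpath.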
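Two small caveats about the driver. Building the argument string with a StringBuilder and then calling split(" ") breaks if any path contains a space, so passing a String[] directly is safer. Also, since RecommenderJob implements Hadoop's Tool interface, an equivalent and slightly more idiomatic launch (an alternative sketch, not what the original post does) is via ToolRunner, which also parses generic options such as -D flags:

    import org.apache.hadoop.util.ToolRunner;

    // Equivalent to: job.setConf(conf); job.run(args);
    int exitCode = ToolRunner.run(conf, new RecommenderJob(), args);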