
PeopleRank Hadoop MapReduce

Compute each user's influence score.
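The job below computes a PageRank-style influence score over a follow graph: each round a user spreads its current score evenly over the users it links to, and every user is then re-scored as a * sum + (1 - a) * e, where sum is the total it received from its in-links, e is its score from the previous round, and a = 0.85 is the damping factor. A minimal single-JVM sketch of that same update (the toy graph and the RankSketch class are made up for illustration and are not part of the original job):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RankSketch {

    public static void main(String[] args) {
        final double a = 0.85; // damping factor, same value as the MapReduce job below
        // Hypothetical follow graph: source -> users it links to.
        Map<String, List<String>> links = new HashMap<String, List<String>>();
        links.put("A", Arrays.asList("B", "C"));
        links.put("B", Arrays.asList("C"));
        links.put("C", Arrays.asList("A"));

        // Every node starts with the same rank, like an initial rank file would.
        Map<String, Double> rank = new HashMap<String, Double>();
        for (String node : links.keySet()) {
            rank.put(node, 1.0);
        }

        for (int iter = 0; iter < 5; iter++) {
            // What the mapper emits: each source splits its rank over its out-links.
            Map<String, Double> sum = new HashMap<String, Double>();
            for (Map.Entry<String, List<String>> entry : links.entrySet()) {
                double share = rank.get(entry.getKey()) / entry.getValue().size();
                for (String dest : entry.getValue()) {
                    sum.put(dest, sum.getOrDefault(dest, 0.0) + share);
                }
            }
            // What the reducer writes: a * sum + (1 - a) * oldRank.
            for (String node : rank.keySet()) {
                rank.put(node, a * sum.getOrDefault(node, 0.0) + (1 - a) * rank.get(node));
            }
        }
        System.out.println(rank); // larger value = more influence
    }
}

The MapReduce version of the same update, iterated over HDFS directories, follows.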

package org.bigdata.peoplerank;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.bigdata.util.HadoopCfg;
import org.bigdata.util.HadoopUtil;
import org.bigdata.util.IndexMapReduce.FileNameInputFormat;

/**
 * PeopleRank: PageRank-style influence ranking.
 *
 * @author wwhhf
 */
public class PeopleRankMapReduce {

    private final static String JOB_NAME = "PeopleRank";
    private static String LINKS = "part-r-00000";

    // Ranks from the previous iteration, keyed by user id. This is loaded in the
    // driver JVM; as a plain static field it is only visible to map/reduce tasks
    // that run in the same JVM (e.g. local mode).
    private static Map<String, Double> rand = new HashMap<String, Double>();

    // Damping factor of the rank update.
    private static final double a = 0.85;

    public static void initRand(String pathin, String filename)
            throws IOException {
        List<String> lines = HadoopUtil.lslFile(pathin, filename);
        for (String line : lines) {
            String terms[] = line.split("\t");
            rand.put(terms[0], Double.valueOf(terms[1]));
        }
    }

    private static class PeopleRankMapper extends
            Mapper<Text, Text, Text, DoubleWritable> {

        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String filename = key.toString();
            if (filename.startsWith(LINKS)) {
                // Turn a link line such as "A:3<TAB>B,C" into {A, B, C}:
                // dests[0] is the source, the rest are its out-links.
                String dests[] = value.toString()
                        .replaceFirst(":\\d+\\s+", ",").split(",");
                if (dests.length == 1) {
                    // No out-links: emit a zero contribution so the node survives.
                    context.write(new Text(dests[0]), new DoubleWritable(0.0));
                } else {
                    if (rand.containsKey(dests[0])) {
                        double e = rand.get(dests[0]);
                        for (int i = 0, len = dests.length; i < len; i++) {
                            String dest = dests[i];
                            if (i == 0) {
                                context.write(new Text(dest),
                                        new DoubleWritable(0.0));
                            } else {
                                // Spread the source's previous rank evenly over its out-links.
                                context.write(new Text(dest),
                                        new DoubleWritable(e / (len - 1)));
                            }
                        }
                    }
                }
            }
        }

    }

    private static class PeopleRankReducer extends
            Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values,
                Context context) throws IOException, InterruptedException {
            // Total rank contribution received from all in-links.
            double sum = 0.0;
            for (DoubleWritable value : values) {
                sum += value.get();
            }
            if (rand.containsKey(key.toString())) {
                double e = rand.get(key.toString());
                // Damped update: newRank = a * sum + (1 - a) * oldRank.
                context.write(key, new DoubleWritable(a * sum + (1 - a) * e));
            }
        }

    }

    public static void solve(String linksin, String pathin, String pathout)
            throws ClassNotFoundException, InterruptedException {
        try {
            Configuration cfg = HadoopCfg.getConfiguration();
            Job job = Job.getInstance(cfg);
            job.setJobName(JOB_NAME);
            job.setJarByClass(PeopleRankMapReduce.class);
            job.setInputFormatClass(FileNameInputFormat.class);

            // mapper
            job.setMapperClass(PeopleRankMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);

            // reducer
            job.setReducerClass(PeopleRankReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);

            // Two inputs: the previous iteration's ranks and the fixed link data.
            FileInputFormat.addInputPath(job, new Path(pathin));
            FileInputFormat.addInputPath(job, new Path(linksin));
            FileOutputFormat.setOutputPath(job, new Path(pathout));

            job.waitForCompletion(true);

        } catch (IllegalStateException | IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws ClassNotFoundException,
            InterruptedException, IOException {
        String path = "/weibo_out1";
        String links_pathin = "/weibo_out";
        String filename = "part-r-00000";
        String tmp_pathin = path;
        // Five rounds: load the previous ranks into `rand`, run the job, and
        // use its output directory as the next round's input.
        for (int i = 1; i <= 5; i++) {
            initRand(tmp_pathin, filename);
            String tmp_pathout = path + i;
            System.out.println(links_pathin + " " + tmp_pathin + " "
                    + tmp_pathout);
            solve(links_pathin, tmp_pathin, tmp_pathout);
            tmp_pathin = tmp_pathout;
        }
    }
}
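A rough way to drive the whole pipeline, assuming the project has been packaged into a jar (the jar name below is only a placeholder) and /weibo_out already holds the part-r-00000 link file produced by the earlier job:

hadoop jar peoplerank.jar org.bigdata.peoplerank.PeopleRankMapReduce

Each of the five rounds loads the previous ranks (starting from /weibo_out1), adds /weibo_out as the fixed link input, and writes to /weibo_out11 through /weibo_out15; the last directory contains the final influence scores.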