
PageRank Algorithm in Practice with the MapReduce Framework (Part 2)

2016-05-17 00:00
Abstract: Building on the previous post, this one shows how to use the MapReduce framework to iteratively compute each user's support probability distribution, and finally how to sort the results.

Starting from the two data files produced in the previous post, links.txt and rand.txt, we compute the next round's probability distribution in much the same way as the simplest PageRank model.
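Each round applies the same damped update that the code below implements: for a user u,

    rank_new(u) = p * Σ_{v follows u} rank_old(v) / out(v) + (1 − p) * rank_old(u)

where p = 0.8 is the damping factor and out(v) is the number of accounts that v follows. From the way the code splits its input, the rank file is expected to hold tab-separated lines of the form "user<TAB>support probability", and links.txt lines of the form "user<TAB>followee1,followee2,...".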

Code implementation:

// HadoopCfg and FileNameInputFormat are the helper classes introduced in the previous post.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PeopleRank {

    private static double pValue = 0.8; // damping factor p
    private static Map<String, Double> table = new HashMap<String, Double>(); // user -> previous round's rank
    private static Configuration cfg = HadoopCfg.getConfigration();

    private static class PeopleRankMapper extends Mapper<Text, Text, Text, DoubleWritable> {

        @Override
        protected void setup(Mapper<Text, Text, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            super.setup(context);
            // load the previous round's rank distribution from HDFS into memory
            Configuration cfg = HadoopCfg.getConfigration();
            FileSystem fs = FileSystem.get(cfg);
            Path path = new Path(cfg.get("rand"));
            RemoteIterator<LocatedFileStatus> rt = fs.listFiles(path, false);
            while (rt.hasNext()) {
                LocatedFileStatus status = rt.next();
                Path filePath = status.getPath();
                BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(filePath)));
                String line = "";
                while ((line = br.readLine()) != null) {
                    // mind the delimiter: the local file is space-separated,
                    // but the file in HDFS is tab-separated
                    String[] strs = line.trim().split("\t"); // strs[0] = user, strs[1] = support probability
                    table.put(strs[0], Double.parseDouble(strs[1]));
                }
                br.close();
            }
        }

        @Override
        protected void map(Text fileName, Text value,
                Mapper<Text, Text, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            // get the file name so we can decide what to emit to the reducer
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            // endsWith, not equals: getPath() returns the full HDFS path, not just "links.txt"
            if (pathName.endsWith("links.txt")) {
                // the local links file is space-separated, but the copy in HDFS uses \t
                String[] strs = value.toString().split("\t");
                // emit the user itself with 0.0 so users with no inbound links still get a rank
                context.write(new Text(strs[0]), new DoubleWritable(0.0));
                // guard against users missing from the rank file
                double temp = table.containsKey(strs[0]) ? table.get(strs[0]) : 0.0;
                String[] values = strs[1].split(",");
                for (int i = 0; i < values.length; i++) {
                    // distribute the previous rank evenly over everyone this user follows
                    context.write(new Text(values[i]), new DoubleWritable(temp / values.length));
                }
            }
        }
    }
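    // A quick trace of the mapper on made-up data: for a links.txt line
    // "A<TAB>B,C" where table.get("A") = 0.2, the mapper emits (A, 0.0),
    // (B, 0.1) and (C, 0.1), i.e. A's previous support probability is split
    // evenly among the accounts A follows.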

    private static class PeopleRankReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        @Override
        protected void reduce(Text value, Iterable<DoubleWritable> datas,
                Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            // sum the rank contributions this user received from all followers
            double total = 0.0;
            for (DoubleWritable data : datas) {
                total += data.get();
            }
            // previous-round rank of this user, 0.0 if the user is not in the rank file
            double temp = 0.0;
            if (table.get(value.toString()) != null) {
                temp = table.get(value.toString());
            }
            // damped update: new rank = p * received contributions + (1 - p) * old rank
            double result = pValue * total + (1 - pValue) * temp;
            context.write(value, new DoubleWritable(result));
        }
    }
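    // Sanity check of the update above with made-up numbers: if a user's
    // incoming contributions sum to total = 0.05 and the previous rank was
    // temp = 0.1, the new rank is 0.8 * 0.05 + (1 - 0.8) * 0.1 = 0.06.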

    private static class SortMapper extends Mapper<LongWritable, Text, DoubleWritable, Text> {

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, DoubleWritable, Text>.Context context)
                throws IOException, InterruptedException {
            // swap (user, rank) to (rank, user) so the shuffle sorts by rank;
            // note that DoubleWritable keys sort in ascending order by default
            String[] strs = value.toString().split("\t");
            context.write(new DoubleWritable(Double.parseDouble(strs[1])), new Text(strs[0]));
        }
    }

    private static class SortReducer extends Reducer<DoubleWritable, Text, DoubleWritable, Text> {

        @Override
        protected void reduce(DoubleWritable value, Iterable<Text> datas,
                Reducer<DoubleWritable, Text, DoubleWritable, Text>.Context context)
                throws IOException, InterruptedException {
            // keys arrive already sorted; just write out every (rank, user) pair
            for (Text data : datas) {
                context.write(value, data);
            }
        }
    }

    public static String firstRun(int count) {
        String path = "/second/sec/secrank";
        try {
            // tell the mappers where the previous round's rank distribution lives
            cfg.set("rand", path + count);
            Job job = Job.getInstance(cfg);
            job.setJobName("PeopleRank");
            job.setJarByClass(PeopleRank.class);
            // FileNameInputFormat (from the previous post) must be set here
            job.setInputFormatClass(FileNameInputFormat.class);
            job.setMapperClass(PeopleRankMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);
            job.setReducerClass(PeopleRankReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
            FileInputFormat.addInputPath(job, new Path("/UserRelation")); // links.txt
            FileInputFormat.addInputPath(job, new Path(path + count));    // previous rank distribution
            FileOutputFormat.setOutputPath(job, new Path(path + (count + 1)));
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return path + (count + 1);
    }

    public static void sort(String path) {
        try {
            // the previous job has already finished, so a new Job instance is needed for sorting
            Job job = Job.getInstance(cfg);
            job.setJobName("Sort");
            job.setJarByClass(PeopleRank.class); // was Fans.class, left over from another example
            job.setMapperClass(SortMapper.class);
            job.setMapOutputKeyClass(DoubleWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setReducerClass(SortReducer.class);
            job.setOutputKeyClass(DoubleWritable.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(path));
            FileOutputFormat.setOutputPath(job, new Path("/second/sec/result"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
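    // Not in the original post, just a sketch: DoubleWritable keys sort in
    // ascending order, so the highest-ranked users end up at the bottom of
    // the result file. To list them first, this comparator could be attached
    // to the sort job via job.setSortComparatorClass(DescendingDoubleComparator.class).
    public static class DescendingDoubleComparator extends DoubleWritable.Comparator {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // invert the default ascending byte-level comparison
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }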

    public static void main(String[] args) {
        String path = "";
        for (int i = 1; i <= 10; i++) { // to save time, only 10 iterations are run here
            path = firstRun(i);
        }
        sort(path);
    }
}

After sorting the data from the 10th iteration, part of the final result is shown below:



The first column is the support probability and the second is the Weibo ID; the larger the probability, the more followers the account has.

The top ten are:

1191258123 韩寒

2789168643 媒体微博助理

2656274875 央视新闻

1618051664 头条新闻

2803301701 人民日报

1282005885 蔡康永

1195230310 何炅

1496852380 崔永元

1197161814 李开复

1656809190 赵薇

A final word: this was only a simple exercise, but I hope it offered a fresh perspective. Only by putting what you learn into practice can you really appreciate the subtleties. Corrections are welcome if you spot any mistakes. Next time I will share the principle and implementation of the K-means algorithm.