基于MapReduce框架的PageRank算法实战(下)
2016-05-17 00:00
281 查看
摘要: 在上一篇博客的基础上,这篇将与大家分享利用MapReduce框架迭代计算每个关注者的支持度概率分布,最后并对结果进行排序。
基于上篇获得的两个数据文件links.txt和rand.txt,采用类似实现最简单的PageRank模型的方法计算下一次的概率分布。
代码实现:
public class PeopleRank {

    /** Damping factor: weight of link-propagated rank vs. the previous rank. */
    private static double pValue = 0.8;

    /** Follower id -> current rank probability, loaded from HDFS in setup(). */
    private static Map<String, Double> table = new HashMap<String, Double>();

    private static Configuration cfg = HadoopCfg.getConfigration();

    private static class PeopleRankMapper extends Mapper<Text, Text, Text, DoubleWritable> {

        /**
         * Loads the previous iteration's rank distribution into {@code table}.
         * The directory to read is passed through the "rand" configuration key.
         */
        @Override
        protected void setup(Mapper<Text, Text, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            super.setup(context);
            Configuration conf = HadoopCfg.getConfigration();
            FileSystem fs = FileSystem.get(conf);
            Path path = new Path(conf.get("rand"));
            RemoteIterator<LocatedFileStatus> rt = fs.listFiles(path, false);
            while (rt.hasNext()) {
                LocatedFileStatus status = rt.next();
                Path filePath = status.getPath();
                // FIX: try-with-resources — the original leaked the reader for every file.
                try (BufferedReader br =
                        new BufferedReader(new InputStreamReader(fs.open(filePath)))) {
                    String line;
                    while ((line = br.readLine()) != null) {
                        // HDFS job output is tab-separated:
                        // strs[0] = follower id, strs[1] = its rank probability.
                        String[] strs = line.trim().split("\t");
                        if (strs.length >= 2) {
                            table.put(strs[0], Double.parseDouble(strs[1]));
                        }
                    }
                }
            }
        }

        /**
         * Only records coming from the link file (links.txt) are processed here;
         * rank records were already loaded into {@code table} by setup().
         * For each node we emit (node, 0.0) so dangling nodes still reach the
         * reducer, then spread the node's current rank evenly over its out-links.
         */
        @Override
        protected void map(Text fileName, Text value,
                Mapper<Text, Text, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            // BUG FIX: the original compared the FULL path string
            // (e.g. "hdfs://host/UserRelation/links.txt") with equals("links.txt"),
            // which can never match. Compare only the file name.
            String name = ((FileSplit) context.getInputSplit()).getPath().getName();
            if (name.equals("links.txt")) {
                // The link file in HDFS is tab-separated (local copies use spaces).
                String[] strs = value.toString().split("\t");
                context.write(new Text(strs[0]), new DoubleWritable(0.0));
                // BUG FIX: guard the lookup — unboxing a null Double throws an NPE
                // for any node absent from the rank table.
                Double rank = table.get(strs[0]);
                double temp = (rank == null) ? 0.0 : rank.doubleValue();
                String[] values = strs[1].split(",");
                for (int i = 0; i < values.length; i++) {
                    context.write(new Text(values[i]),
                            new DoubleWritable(temp / (values.length)));
                }
            }
        }
    }

    private static class PeopleRankReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        /**
         * new rank = pValue * (sum of incoming link mass)
         *          + (1 - pValue) * (previous rank of this node).
         */
        @Override
        protected void reduce(Text value, Iterable<DoubleWritable> datas,
                Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            double total = 0.0;
            for (DoubleWritable data : datas) {
                total += data.get();
            }
            // Nodes never seen in the rank table contribute 0 as their previous rank.
            double temp = 0.0;
            Double previous = table.get(value.toString());
            if (previous != null) {
                temp = previous.doubleValue();
            }
            double result = pValue * total + (1 - pValue) * temp;
            context.write(value, new DoubleWritable(result));
        }
    }

    /** Re-keys (id, rank) records by rank so the shuffle sorts them. */
    private static class SortMapper extends Mapper<LongWritable, Text, DoubleWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, DoubleWritable, Text>.Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            // NOTE(review): DoubleWritable's default comparator sorts ascending;
            // a descending comparator would put the top-ranked users first — confirm
            // which order the final report expects.
            context.write(new DoubleWritable(Double.parseDouble(strs[1])), new Text(strs[0]));
        }
    }

    /** Emits (rank, id) pairs in the order delivered by the shuffle sort. */
    private static class SortReducer extends Reducer<DoubleWritable, Text, DoubleWritable, Text> {
        @Override
        protected void reduce(DoubleWritable value, Iterable<Text> datas,
                Reducer<DoubleWritable, Text, DoubleWritable, Text>.Context context)
                throws IOException, InterruptedException {
            for (Text data : datas) {
                context.write(value, data);
            }
        }
    }

    /**
     * Runs one PageRank iteration: reads the previous ranks from
     * {@code /second/sec/secrank<count>} plus the link data from
     * {@code /UserRelation}, and writes the new ranks to
     * {@code /second/sec/secrank<count + 1>}.
     *
     * @param count 1-based iteration number
     * @return the output path of this iteration (input for the next one)
     */
    public static String firstRun(int count) {
        String path = "/second/sec/secrank";
        try {
            cfg.set("rand", path + count);
            Job job = Job.getInstance(cfg);
            job.setJobName("PeopleRank");
            job.setJarByClass(PeopleRank.class);
            // Custom input format that supplies the file name as the map key.
            job.setInputFormatClass(FileNameInputFormat.class);
            job.setMapperClass(PeopleRankMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);
            job.setReducerClass(PeopleRankReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
            FileInputFormat.addInputPath(job, new Path("/UserRelation"));
            FileInputFormat.addInputPath(job, new Path(path + count));
            FileOutputFormat.setOutputPath(job, new Path(path + (count + 1)));
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return path + (count + 1);
    }

    /**
     * Runs the sort job over the final iteration's output and writes the
     * ranked result to {@code /second/sec/result}. Exits the JVM with the
     * job's success status.
     */
    public static void sort(String path) {
        try {
            // A finished job cannot be reused, so a fresh Job instance is created.
            Job job = Job.getInstance(cfg);
            job.setJobName("Sort");
            // BUG FIX: the original passed Fans.class, an unrelated class —
            // the jar must be located via this class, as in firstRun().
            job.setJarByClass(PeopleRank.class);
            job.setMapperClass(SortMapper.class);
            job.setMapOutputKeyClass(DoubleWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setReducerClass(SortReducer.class);
            job.setOutputKeyClass(DoubleWritable.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(path));
            FileOutputFormat.setOutputPath(job, new Path("/second/sec/result"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        String path = "";
        // Ten iterations were chosen as a time/accuracy trade-off.
        for (int i = 1; i <= 10; i++) {
            path = firstRun(i);
        }
        sort(path);
    }
}
在迭代10次得到的数据基础上进行排序,最终得到的部分结果展示如下:
前面是支持度概率,后面是微博ID,概率值越大,说明其粉丝数越多,关注他的人越多。
前十名分别是:
1191258123 韩寒
2789168643 媒体微博助理
2656274875 央视新闻
1618051664 头条新闻
2803301701 人民日报
1282005885 蔡康永
1195230310 何炅
1496852380 崔永元
1197161814 李开复
1656809190 赵薇
写在最后:这只是一次简单的实践,希望能带给大家不同的感受。学习之后要实践之后才能深刻体会其中的巧妙。若有错误,望指正。下次将与大家分享K均值算法原理的实现。
基于上篇获得的两个数据文件links.txt和rand.txt,采用类似实现最简单的PageRank模型的方法计算下一次的概率分布。
代码实现:
public class PeopleRank {

    /** Damping factor: weight of link-propagated rank vs. the previous rank. */
    private static double pValue = 0.8;

    /** Follower id -> current rank probability, loaded from HDFS in setup(). */
    private static Map<String, Double> table = new HashMap<String, Double>();

    private static Configuration cfg = HadoopCfg.getConfigration();

    private static class PeopleRankMapper extends Mapper<Text, Text, Text, DoubleWritable> {

        /**
         * Loads the previous iteration's rank distribution into {@code table}.
         * The directory to read is passed through the "rand" configuration key.
         */
        @Override
        protected void setup(Mapper<Text, Text, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            super.setup(context);
            Configuration conf = HadoopCfg.getConfigration();
            FileSystem fs = FileSystem.get(conf);
            Path path = new Path(conf.get("rand"));
            RemoteIterator<LocatedFileStatus> rt = fs.listFiles(path, false);
            while (rt.hasNext()) {
                LocatedFileStatus status = rt.next();
                Path filePath = status.getPath();
                // FIX: try-with-resources — the original leaked the reader for every file.
                try (BufferedReader br =
                        new BufferedReader(new InputStreamReader(fs.open(filePath)))) {
                    String line;
                    while ((line = br.readLine()) != null) {
                        // HDFS job output is tab-separated:
                        // strs[0] = follower id, strs[1] = its rank probability.
                        String[] strs = line.trim().split("\t");
                        if (strs.length >= 2) {
                            table.put(strs[0], Double.parseDouble(strs[1]));
                        }
                    }
                }
            }
        }

        /**
         * Only records coming from the link file (links.txt) are processed here;
         * rank records were already loaded into {@code table} by setup().
         * For each node we emit (node, 0.0) so dangling nodes still reach the
         * reducer, then spread the node's current rank evenly over its out-links.
         */
        @Override
        protected void map(Text fileName, Text value,
                Mapper<Text, Text, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            // BUG FIX: the original compared the FULL path string
            // (e.g. "hdfs://host/UserRelation/links.txt") with equals("links.txt"),
            // which can never match. Compare only the file name.
            String name = ((FileSplit) context.getInputSplit()).getPath().getName();
            if (name.equals("links.txt")) {
                // The link file in HDFS is tab-separated (local copies use spaces).
                String[] strs = value.toString().split("\t");
                context.write(new Text(strs[0]), new DoubleWritable(0.0));
                // BUG FIX: guard the lookup — unboxing a null Double throws an NPE
                // for any node absent from the rank table.
                Double rank = table.get(strs[0]);
                double temp = (rank == null) ? 0.0 : rank.doubleValue();
                String[] values = strs[1].split(",");
                for (int i = 0; i < values.length; i++) {
                    context.write(new Text(values[i]),
                            new DoubleWritable(temp / (values.length)));
                }
            }
        }
    }

    private static class PeopleRankReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        /**
         * new rank = pValue * (sum of incoming link mass)
         *          + (1 - pValue) * (previous rank of this node).
         */
        @Override
        protected void reduce(Text value, Iterable<DoubleWritable> datas,
                Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            double total = 0.0;
            for (DoubleWritable data : datas) {
                total += data.get();
            }
            // Nodes never seen in the rank table contribute 0 as their previous rank.
            double temp = 0.0;
            Double previous = table.get(value.toString());
            if (previous != null) {
                temp = previous.doubleValue();
            }
            double result = pValue * total + (1 - pValue) * temp;
            context.write(value, new DoubleWritable(result));
        }
    }

    /** Re-keys (id, rank) records by rank so the shuffle sorts them. */
    private static class SortMapper extends Mapper<LongWritable, Text, DoubleWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, DoubleWritable, Text>.Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            // NOTE(review): DoubleWritable's default comparator sorts ascending;
            // a descending comparator would put the top-ranked users first — confirm
            // which order the final report expects.
            context.write(new DoubleWritable(Double.parseDouble(strs[1])), new Text(strs[0]));
        }
    }

    /** Emits (rank, id) pairs in the order delivered by the shuffle sort. */
    private static class SortReducer extends Reducer<DoubleWritable, Text, DoubleWritable, Text> {
        @Override
        protected void reduce(DoubleWritable value, Iterable<Text> datas,
                Reducer<DoubleWritable, Text, DoubleWritable, Text>.Context context)
                throws IOException, InterruptedException {
            for (Text data : datas) {
                context.write(value, data);
            }
        }
    }

    /**
     * Runs one PageRank iteration: reads the previous ranks from
     * {@code /second/sec/secrank<count>} plus the link data from
     * {@code /UserRelation}, and writes the new ranks to
     * {@code /second/sec/secrank<count + 1>}.
     *
     * @param count 1-based iteration number
     * @return the output path of this iteration (input for the next one)
     */
    public static String firstRun(int count) {
        String path = "/second/sec/secrank";
        try {
            cfg.set("rand", path + count);
            Job job = Job.getInstance(cfg);
            job.setJobName("PeopleRank");
            job.setJarByClass(PeopleRank.class);
            // Custom input format that supplies the file name as the map key.
            job.setInputFormatClass(FileNameInputFormat.class);
            job.setMapperClass(PeopleRankMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);
            job.setReducerClass(PeopleRankReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
            FileInputFormat.addInputPath(job, new Path("/UserRelation"));
            FileInputFormat.addInputPath(job, new Path(path + count));
            FileOutputFormat.setOutputPath(job, new Path(path + (count + 1)));
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return path + (count + 1);
    }

    /**
     * Runs the sort job over the final iteration's output and writes the
     * ranked result to {@code /second/sec/result}. Exits the JVM with the
     * job's success status.
     */
    public static void sort(String path) {
        try {
            // A finished job cannot be reused, so a fresh Job instance is created.
            Job job = Job.getInstance(cfg);
            job.setJobName("Sort");
            // BUG FIX: the original passed Fans.class, an unrelated class —
            // the jar must be located via this class, as in firstRun().
            job.setJarByClass(PeopleRank.class);
            job.setMapperClass(SortMapper.class);
            job.setMapOutputKeyClass(DoubleWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setReducerClass(SortReducer.class);
            job.setOutputKeyClass(DoubleWritable.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(path));
            FileOutputFormat.setOutputPath(job, new Path("/second/sec/result"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        String path = "";
        // Ten iterations were chosen as a time/accuracy trade-off.
        for (int i = 1; i <= 10; i++) {
            path = firstRun(i);
        }
        sort(path);
    }
}
在迭代10次得到的数据基础上进行排序,最终得到的部分结果展示如下:
前面是支持度概率,后面是微博ID,概率值越大,说明其粉丝数越多,关注他的人越多。
前十名分别是:
1191258123 韩寒
2789168643 媒体微博助理
2656274875 央视新闻
1618051664 头条新闻
2803301701 人民日报
1282005885 蔡康永
1195230310 何炅
1496852380 崔永元
1197161814 李开复
1656809190 赵薇
写在最后:这只是一次简单的实践,希望能带给大家不同的感受。学习之后要实践之后才能深刻体会其中的巧妙。若有错误,望指正。下次将与大家分享K均值算法原理的实现。
相关文章推荐
- php微信装逼神器
- php微信楼宇表白
- Android 制作Nine-Patch图片
- 老倪膏药
- 老倪膏药电极贴片
- shell脚本的使用---while循环
- v塑
- rbf神经网络的实现
- v塑官网
- Git使用详解
- 该如何方便的完成NiceLabel 6的安装
- 增强云主机安全性的五大方法
- CODESOFT 2015中的二维码该怎样生成
- 学习新东西的唯一方法
- 代码审查清单可消除更多的bug
- Effective C++: volatile
- spring MVC ajax总是error
- UVA 10082 WERTYU
- 【小平工作日志】Hadoop环境与eclipse集成hadoop-eclipse-plugin
- mybatis与spring结合