PeopleRank Hadoop MapReduce
2016-05-15 08:07
1056 查看
计算影响力大小
package org.bigdata.peoplerank;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.bigdata.util.HadoopCfg;
import org.bigdata.util.HadoopUtil;
import org.bigdata.util.IndexMapReduce.FileNameInputFormat;

/**
 * Iterative PeopleRank (PageRank-style influence score) implemented as a
 * Hadoop MapReduce job.
 *
 * <p>Each iteration reads the previous ranks into an in-memory map, lets the
 * mapper split every source's rank evenly over its outgoing links, and lets
 * the reducer combine the received contributions with the node's previous
 * rank as {@code DAMPING * sum + (1 - DAMPING) * previousRank}.
 *
 * <p>The rank map is a static field. Map and reduce tasks that run in a JVM
 * other than the driver's would see it empty, so {@link #solve} records the
 * rank location in the job configuration and both tasks reload the map in
 * {@code setup()} when it is empty.
 *
 * @author wwhhf
 */
public class PeopleRankMapReduce {

    private static final String JOB_NAME = "PeopleRank";

    /** File-name prefix a record key must start with to be treated as link data. */
    private static final String LINKS = "part-r-00000";

    /** Configuration key: directory holding the rank file of the current iteration. */
    private static final String RANK_DIR_KEY = "peoplerank.rank.dir";

    /** Configuration key: name of the rank file inside {@link #RANK_DIR_KEY}. */
    private static final String RANK_FILE_KEY = "peoplerank.rank.file";

    /**
     * Matches the {@code :<digits><whitespace>} that follows the source id in a
     * link record. Presumably the record looks like
     * {@code source:count<TAB>dest1,dest2,...} -- TODO confirm against the job
     * that produces the links file.
     */
    private static final Pattern SOURCE_SUFFIX = Pattern.compile(":\\d+\\s+");

    /** Weight of the contributions received through links. */
    private static final double DAMPING = 0.85;

    /** Rank of every node as read from the previous iteration's output. */
    private static final Map<String, Double> ranks = new HashMap<String, Double>();

    /** Name of the rank file most recently passed to {@link #initRand}. */
    private static String rankFile = "part-r-00000";

    /**
     * Replaces the in-memory rank map with the content of a rank file.
     *
     * <p>Every non-blank line must be {@code id<TAB>rank}. Blank lines are
     * skipped.
     *
     * @param pathin   directory that contains the rank file
     * @param filename name of the rank file inside {@code pathin}
     * @throws IOException if the file cannot be read or a line has no rank column
     * @throws NumberFormatException if a rank column is not a valid double
     */
    public static synchronized void initRand(String pathin, String filename)
            throws IOException {
        List<String> lines = HadoopUtil.lslFile(pathin, filename);
        // Build into a fresh map so a failure half-way leaves the old ranks intact.
        Map<String, Double> loaded = new HashMap<String, Double>();
        for (String line : lines) {
            if (line.trim().isEmpty()) {
                continue;
            }
            String[] terms = line.split("\t");
            if (terms.length < 2) {
                throw new IOException("Malformed rank line in " + pathin + "/"
                        + filename + ": '" + line + "'");
            }
            loaded.put(terms[0], Double.valueOf(terms[1]));
        }
        // Drop entries of earlier iterations so the map mirrors the file exactly.
        ranks.clear();
        ranks.putAll(loaded);
        rankFile = filename;
    }

    /**
     * Loads the rank map inside a task when it has not been populated in this
     * JVM. Does nothing if ranks are already present or the job configuration
     * carries no rank location.
     */
    private static synchronized void ensureRanksLoaded(Configuration conf)
            throws IOException {
        if (!ranks.isEmpty()) {
            return;
        }
        String dir = conf.get(RANK_DIR_KEY);
        String file = conf.get(RANK_FILE_KEY);
        if (dir != null && file != null) {
            initRand(dir, file);
        }
    }

    /**
     * Distributes the rank of each source node over its outgoing links.
     *
     * <p>The input key is compared against {@link #LINKS}; presumably
     * {@code FileNameInputFormat} supplies the name of the file the record
     * came from -- TODO confirm.
     */
    private static class PeopleRankMapper extends
            Mapper<Text, Text, Text, DoubleWritable> {

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            ensureRanksLoaded(context.getConfiguration());
        }

        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            if (!key.toString().startsWith(LINKS)) {
                return;
            }
            // Turn "source:<n><ws>d1,d2" into "source,d1,d2" and split it.
            String[] dests = SOURCE_SUFFIX.matcher(value.toString())
                    .replaceFirst(",").split(",");
            if (dests.length == 0) {
                // Record consisted only of separators; there is no source id.
                return;
            }
            String source = dests[0];
            if (dests.length == 1) {
                // No outgoing links: emit the node so the reducer still sees it.
                context.write(new Text(source), new DoubleWritable(0.0));
                return;
            }
            Double rank = ranks.get(source);
            if (rank == null) {
                // Sources without a known rank contribute nothing.
                return;
            }
            double share = rank / (dests.length - 1);
            context.write(new Text(source), new DoubleWritable(0.0));
            for (int i = 1; i < dests.length; i++) {
                context.write(new Text(dests[i]), new DoubleWritable(share));
            }
        }
    }

    /**
     * Sums the contributions a node received and blends the sum with the
     * node's previous rank. Nodes without a previous rank are not written.
     */
    private static class PeopleRankReducer extends
            Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            ensureRanksLoaded(context.getConfiguration());
        }

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values,
                Context context) throws IOException, InterruptedException {
            double sum = 0.0;
            for (DoubleWritable value : values) {
                sum += value.get();
            }
            Double previous = ranks.get(key.toString());
            if (previous != null) {
                context.write(key, new DoubleWritable(DAMPING * sum
                        + (1 - DAMPING) * previous));
            }
        }
    }

    /**
     * Runs one PeopleRank iteration and waits for it to finish.
     *
     * @param linksin directory with the link data
     * @param pathin  directory with the ranks of the previous iteration
     * @param pathout directory the new ranks are written to
     * @throws IllegalStateException if the job cannot be set up or submitted,
     *         or if it completes unsuccessfully
     * @throws ClassNotFoundException propagated from {@code waitForCompletion}
     * @throws InterruptedException   propagated from {@code waitForCompletion}
     */
    public static void solve(String linksin, String pathin, String pathout)
            throws ClassNotFoundException, InterruptedException {
        String description = "ranks=" + pathin + ", links=" + linksin
                + ", output=" + pathout;
        try {
            // Copy so the rank location does not leak into a shared Configuration.
            Configuration cfg = new Configuration(HadoopCfg.getConfiguration());
            cfg.set(RANK_DIR_KEY, pathin);
            cfg.set(RANK_FILE_KEY, rankFile);

            Job job = Job.getInstance(cfg);
            job.setJobName(JOB_NAME);
            job.setJarByClass(PeopleRankMapReduce.class);
            job.setInputFormatClass(FileNameInputFormat.class);

            // mapper
            job.setMapperClass(PeopleRankMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);

            // reducer
            job.setReducerClass(PeopleRankReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);

            FileInputFormat.addInputPath(job, new Path(pathin));
            FileInputFormat.addInputPath(job, new Path(linksin));
            FileOutputFormat.setOutputPath(job, new Path(pathout));

            // A failed iteration must stop the caller: the next iteration
            // would otherwise read missing or stale output.
            if (!job.waitForCompletion(true)) {
                throw new IllegalStateException("PeopleRank job failed: "
                        + description);
            }
        } catch (IOException e) {
            throw new IllegalStateException(
                    "PeopleRank job could not be run: " + description, e);
        }
    }

    /**
     * Runs five PeopleRank iterations. Iteration {@code i} reads the ranks
     * written by iteration {@code i - 1} (initially {@code /weibo_out1}) and
     * writes to {@code /weibo_out1<i>}.
     */
    public static void main(String[] args) throws ClassNotFoundException,
            InterruptedException, IOException {
        String path = "/weibo_out1";
        String links_pathin = "/weibo_out";
        String filename = "part-r-00000";

        String tmp_pathin = path;
        for (int i = 1; i <= 5; i++) {
            initRand(tmp_pathin, filename);
            String tmp_pathout = path + i;
            System.out.println(links_pathin + " " + tmp_pathin + " "
                    + tmp_pathout);
            solve(links_pathin, tmp_pathin, tmp_pathout);
            tmp_pathin = tmp_pathout;
        }
    }
}
相关文章推荐
- 系统架构师-基础到企业应用架构-系列索引
- 如何成为优秀的架构师
- LINUX C++ 技术博客
- linux修改swap大小
- Java知识:(3)Tomcat
- Linux下的三个时间属性
- CentOS 6安裝VNC、Xfce桌面
- tomcat环境下服务器文件句柄耗尽(Too Many Open Files)的问题排查
- Tomcat-connector的微调(1): acceptCount参数(socket的backlog)(重要)
- opencv的使用例子
- NGINX userid 分析、解码
- 使用nginx的proxy_cache做网站缓存
- Linux内核态的文件操作
- docker命令总结
- 第三十六课 Spark之TaskScheduler Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详
- Spark-再次分析Apache访问日志
- 【OpenCV入门教程之一】 安装OpenCV:OpenCV 3.0、OpenCV 2.4.8、OpenCV 2.4.9 +VS 开发环境配置
- linux list
- codeforces_665B. Shopping
- 《Linux驱动》中断