
Hadoop: A Basic Template for Building MapReduce Programs

2014-08-20 16:34
A good memory is no match for written notes. Even though "Hadoop in Action" already explains this in plenty of detail, as a beginner I still want to write it down once more myself. This is the first program in Part 2 of the book: it inverts the patent citation data, so that each cited patent is listed together with all of the patents that cite it. The code is as follows:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
            implements Mapper<Text, Text, Text, Text> {
        public void map(Text key, Text value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {
            // Invert each (citing, cited) record into (cited, citing).
            output.collect(value, key);
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, Text> output,
                           Reporter reporter) throws IOException {
            // Join all citing patents of this cited patent into one comma-separated string.
            String csv = "";
            while (values.hasNext()) {
                if (csv.length() > 0) csv += ",";
                csv += values.next().toString();
            }
            output.collect(key, new Text(csv));
        }
    }

    public int run(String[] args) throws IOException {
        Configuration conf = getConf();

        JobConf job = new JobConf(conf, MyJob.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("MyJob");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // The input file is comma-separated, so split each line into key/value at the first comma.
        job.set("key.value.separator.in.input.line", ",");

        JobClient.runJob(job);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }
}


It is worth noting that Hadoop requires the Mapper and the Reducer to be their own static nested classes: the framework instantiates them by reflection, and a non-static inner class cannot be created without an enclosing MyJob instance.

With the Mapper and Reducer classes stripped out, the skeleton of MyJob looks like this:

public class MyJob extends Configured implements Tool {

    public int run(String[] args) throws IOException {
        Configuration conf = getConf();

        JobConf job = new JobConf(conf, MyJob.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("MyJob");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.set("key.value.separator.in.input.line", ",");

        JobClient.runJob(job);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }
}
The run() method above is also known as the driver. It is responsible for instantiating and configuring a JobConf object that describes the job, and for passing that object to JobClient.runJob() to launch the MapReduce job. The JobConf object holds all of the parameters the job needs to run: in run() we set the input and output paths and the Mapper and Reducer classes, and we can also override any default configuration value through the JobConf's set() method. Once the JobConf is handed to JobClient.runJob(), the overall plan for the job is complete.
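
For comparison, here is a rough sketch of the same template written against the newer org.apache.hadoop.mapreduce API. This is only a sketch and not from the book: it assumes Hadoop 2.x, where the key/value separator property for KeyValueTextInputFormat is named mapreduce.input.keyvaluelinerecordreader.key.value.separator, and the class name MyNewJob is just an illustrative choice.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sketch only: same inversion job, written with the new (mapreduce) API on Hadoop 2.x.
public class MyNewJob extends Configured implements Tool {

    public static class MapClass extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Invert each (citing, cited) record into (cited, citing).
            context.write(value, key);
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Join all citing patents of this cited patent into one comma-separated string.
            StringBuilder csv = new StringBuilder();
            for (Text v : values) {
                if (csv.length() > 0) csv.append(",");
                csv.append(v.toString());
            }
            context.write(key, new Text(csv.toString()));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // New-API name of the separator property (Hadoop 2.x).
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

        Job job = Job.getInstance(conf, "MyNewJob");
        job.setJarByClass(MyNewJob.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new MyNewJob(), args));
    }
}

In either version, one practical benefit of implementing Tool and launching through ToolRunner is that generic options such as -D property=value or -conf can override configuration from the command line without recompiling the driver.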

That's all for now.

Run output:

....

14/08/20 16:01:00 INFO mapred.Merger: Down to the last merge-pass, with 10 segments left of total size: 70597223 bytes
14/08/20 16:01:01 INFO mapred.LocalJobRunner: hdfs://localhost:9000/user/hadoop/input/cite75_99.txt:201326592+62748839
14/08/20 16:01:04 INFO mapred.LocalJobRunner: hdfs://localhost:9000/user/hadoop/input/cite75_99.txt:201326592+62748839
14/08/20 16:01:07 INFO mapred.LocalJobRunner: hdfs://localhost:9000/user/hadoop/input/cite75_99.txt:201326592+62748839
14/08/20 16:01:08 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000003_0 is done. And is in the process of commiting
14/08/20 16:01:08 INFO mapred.LocalJobRunner: hdfs://localhost:9000/user/hadoop/input/cite75_99.txt:201326592+62748839
14/08/20 16:01:08 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000003_0' done.
14/08/20 16:01:08 INFO mapred.LocalJobRunner:
14/08/20 16:01:08 INFO mapred.Merger: Merging 4 sorted segments
14/08/20 16:01:08 INFO mapred.Merger: Down to the last merge-pass, with 4 segments left of total size: 297120317 bytes
14/08/20 16:01:08 INFO mapred.LocalJobRunner:
14/08/20 16:01:14 INFO mapred.LocalJobRunner: reduce > reduce
14/08/20 16:01:15 INFO mapred.JobClient: map 100% reduce 72%
14/08/20 16:01:17 INFO mapred.LocalJobRunner: reduce > reduce
14/08/20 16:01:18 INFO mapred.JobClient: map 100% reduce 75%
14/08/20 16:01:20 INFO mapred.LocalJobRunner: reduce > reduce
14/08/20 16:01:21 INFO mapred.JobClient: map 100% reduce 79%
14/08/20 16:01:23 INFO mapred.LocalJobRunner: reduce > reduce
14/08/20 16:01:24 INFO mapred.JobClient: map 100% reduce 83%
14/08/20 16:01:26 INFO mapred.LocalJobRunner: reduce > reduce
14/08/20 16:01:27 INFO mapred.JobClient: map 100% reduce 87%
14/08/20 16:01:29 INFO mapred.LocalJobRunner: reduce > reduce
14/08/20 16:01:30 INFO mapred.JobClient: map 100% reduce 92%
14/08/20 16:01:32 INFO mapred.LocalJobRunner: reduce > reduce
14/08/20 16:01:33 INFO mapred.JobClient: map 100% reduce 96%
14/08/20 16:01:35 INFO mapred.LocalJobRunner: reduce > reduce
14/08/20 16:01:36 INFO mapred.JobClient: map 100% reduce 99%

....

The generated output file looks like this:

"CITED" "CITING"

1 3964859,4647229

10000 4539112

100000 5031388

1000006 4714284

1000007 4766693

1000011 5033339

1000017 3908629

1000026 4043055

1000033 4190903,4975983

1000043 4091523

1000044 4082383,4055371

1000045 4290571

1000046 5525001,5918892

1000049 5996916

1000051 4541310

1000054 4946631

1000065 4748968

1000067 4944640,5071294,5312208

1000070 5009029,4928425

1000073 4107819,5474494

1000076 4867716,5845593

1000083 5322091,5566726