您的位置:首页 > 运维架构

Hadoop - Map/Reduce 通过WordCount例子的变化来了解新版hadoop接口的变化

2011-09-30 10:32 561 查看
-----------------------

今天,偶然发现Hadoop主页已经更新了文档,已经有了对于r0.21.0版本的最新文档,大家可以参考:http://hadoop.apache.org/mapreduce/docs/r0.21.0/
hadoop的文档还是非常详尽值得细细品味的,本文留在这里,供大家了解新旧版本之间的差别。

-----------------------

Example程序通常是我们学习一个新的API的最好的方式。WordCount是一个非常经典的用来介绍怎么样用Hadoop - Map/Reduce来编写自己的云计算程序的example程序。但是随着Hadoop的不断演进,很多接口API都已经发生了变化,本文意在通过对比新旧WordCount程序的实现来了解在当前最新hadoop版本中推荐的使用Hadoop来创建云计算程序的方式。

首先,先介绍一下WordCount程序是干什么的:WordCount是一个简单的应用,它可以计算出指定数据集中每一个单词出现的次数。

然后,我们再来看看在当前,网上可以找到的大部分实现方式,包括Hadoop官网doc中的WordCount的实现方式

public class WordCount {

public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String line = valuetoString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizerhasMoreTokens()) {
wordset(tokenizernextToken());
outputcollect(word, one);
}
}
}

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (valueshasNext()) {
sum += valuesnext()get();
}
outputcollect(key, new IntWritable(sum));
}
}

public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCountclass);
confsetJobName("wordcount");

confsetOutputKeyClass(Textclass);
confsetOutputValueClass(IntWritableclass);

confsetMapperClass(Mapclass);
confsetCombinerClass(Reduceclass);
confsetReducerClass(Reduceclass);

confsetInputFormat(TextInputFormatclass);
confsetOutputFormat(TextOutputFormatclass);

FileInputFormatsetInputPaths(conf, new Path(args[0]));
FileOutputFormatsetOutputPath(conf, new Path(args[1]));

JobClientrunJob(conf);
}
}


这里,我暂时不对程序做任何解释,让我们再来看看最新版中WordCount程序的实现,通过对比来了解其中的变化。

最新版实现如下:

public class WordCount {

public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{                             #1

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context                         #2
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {                         #3
private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values,                      #4
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();                                       #5
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");                                          #6
job.setJarByClass(WordCount.class);                                             #7
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);                               #8
}
}


从上面用颜色标注的地方我们可以看到,新版和旧版在实现上还是有很大的改动的:

1, Map类不再采用继承MapReduceBase基类并实现Mapper接口的方式,而是直接继承一个新的Mapper<Object, Text, Text, IntWritable>类就可以。

2, 新的Mapper类中也有一个map方法,只不过和旧版中的map方法的参数不同,但是他们的功能是相同的。hadoop框架会对于每个输入的键值对执行一次map方法。

3, Reduce类不再采用继承MapReduceBase基类并实现 Reducer接口的方式,而是直接继承一个新的Reducer<Text,IntWritable,Text,IntWritable>类就可以。

4, 新的Reduce类中也有一个reduce方法,只不过和旧版中的reduce方法的参数不同,但是他们的功能是相同的。hadoop框架会对于框架为成组的输入数据中的每个<key, (list of values)>对调用一次reduce方法。

5, 新版废弃了JobConf类,直接使用JobConf的父类Configuration来进行一些配置的管理。

6, 新版直接使用类Job来描述一个job相关的各种信息。

7, 通过Job类的方法,设置一个Job在运行过程中所需的所有相关参数

8, 直接通过Job类的方法来执行Job。 这里waitForCompletion函数会一直等待job结束才会返回,还有另外一个方法:commit,commit会在提交job之后立刻返回。

通过上面的对比,我们可以看到,在新版的实现当中,可以说发生了非常大的变化。旧版中的很多类,现在已经都被标识为Deprecated

本质上来说,hadoop框架的运行方式并没有发生大的变化,我们在hadoop基础上开发程序的逻辑也并没有发生大的变化,只是接口方面有所不同。在新版中,引入了Context,这使得以后hadoop会更容易扩展。Configuration和Job的使用,也使编程的逻辑更为直观。如果大家去看hadoop的具体实现,可以看到很多新引入的类,都在包org.apache.hadoop.mapreduce下,如果大家有兴趣,可以自己去看看这些新的类。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: