您的位置:首页 > 运维架构

学习Hadoop MapReduce与WordCount例子分析

2015-02-15 11:02 716 查看
/*
MapReduce框架一直围绕着key-value这样的数据结构,下面以官方自带的WordCount为例子,自己分析MapReduce的工作机制。MapReduce可以分为Map和Reduce过程,
代码实现了两个类,分别是继承Mapper和Reduceer,Mapper类里面有map接口,Reduceer类有reduce接口,对于统计单词这个例子来说,MapReduce会把文件以行为
拆分对象,每分析一行就会调用Mapper类里面的map接口,然后map接口里面的代码由程序员实现其逻辑,然后把map接口处理完的结果输送给Reduceer的reduce的接
口,中间还可以插入一个combiner的接口用于对map接口的数据进行中间结果处理再丢给reduce做最终的汇总。具体流程看代码注释。
*/

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
/*
Mapper他是一个模板类,Class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>,
KEYIN 输入key的类型,VALUEIN输入value的类型
KEYOUT 输出key的类型,VALUEOUT输出value的类型
四个类型决定了map接口的输入与输出类型

比较形象地描述key,value,在map,combiner,reduce流转的
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

其中还有规定,就是KEY和VALUE类型必须是实现了Writeable接口的,KEY类型还需要额外实现WritableComparable接口

通常在Mapper模板里面,KEYIN是不需要特指定的,就用基类Object就可以了,VAULEIN指定为Text,这个Text是<pre name="code" class="java">    org.apache.hadoop.io.Text,这个Text已经满足了实现Writeable接口的条件了,在这个例子里面VALUE就是文件的行内
容,所以定义类型为Text。
对于KEYOUT和VALUEOUT,作为输出key类型和value类型,这里定义为Text和IntWritable,keyout就是需要统计单词个数
的单词,IntWriteable就是对应某个单词的次数,其实这个就是一个Int类型,为了符合接口需要所以就基础了Writeable
Context它是一个贯通map接口<-->combiner接口<-->reduce接口的上下文数据,在map接口里面,单词对应次数会保存在context
里面,到了reduce接口,MapReduce会把之前map的context用key对应结果集合的形式给reduce接口。
*/

private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }

/*
下面是对两个文件统计单词调用map接口之后的context结果
For the given sample input the first map emits:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>

The second map emits:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>
*/

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { /*
Reduceer也是一个类模板,跟Mapper一样需要指定KEYIN,VALUEIN,KEYOUT,VALUEOUT,
其中KEYIN和VALUEIN必须跟Mapper的KEYOUT,VALUEOUT一一对应,因为map接口输出的结果key->value
就是reduce接口的输入,只是MapReduce框架把map接口里面相同的key变成一个key->values
的values集合,所以在reduce接口里面KEYIN是Text也就是单词,VALUEOUT是IntWriteable集合的
迭代器Interable<IntWriteable>,context就是reduce的输出结果了

*/

private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }

/*
在例子里面,还指定了combiner,其实cominer和reduce都是同一个接口reduce,第一次调用reduce接口是combiner过程,把每个文件
的单词做了key->value 到 key->values的汇总,结果如下
The output of the first map:
< Bye, 1>
< Hello, 1>
< World, 2>

The output of the second map:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
*/

/*
第二次调用reduce接口,就是reduce的过程,把combiner处理过的中间结果做一次最终的汇总
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>
*/

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: