您的位置:首页 > 运维架构

Hadoop学习之MapReduce(二)

2014-03-17 15:27 267 查看
在通过WordCount的例子直观地了解了MapReduce框架的作业如何编写后,现在对MapReduce框架中的关键接口或者类进行深入地地探索和学习。主要讲解Hadoop1.x中的接口和类,也就是org.apache.hadoop.mapreduce包中的接口和类,上面介绍的WordCount作业也是实现了这个包中的接口和类。首先会介绍Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>和Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>类,应用程序一般通过集成这两个类来实现map和reduce方法,而不同于之前版本中实现Mapper和Reducer接口。然后会介绍其它的接口或者类,包括InputFormat<K,V>、Job、Configuration 、OutputFormat<K,V>、Partitioner<KEY,VALUE>、Context、ToolRunner、Tool、Configured 等。最后通过介绍MapReduce的一些有用的特性,比如DistributedCache,结束MapReduce的学习。

类Mapper将输入的键值对映射为中间键值对的集合。Maps是独立的任务,将输入的记录转化为中间记录,这些转化而来的中间记录不需要和输入记录保持一致的类型。一个给定的输入键值对可能映射为零或者多个输出键值对。MapReduce框架为每个由作业的InputFormat产生的InputSplit生成一个Map任务。Mapper类的实现可以通过使用JobContext.getConfiguration()访问作业的Configuration对象。MapReduce框架首先调用setup(org.apache.hadoop.mapreduce.Mapper.Context),接着为InputSplit中的每个键值对调用map(Object,Object,
Context),最后调用cleanup(Context)。

所有Mapper输出的中间值按照与之关联的键(key)被MapReduce框架分组,然后传递给Reducer,由其决定最终的输出。用户可以通过指定 RawComparator接口的实现类来控制排序和分组。Mapper的输出被分割到每个Reducer,用户可以通过实现Partitioner来定制自己的分割器,这样就可以控制哪些key进入哪些Reducer。用户可以可选地通过Job.setCombinerClass(Class)指定一个组合器来执行中间输出的聚合,这有助于降低从Mapper传输到Reducer的数据量。应用程序可以通过Configuration指定是否对中间输出进行压缩,如何压缩以及使用哪个CompressionCode进行压缩。如果作业没有Reducer,那么Mapper的输出将会直接写到OutputFormat,不会根据key分类。应用程序可以覆盖run(Context)方法在map处理过程中使用更多控制策略。下面是一个Mapper的例子:

public class TokenCounterMapper extends Mapper{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context. write (word, one);
}
}
}
那需要多少个Mapper才能使效率更好呢?Mapper的数量通常由输入的大小决定,也就是由输入文件的总的块的数量决定。每个节点适宜并行运行的Mapper数量大概在10-100范围内,即使已经设置了运行300个mapper。任务的建立需要花费一些时间,因此如果mapper执行的时间至少一分钟的话,那将是最好的。也就是说,如果设置了过多的mapper,这些mapper的建立需要花费一些时间,然而每个mapper都会很快结束,那建立mapper花费的时间将会被浪费。因此,假设有10TB的输入数据,每个块的大小是128MB,那么将会有82000个块,也将会存在82000个mapper,除非使用setNumMapTasks(int) 将mapper的数量设置的更高。

Reducer产生中间值的集合,这些中间值共享某个键(key),该键对应更小的由值组成的集合。Reducer的实现可以通过JobContext.getConfiguration()方法访问作业的Configuration 对象。Reducer有3个主要的阶段:

洗牌。Reducer使用HTTP协议通过网络拷贝每个Mapper的输出。
排序。MapReduce框架根据key对reducer的输入进行归并排序(不同的mapper可能输出相同的key)。洗牌和排序阶段同时发生,比如当输出被提取过来时,已经归并完成。为了对由值迭代器返回的值进行二次排序,应用程序可以使用次要键来扩展key并定义一个分组比较器。键将使用整个键(主键和次要键)排序,但将使用分组比较器进行分组,并决定哪些key和值被送到相同的reducer。分组比较器由Job.setGroupingComparatorClass(Class)指定,排序顺序由Job.setSortComparatorClass(Class)控制。
Reduce。在该阶段为每个分类输入的<key, (collection ofvalues)> 调用reduce(Object,Iterable, Context)方法。Reducer的输出通常通过TaskInputOutputContext.write(Object, Object)写入到RecordWriter 。

Reducer的输出不会再进行分类处理。Reducer的例子如下:

public class IntSumReducer extends Reducer {
private IntWritable result = new IntWritable();
public void reduce(Key key, Iterable values, Context context) throws IOException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
Reducer合适的数量大概是0.95或者1.75*节点数量*mapred.tasktracker.reduce.tasks.maximum(由一个TaskTracker同时运行的reduce任务的数量)。当使用0.95时,所有的reducer会立刻加载并在mapper完成后开始传输mapper的输出。当使用1.75时,较快的节点将会完成reducer的第一轮工作并加载reducer的第二波工作,这样对作业的负载均衡会更好。增加reducer的数量会增加MapReduce框架的开销,但会增加负载均衡和降低故障的损失。上面提到的比例因子稍微小于总的数量,为了在MapReduce中预留一些reducer槽用于推测执行的任务和失败的任务。

Mapper和Reducer中的上下文对象分别为Mapper.Context和Reducer.Context,分别继承自MapContext和ReduceContext,用于任务的输入输出。

Partitioner(分割器)用于分隔键空间。分隔器控制mapper输出的中间值的键的分隔,键或者键的子集被用于取得划分,通常使用一个哈希函数。总的划分数量与作业的reduce任务数量一致,因此这就控制了哪个中间键(记录)被发送到第m个reduce任务进行处理。HashPartitioner 是默认的分割器。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息