您的位置:首页 > 运维架构

Combiner类和Partitioner类——hadoop

2016-05-25 11:18 357 查看
一、Combiner类

1、Hadoop框架使用Mapper将数据处理成一个个<key, value>键值对,再对其进行合并和处理,最后使用Reduce处理数据并输出结果

2、上述过程会遇到一些瓶颈,比如: 在做词频统计的时候,大量具有相同主键的键值对数据如果直接传送个Reduce节点会引起较大的网络带宽开销。可以对每个Map节点处理完成的中间键值对做一个合压缩,即把那些主键相同的键值对归并为该一个键名下的一组数值,这样做不仅可以减轻网络压力,同样也可以大幅度提供程序的效率。

3、Hadoop通过在Mapper类结束后,传入Reduce节点之前使用一个Combiner类来解决相同主键键值对的合并处理。Combiner的作用主要是为了合并和减少Mapper的输出从而减少网络带宽和Reduce节点上的负载。如果我们定义一个Combiner类、MapReducer框架会使用他对中间数据进行多次的处理

4、Combiner类在实现上类似于Reduce类。事实上他就是一个与Reducer类一样。继承自Reducer基类的子类。Combiner的作用知识为了解决网络通信性能问题,因此使用不使用Combiner对结果应该是没有任何影响的。为此,需要特别注意的是,程序设计时,为了保证使用了Combiner后完全不影响Reducer的处理和最终结果,Combiner不能改变Mapper类输出的中间键值对的数据类型。

5、如果Reducer只运行简单的分布式的聚集方法,例如最大值、最小值或者计数,由于这些运算与Combiner类要做的事情是完全一样的,因此在这种情况下可以直接使用Reducer类作为Combiner使用,否则将会出现完全错误的结果。在这种情况下,需要定制一个专门的Combiner类来完成合并处理。

二、Partitioner类

1、为了避免Reduce计算过程中不同Reduce节点间存在数据相关性,需要一个Partition过程。其原因是一个Reduce节点所处理的数据可能会来自多个Map节点,因此Map节点输出的中间结果需使用一定的策略进行合适的分区Partitioner处理,保证具有数据相关性的数据发送到同一个Reduce节点,这样既可避免Reduce计算过程中访问其他的Reduce节点,进而解决数据相关性问题

2、Partitioner用来控制Map输出的中间结果键值对的划分,分区总数与作业的Reduce任务的数量是一样的。

3、Hadoop框架自带了一些默认的HashPartitioner类,默认情况下,hadoop对<key , value>键值对中的key取hash值并按Reducer数目取模来确定怎样分配给相应的Reducer。Hadoop使用HashPartitioner类来执行这一操作。但是,有时候HashPartitioner并不能完成我们需要的功能。这时需要由程序员定制一个Partitioner类。

4、定制Partitioner, 先继承Partitioner类,并重载它的getPartition()方法,一个定义的Partitioner只需要实现两个方法: getPartition()方法和configure()方法。getPartition()方法返回一个0到Reduce数目之间的整形值来确定将<key, value>键值对送到哪一个Reducer中,而它的输入参数除了key和value之外还有一个numPartitions表示总共的划分的个数;而configue()方法使用 Hadoop Job
Configuration配置所使用的Partitioner类。定制Partitioner类的示例代码如下:

public class EdgePartitioner implements Partitioner<Edge , Writable>{
public int getPartition(Edge key , Writable value, int numPartitions)
return new long(key.getDepartureNode()).hashCode()%numPartitions;
}
public void configure(JobConf conf){}
}也可以直接继承一个已有的Partitioner类,然后重载其getPartition()方法,示例代码如下:
Class NewPartitioner extends HashPartitioner<K, V>{
//override the method
getPartition(K key, V value, in numReduceTasks){
//<term, docid> =>term
term = key.toString().split(",")[0];
super.getPartition(term ,value,numReduceTasks);
}

并在Job中设置新的Partitioner类
Job.setPartitionerClass(NewPartitioner);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息