您的位置:首页 > 其它

MapReduce的自定义排序、分区和分组

2017-07-12 20:33 369 查看

1.自定义排序(WritableComparable)

我们写mr程序来处理文本时,经常会将处理后的信息封装到我们自定义的bean中,并将bean作为map输出的key来传输。上一文我用图解分析了mr程序的基本流程。而mr程序会在处理数据的过程中(传输到reduce之前)对数据排序(如:map端生成的文件中的内容分区且区内有序)。

我们自定义bean来封装处理后的信息的话,我们可以自定义排序规则来挑选bean中的某几个属性来作为排序的依据,这样就很灵活了。

import org.apache.hadoop.io.WritableComparable;

public class Person implements WritableComparable<Person> {
private String name;   //姓名
private int age;     //年龄
private int charm;   //魅力值
// 如果空
4000
构造函数被覆盖,一定要显示的定义一下,否则反序列化时会抛异常。
public Person() {
}
public Person(String name, int age, int charm) {
super();
this.name = name;
this.age = age;
this.charm = charm;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public int getCharm() {
return charm;
}
public void setCharm(int charm) {
this.charm = charm;
}

@Override    //hadoop的反序列化
public void readFields(DataInput in) throws IOException {
name=in.readUTF();
age=in.readInt();
charm=in.readInt();
}

@Override    //hadoop的序列化
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(age);
out.writeInt(charm);
}

@Override   //先按照年龄排序,在按照魅力值排序(年龄小,魅力大的在前)
public int compareTo(Person o) {
if(o.age==this.age){
if(o.charm==this.charm){
return 0;
}else{
return o.charm-this.charm;
}
}else{
return this.age-o.age;
}
}

}


上要实现自定义排序,需要实现WritableComparable这个接口,然后实现三个方法readFields(反序列化)、write(序列化)、和最关键的compareTo(排序)。在mr过程中发生排序的地方就会按照我自定义的排序规则来排序。前提,map的输出的key为封装的Person。

注意1:java的序列化过于重量级(Serializable),所以hadoop开发了一套自己的序列化和反序列化策略(Writable,精简高效),因为map端的文件要下载到reduce端的话如果不在同一台节点上是会走网络进行传输(hadoop-rpc),所以对象需要序列化

注意2:如果空构造函数被覆盖,一定要显示的定义一下,否则反序列化时会抛异常。

2、自定义分区(Partitioner)

Mapreduce中会将maptask输出的kv对,默认(HashPartitioner)根据key的hashcode%reducetask数来分区。

(1)如果要按照我们自己的需求进行分组,则需要改写数据分发组件Partitioner继承抽象类:Partitioner。

(2)在job对象中,设job.setPartitionerClass(自定义分区类.class)

import java.util.HashMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @author zzw
* map端的输出类型为(Text,Text),这里自定的分区策略为key的首位如果为1,则进入0号分区;如果为2,则进入1号分区;如果是3则进入2号分区
* 假设数据为:
* 1367788000   hahaah
* 2342344234   xiaomei
* 3324234234   zzzz
* 6666668888   wwww
* 7777777777   ssss
*/
public class CustomPartitioner extends Partitioner<Text,Text>{
static HashMap<String, Integer> numMap = new HashMap<String, Integer>();
static {
numMap.put("1", 0);
numMap.put("2", 1);
numMap.put("3", 2);
}
/*
* 1)numPartitions其实我们可以设置,在job.setNumReduceTasks(n)设置。
* 2)如果我们job.setNumReduceTasks(5),那么这里的numPartitions=5,那么默认的HashPartitioner的机制就是用key的hashcode%numPartitions来决定分区属于哪个分区,所以分区数量就等于我们设置的reduce数量5个。
*/
@Override
public int getPartition(Text key, Text value, int numPartitions) {
Integer hash = numMap.get(key.toString().substring(0, 1));
//将没有匹配到的数据放入3号分区
return hash==null?3:hash;
}
}


3、自定义分组(GroupingComparator)

假设我们将上面自定义的Person(bean)作为key发送给reduce,而在reduce端我们希望将年龄相同的kv聚合成组,那么就可以如下方式实现。

自定义分组要继承WritableComparator,然后重写compare方法。

定义完成后要设置job.setGroupingComparatorClass(CustomGroupingComparator.class);

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CustomGroupingComparator extends WritableComparator{
protected CustomGroupingComparator() {
super(Person.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Person abean = (Person) a;
Person bbean = (Person) b;
//将item_id相同的bean都视为相同,从而聚合为一组
return abean.getAge()-bbean.getAge();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: