MapReduce的自定义排序、分区和分组
2017-07-12 20:33
369 查看
1.自定义排序(WritableComparable)
我们写mr程序来处理文本时,经常会将处理后的信息封装到我们自定义的bean中,并将bean作为map输出的key来传输。上一文我用图解分析了mr程序的基本流程。而mr程序会在处理数据的过程中(传输到reduce之前)对数据排序(如:map端生成的文件中的内容分区且区内有序)。我们自定义bean来封装处理后的信息的话,我们可以自定义排序规则来挑选bean中的某几个属性来作为排序的依据,这样就很灵活了。
import org.apache.hadoop.io.WritableComparable; public class Person implements WritableComparable<Person> { private String name; //姓名 private int age; //年龄 private int charm; //魅力值 // 如果空 4000 构造函数被覆盖,一定要显示的定义一下,否则反序列化时会抛异常。 public Person() { } public Person(String name, int age, int charm) { super(); this.name = name; this.age = age; this.charm = charm; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public int getCharm() { return charm; } public void setCharm(int charm) { this.charm = charm; } @Override //hadoop的反序列化 public void readFields(DataInput in) throws IOException { name=in.readUTF(); age=in.readInt(); charm=in.readInt(); } @Override //hadoop的序列化 public void write(DataOutput out) throws IOException { out.writeUTF(name); out.writeInt(age); out.writeInt(charm); } @Override //先按照年龄排序,在按照魅力值排序(年龄小,魅力大的在前) public int compareTo(Person o) { if(o.age==this.age){ if(o.charm==this.charm){ return 0; }else{ return o.charm-this.charm; } }else{ return this.age-o.age; } } }
上要实现自定义排序,需要实现WritableComparable这个接口,然后实现三个方法readFields(反序列化)、write(序列化)、和最关键的compareTo(排序)。在mr过程中发生排序的地方就会按照我自定义的排序规则来排序。前提,map的输出的key为封装的Person。
注意1:java的序列化过于重量级(Serializable),所以hadoop开发了一套自己的序列化和反序列化策略(Writable,精简高效),因为map端的文件要下载到reduce端的话如果不在同一台节点上是会走网络进行传输(hadoop-rpc),所以对象需要序列化。
注意2:如果空构造函数被覆盖,一定要显示的定义一下,否则反序列化时会抛异常。
2、自定义分区(Partitioner)
Mapreduce中会将maptask输出的kv对,默认(HashPartitioner)根据key的hashcode%reducetask数来分区。(1)如果要按照我们自己的需求进行分组,则需要改写数据分发组件Partitioner继承抽象类:Partitioner。
(2)在job对象中,设job.setPartitionerClass(自定义分区类.class)
import java.util.HashMap; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; /** * @author zzw * map端的输出类型为(Text,Text),这里自定的分区策略为key的首位如果为1,则进入0号分区;如果为2,则进入1号分区;如果是3则进入2号分区 * 假设数据为: * 1367788000 hahaah * 2342344234 xiaomei * 3324234234 zzzz * 6666668888 wwww * 7777777777 ssss */ public class CustomPartitioner extends Partitioner<Text,Text>{ static HashMap<String, Integer> numMap = new HashMap<String, Integer>(); static { numMap.put("1", 0); numMap.put("2", 1); numMap.put("3", 2); } /* * 1)numPartitions其实我们可以设置,在job.setNumReduceTasks(n)设置。 * 2)如果我们job.setNumReduceTasks(5),那么这里的numPartitions=5,那么默认的HashPartitioner的机制就是用key的hashcode%numPartitions来决定分区属于哪个分区,所以分区数量就等于我们设置的reduce数量5个。 */ @Override public int getPartition(Text key, Text value, int numPartitions) { Integer hash = numMap.get(key.toString().substring(0, 1)); //将没有匹配到的数据放入3号分区 return hash==null?3:hash; } }
3、自定义分组(GroupingComparator)
假设我们将上面自定义的Person(bean)作为key发送给reduce,而在reduce端我们希望将年龄相同的kv聚合成组,那么就可以如下方式实现。自定义分组要继承WritableComparator,然后重写compare方法。
定义完成后要设置job.setGroupingComparatorClass(CustomGroupingComparator.class);
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class CustomGroupingComparator extends WritableComparator{ protected CustomGroupingComparator() { super(Person.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { Person abean = (Person) a; Person bbean = (Person) b; //将item_id相同的bean都视为相同,从而聚合为一组 return abean.getAge()-bbean.getAge(); } }
相关文章推荐
- 十一、理解MapReduce的二次排序功能,包括自定义数据类型、分区、分组、排序
- mapreduce,自定义分区,分组,排序实现join
- mapreduce,自定义排序,分区,分组实现按照年份升序排序,温度降序排序
- 「 Hadoop」mapreduce对温度数据进行自定义排序、分组、分区等
- Hadoop Mapreduce分区、分组、二次排序过程详解
- MapReduce处理二次排序(分区-排序-分组)
- Hadoop Mapreduce分区、分组、二次排序过程详解
- HADOOP(2)__Mapreduce分区、排序、分组
- mapreduce之分区,分组,排序,二次排序的综合应用
- Hadoop Mapreduce分区、分组、二次排序过程详解
- 第二个MapReduce程序----flowcount(流量统计,自定义排序,自定义分区)
- Hadoop中自定义排序,分区,分组
- hadoop自定义排序、分组、分区(温度统计)
- MapReduce的分区与 分组二次排序
- Mapreduce中的 自定义类型、分组与二次排序
- MapReduce二次排序分区,分组优化
- Hadoop Mapreduce分区、分组、二次排序过程详解[转]
- 自定义分区、数据类型、排序、分组
- Hadoop Mapreduce分区、分组、连接以及辅助排序(也叫二次排序)过程详解
- Hadoop Mapreduce分区、分组、二次排序过程详解[转]