您的位置:首页 > 其它

MapReduce处理二次排序(分区-排序-分组)

2015-03-28 12:37 477 查看
MapReduce二次排序原理

在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReader的实现。

本例子中使用的时TextInputFormat,他提供的RecordReader会将文本的字节偏移量作为key,这一行的文本作为value。

这就是自定义Map的输入是<LongWritable,Text>的原因,然后调用自定义的Map的map方法,将一个个<LongWritable,Text>对输入的给Map的map方法。

注意输出应该符合自定义Map中定义的输出<IntPair,IntWritable>.最终是生成一个List<IntPair,IntWritable>,在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置Key比较函数类,则使用key的实现的compareTo方法。

在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序,然后开始构造一个key对应的value迭代器,这是就要用到分组,使用job.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,他们的value就放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和他的value迭代器)。同样注意输入与输出的类型必须与自定义的reducer中声明的一致。

核心总结

1.map最后阶段进行partition分区。一般使用job.setPartitionerClass设置的类,如果没有自定义的类,用key的hashcode()方法进行排序

2.每个分区内部调用job.setSortComparatorClass设置Key的比较函数类进行排序,如果没有则使用key的实现的compareTo方法。

3.当reduce接收到所有map传输过来的数据之后,调job.setSortComparatorClass设置的key比较函数类对所有数据对排序,如果没有则使用key的实现的compareTo方法

4.紧接着使用job.setGroupingComparatorClass设置的分组函数类,进行分组,同一个key的value放在一个迭代器里面

分区 ---> 排序(二次) ---> 分组

分区默认的是key的hashcode()

排序默认的实key的compareTo()

-----------------------------------------

job.setPartitionerClass(Partitioner p); //设置分区。默认分区时hashcode()

job.setSortComparatorClass(RawComparator c); //比较排序。shuffle阶段map输出之后,reduce之前。默认是key的compareTo()方法

job.setGroupingComparatorClass(RawComparator c); //分组。Reduce阶段

-----------------------------------------

案例

原始数据

2 12:12:34 2_hao123

3 09:10:34 3_baidu

1 15:02:41 1_google

3 22:11:34 3_sougou

1 19:23:23 1_baidu

2 13:56:60 2_soso

分别依据第一列和第二列对数据进行二次排序

1.分区类

package test.mr.seconderysort;

import org.apache.hadoop.io.Text;

/*
* 分区类
*/
public class Partitioner extends
org.apache.hadoop.mapreduce.Partitioner<StringPart, Text> {

@Override
public int getPartition(StringPart key, Text value, int numPartitions) {
// TODO Auto-generated method stub
return Math.abs(key.hashCode()) % numPartitions;
}

}


2.自定义Map输出的key类,将原始数据要排序的两列作为该JavaBean的属性,实现WritableComparable接口,实现CompareTo()排序方法

Ps:WritableComparatable接口中的CompareTo()方法:在这个方法中,如果返回-1,则当前对象排前面,返回1,就排后面 ,0,就相等。

String类中的CompareTo()方法:

/*

* compareTo()的返回值是整型,它是先比较对应字符的大小(ASCII码顺序),如果第一个字符和参数的第一个字符不等,结束比较,返回他们之间的差值,如果第一个字符和参数的第一个字符相等,则以第二个字符和参数的第二个字符做比较,以此类推,直至比较的字符或被比较的字符有一方全比较完,这时就比较字符的长度.

*

* 例: String s1 = "abc";

* String s2 = "abcd";

* String s3 = "abcdfg";

* String s4 = "1bcdfg";

* String s5 = "cdfg";

* System.out.println( s1.compareTo(s2) );// -1 (前面相等,s1长度小1)

* System.out.println( s1.compareTo(s3) ); //-3 (前面相等,s1长度小3)

* System.out.println( s1.compareTo(s4) ); //48("a"的ASCII码是97,"1"的的ASCII码是49,所以返回48)

* System.out.println( s1.compareTo(s5) ); // -2 ("a"的ASCII码是97,"c"的ASCII码是99,所以返回-2)

*/

package test.mr.seconderysort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/*
* 自定义key
*/
/*
*如果想对自己写的类排序,你就把自己写的这个类实现Comparable接口
*然后写一个comparaTo方法来规定这个类的对象排序的顺序。
*在这个方法中,如果返回-1,则当前对象排前面,返回1,就排后面 ,0,就相等
*/
public class StringPart implements WritableComparable<StringPart> {
/*
* 两列排序
*/
private String first;
private String second;

public String getFirst() {
return first;
}

public void setFirst(String first) {
this.first = first;
}

public String getSecond() {
return second;
}

public void setSecond(String second) {
this.second = second;
}

public StringPart() {
super();
// TODO Auto-generated constructor stub
}

public StringPart(String first, String second) {
super();
this.first = first;
this.second = second;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(first);
out.writeUTF(second);

}

@Override
public void readFields(DataInput in) throws IOException {
this.first = in.readUTF();
this.second = in.readUTF();
}

/*
* 排序
*/
/*
* compareTo()的返回值是整型,它是先比较对应字符的大小(ASCII码顺序),如果第一个字符和参数的第一个字符不等,结束比较,返回他们之间的
*
* 差值,如果第一个字符和参数的第一个字符相等,则以第二个字符和参数的第二个字符做比较,以此类推,直至比较的字符或被比较的字符有一方
*
* 全比较完,这时就比较字符的长度.
*
* 例:  String s1 = "abc";
*     String s2 = "abcd";
*     String s3 = "abcdfg";
*     String s4 = "1bcdfg";
*     String s5 = "cdfg";
*     System.out.println( s1.compareTo(s2) );// -1 (前面相等,s1长度小1)
*     System.out.println( s1.compareTo(s3) ); //-3 (前面相等,s1长度小3)
*     System.out.println( s1.compareTo(s4) ); //48("a"的ASCII码是97,"1"的的ASCII码是49,所以返回48)
*     System.out.println( s1.compareTo(s5) ); // -2 ("a"的ASCII码是97,"c"的ASCII码是99,所以返回-2)
*/
@Override
public int compareTo(StringPart o) {
if (!this.first.equals(o.getFirst())) {
return first.compareTo(o.getFirst()); // 字符串的compareTo()方法
} else {
if (!this.second.equals(o.getSecond())) {
return second.compareTo(o.getSecond());
} else {
return 0;
}
}
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((first == null) ? 0 : first.hashCode());
result = prime * result + ((second == null) ? 0 : second.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
StringPart other = (StringPart) obj;
if (first == null) {
if (other.first != null)
return false;
} else if (!first.equals(other.first))
return false;
if (second == null) {
if (other.second != null)
return false;
} else if (!second.equals(other.second))
return false;
return true;
}

}


3.分组类(根据原始数据的第一列进行分组)

package test.mr.seconderysort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/*
* 实现分组
*/
public class Grouping extends WritableComparator {

protected Grouping() {
super(StringPart.class, true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {
StringPart a1 = (StringPart) a;
StringPart b1 = (StringPart) b;
// 只需要比较a1,b1的first字段即认为他们是否属于同组
return a1.getFirst().compareTo(b1.getFirst());
}

}


4.Map类
package test.mr.seconderysort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SeconderyMap extends Mapper<LongWritable, Text, StringPart, Text> {
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, StringPart, Text>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] str = line.split("\t");
if (str.length == 3) {
StringPart temp = new StringPart(str[0], str[1]);
context.write(temp, value);
}
}

}


5.Reduce类

package test.mr.seconderysort;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SeconderyRedu extends
Reducer<StringPart, Text, NullWritable, Text> {

private static Text part = new Text("------------");

@Override
protected void reduce(StringPart key, Iterable<Text> values,
Reducer<StringPart, Text, NullWritable, Text>.Context context)
throws IOException, InterruptedException {
context.write(NullWritable.get(), part);
for (Text t : values) {
context.write(NullWritable.get(), t);
}
}
}


6.job类

package test.mr.seconderysort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SeconderyMain {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(SeconderyMain.class);

job.setGroupingComparatorClass(Grouping.class);
job.setPartitionerClass(Partitioner.class);

job.setMapperClass(SeconderyMap.class);
job.setMapOutputKeyClass(StringPart.class);
job.setMapOutputValueClass(Text.class);

job.setReducerClass(SeconderyRedu.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐