您的位置:首页 > 运维架构

hadoop mr sort排序 demo

2016-05-20 16:30 357 查看
直接上代码吧

代码主要有两个类Example5 和ConfigVo

package com.hit.mr.example;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Writable key type carrying one configuration record; used as the map output
 * key so that the MapReduce shuffle sorts records by {@code id} (descending).
 *
 * <p>Note: Hadoop instantiates this class reflectively during deserialization,
 * so the public no-arg constructor is required.
 */
public class ConfigVo implements WritableComparable<ConfigVo> {

private Integer id;

private String serviceName;

private String serviceVersion;

private String key;

private String value;

/** Required by Hadoop's reflective deserialization. */
public ConfigVo() {

}

public ConfigVo(Integer id,String serviceName,String serviceVersion,String key,String value) {

this.id = id;

this.serviceName = serviceName;

this.serviceVersion = serviceVersion;

this.key = key;

this.value = value;

}

public Integer getId() {

return id;

}

public void setId(Integer id) {

this.id = id;

}

public String getServiceName() {

return serviceName;

}

public void setServiceName(String serviceName) {

this.serviceName = serviceName;

}

public String getServiceVersion() {

return serviceVersion;

}

public void setServiceVersion(String serviceVersion) {

this.serviceVersion = serviceVersion;

}

public String getKey() {

return key;

}

public void setKey(String key) {

this.key = key;

}

public String getValue() {

return value;

}

public void setValue(String value) {

this.value = value;

}

/**
 * Serializes the fields. The write order here MUST match the read order in
 * {@link #readFields(DataInput)}.
 */
@Override
public void write(DataOutput out) throws IOException {

out.writeInt(id);

out.writeUTF(serviceName);

out.writeUTF(serviceVersion);

out.writeUTF(key);

out.writeUTF(value);

}

/** Deserializes the fields in the same order they were written. */
@Override
public void readFields(DataInput in) throws IOException {

this.id = in.readInt();

this.serviceName = in.readUTF();

this.serviceVersion = in.readUTF();

this.key = in.readUTF();

this.value = in.readUTF();

}

@Override
public String toString() {

return "id = "+id+" serviceName ="+serviceName+" serviceVersion ="+serviceVersion+" key="+key+" value="+value;

}

/**
 * Orders records by {@code id} descending; the shuffle phase calls this to
 * sort map output keys.
 *
 * <p>Bug fix: the original {@code id > o.getId() ? -1 : 1} never returned 0,
 * so equal ids violated the Comparable contract
 * (sgn(x.compareTo(y)) must equal -sgn(y.compareTo(x))), which can break
 * Hadoop's merge sort. {@link Integer#compare} handles the equal case and
 * avoids boxing surprises.
 */
@Override
public int compareTo(ConfigVo o) {

return Integer.compare(o.getId(), this.id);

}

}

Example5 代码如下

package com.hit.mr.example;

import java.io.IOException;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**

* 排序 sort

* @author zh

*

*/

/**
 * Sort demo: reads CSV lines, wraps selected columns in a {@link ConfigVo}
 * key, and relies on the shuffle phase's key ordering (ConfigVo.compareTo)
 * to produce sorted output.
 *
 * @author zh
 */
public class Example5 {

/**
 * Maps each CSV input line to a ConfigVo key with a NullWritable value;
 * the key itself carries all the data, so no value payload is needed.
 */
public static class SortMap extends Mapper<Object, Text, ConfigVo, NullWritable>{

@Override
protected void map(Object key, Text value,
Mapper<Object, Text, ConfigVo, NullWritable>.Context context)
throws IOException, InterruptedException {

String line = value.toString();

String[] fields = line.split(",");

// Columns 0,1,2,4,6 -> id, serviceName, serviceVersion, key, value.
// NOTE(review): columns 3 and 5 are skipped — presumably separators or
// unused fields in the input format; confirm against the input data.
ConfigVo vo = new ConfigVo(Integer.parseInt(fields[0]), fields[1], fields[2], fields[4], fields[6]);

context.write(vo, NullWritable.get());

}

}

/**
 * Identity reducer: keys arrive already sorted by the shuffle, so each key
 * is simply written out unchanged.
 */
public static class SortReduce extends Reducer<ConfigVo, NullWritable, ConfigVo, NullWritable>{

@Override
protected void reduce(
ConfigVo key,
Iterable<NullWritable> values,
Reducer<ConfigVo, NullWritable, ConfigVo, NullWritable>.Context context)
throws IOException, InterruptedException {

context.write(key, NullWritable.get());

}

}

/**
 * Configures and submits the job.
 *
 * @param args args[0] = input path, args[1] = output path
 */
public static void main(String[] args)throws Exception {

// Job.getInstance() replaces the deprecated `new Job()` constructor.
Job job = Job.getInstance();

job.setJobName("example5");

job.setJarByClass(Example5.class);

job.setMapperClass(SortMap.class);

job.setReducerClass(SortReduce.class);

job.setMapOutputKeyClass(ConfigVo.class);

job.setMapOutputValueClass(NullWritable.class);

// Also declare the final (reducer) output types explicitly; the original
// relied on TextOutputFormat's lenient toString() handling.
job.setOutputKeyClass(ConfigVo.class);

job.setOutputValueClass(NullWritable.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

// Propagate success/failure to the shell instead of discarding the result.
System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

稍微解释一下:ConfigVo 实现了 WritableComparable 接口,该接口要求实现 write、readFields、compareTo 三个方法。其中 write 写入属性的顺序必须与 readFields 读取的顺序一致;compareTo 用于排序。最后说明一下,ConfigVo 必须要有无参构造函数,否则执行 MR 会失败,具体请参考 http://blog.csdn.net/infovisthinker/article/details/45152919

能实现排序的主要原理是:map 输出的数据在传输到 reduce 端的 shuffle 过程中会按 key 排序,排序时会调用 key 的 compareTo 方法,从而实现排序。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: