hadoop mr sort排序 demo
2016-05-20 16:30
357 查看
直接上代码吧
代码主要有两个类Example5 和ConfigVo
package com.hit.mr.example;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * One configuration record, used as the map-output key of the sort job.
 *
 * <p>Serialization: {@link #write(DataOutput)} and {@link #readFields(DataInput)}
 * must emit and consume the fields in exactly the same order
 * (id, serviceName, serviceVersion, key, value).
 *
 * <p>Ordering: records sort by {@code id} in descending order. Records with the
 * same id are further ordered by serviceName, serviceVersion, key and value
 * (ascending) so that {@link #compareTo(ConfigVo)} is a valid total order and
 * returns 0 only for records whose fields are all equal, in agreement with
 * {@link #equals(Object)}.
 *
 * <p>The public no-argument constructor is required: the framework creates key
 * instances itself and then fills them through {@code readFields}.
 */
public class ConfigVo implements WritableComparable<ConfigVo> {
    private Integer id;
    private String serviceName;
    private String serviceVersion;
    private String key;
    private String value;

    /** Creates an empty record; fields are populated later via setters or {@code readFields}. */
    public ConfigVo() {
    }

    /**
     * Creates a fully populated record.
     *
     * @param id             numeric identifier, the primary sort field
     * @param serviceName    service name
     * @param serviceVersion service version
     * @param key            configuration key
     * @param value          configuration value
     */
    public ConfigVo(Integer id, String serviceName, String serviceVersion, String key, String value) {
        this.id = id;
        this.serviceName = serviceName;
        this.serviceVersion = serviceVersion;
        this.key = key;
        this.value = value;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    public String getServiceVersion() {
        return serviceVersion;
    }

    public void setServiceVersion(String serviceVersion) {
        this.serviceVersion = serviceVersion;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    /**
     * Serializes the record. All fields must be non-null: {@code writeInt} unboxes
     * {@code id} and {@code writeUTF} rejects {@code null}.
     *
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(serviceName);
        out.writeUTF(serviceVersion);
        out.writeUTF(key);
        out.writeUTF(value);
    }

    /**
     * Deserializes the record, reading the fields in the order {@link #write} wrote them.
     *
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.serviceName = in.readUTF();
        this.serviceVersion = in.readUTF();
        this.key = in.readUTF();
        this.value = in.readUTF();
    }

    @Override
    public String toString() {
        return "id = "+id+" serviceName ="+serviceName+" serviceVersion ="+serviceVersion+" key="+key+" value="+value;
    }

    /**
     * Orders records by id descending, then by the remaining fields ascending.
     *
     * <p>The previous implementation never returned 0 and reported "greater" in
     * both directions for equal ids, which violates the {@link Comparable}
     * contract and makes the sort result undefined.
     *
     * @throws NullPointerException if either record has a {@code null} id
     */
    @Override
    public int compareTo(ConfigVo o) {
        // Operands are reversed on purpose: a larger id must sort first.
        int result = o.id.compareTo(id);
        if (result == 0) {
            result = compareNullable(serviceName, o.serviceName);
        }
        if (result == 0) {
            result = compareNullable(serviceVersion, o.serviceVersion);
        }
        if (result == 0) {
            result = compareNullable(key, o.key);
        }
        if (result == 0) {
            result = compareNullable(value, o.value);
        }
        return result;
    }

    /** Two records are equal when all five fields are equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof ConfigVo)) {
            return false;
        }
        ConfigVo other = (ConfigVo) obj;
        return sameValue(id, other.id)
                && sameValue(serviceName, other.serviceName)
                && sameValue(serviceVersion, other.serviceVersion)
                && sameValue(key, other.key)
                && sameValue(value, other.value);
    }

    /**
     * Field-based hash, consistent with {@link #equals(Object)}, so that equal
     * keys hash identically wherever hash-based partitioning is used.
     */
    @Override
    public int hashCode() {
        int result = hashOf(id);
        result = 31 * result + hashOf(serviceName);
        result = 31 * result + hashOf(serviceVersion);
        result = 31 * result + hashOf(key);
        result = 31 * result + hashOf(value);
        return result;
    }

    /** Compares two strings, treating {@code null} as smaller than any non-null value. */
    private static int compareNullable(String a, String b) {
        if (a == null) {
            return b == null ? 0 : -1;
        }
        return b == null ? 1 : a.compareTo(b);
    }

    /** Null-safe equality. */
    private static boolean sameValue(Object a, Object b) {
        return a == null ? b == null : a.equals(b);
    }

    /** Null-safe hash code; {@code null} hashes to 0. */
    private static int hashOf(Object o) {
        return o == null ? 0 : o.hashCode();
    }
}
Example5 代码如下
package com.hit.mr.example;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 排序 sort
* @author zh
*
*/
public class Example5 {

    /** Highest column index read by the mapper is 6, so a line needs at least 7 columns. */
    private static final int MIN_FIELDS = 7;

    /** Counter group/name used to report input lines that were skipped. */
    private static final String COUNTER_GROUP = "Example5";
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    /**
     * Parses each comma-separated input line into a {@link ConfigVo} and emits it
     * as the key, so that the shuffle phase sorts the records.
     *
     * <p>Columns used: 0 (id), 1 (serviceName), 2 (serviceVersion), 4 (key), 6 (value).
     */
    public static class SortMap extends Mapper<Object, Text, ConfigVo, NullWritable> {
        @Override
        protected void map(Object key, Text value,
                Mapper<Object, Text, ConfigVo, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // Limit -1 keeps trailing empty columns, e.g. an empty value in column 6.
            String[] fields = value.toString().split(",", -1);
            if (fields.length < MIN_FIELDS) {
                // Count and skip instead of failing the whole task on one short line.
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            int id;
            try {
                id = Integer.parseInt(fields[0].trim());
            } catch (NumberFormatException e) {
                // Non-numeric id (e.g. a header row): count and skip.
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            ConfigVo vo = new ConfigVo(id, fields[1], fields[2], fields[4], fields[6]);
            context.write(vo, NullWritable.get());
        }
    }

    /**
     * Writes the already-sorted keys to the output.
     *
     * <p>Keys that compare as equal arrive in a single {@code reduce} call, so one
     * record is written per value; writing the key only once would silently drop
     * duplicate input records.
     */
    public static class SortReduce extends Reducer<ConfigVo, NullWritable, ConfigVo, NullWritable> {
        @Override
        protected void reduce(
                ConfigVo key,
                Iterable<NullWritable> value,
                Reducer<ConfigVo, NullWritable, ConfigVo, NullWritable>.Context context)
                throws IOException, InterruptedException {
            for (NullWritable ignored : value) {
                context.write(key, NullWritable.get());
            }
        }
    }

    /**
     * Configures and runs the sort job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: Example5 <input path> <output path>");
            System.exit(2);
        }

        // Job.getInstance() replaces the deprecated Job constructor.
        Job job = Job.getInstance();
        job.setJobName("example5");
        job.setJarByClass(Example5.class);

        job.setMapperClass(SortMap.class);
        job.setReducerClass(SortReduce.class);

        job.setMapOutputKeyClass(ConfigVo.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Declare the final output types as well; they otherwise fall back to framework defaults.
        job.setOutputKeyClass(ConfigVo.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate job failure to the caller through the process exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
稍微解释一下:ConfigVo 实现了 WritableComparable 接口,因此必须实现接口中的 write、readFields 和 compareTo 三个方法。其中 write 写入属性的顺序必须与 readFields 读取的顺序一致,compareTo 则用于排序。最后说明一下,ConfigVo 必须有无参构造方法(框架需要先创建 key 的实例,再调用 readFields 填充数据),否则 MR 作业会执行失败,具体请参考 http://blog.csdn.net/infovisthinker/article/details/45152919
能够实现排序的主要原理是:MapReduce 在把 map 的输出传送到 reduce 端的 shuffle 过程中,会按 key 进行排序;排序时会调用 key 的 compareTo 方法,从而得到有序的输出。
代码主要有两个类Example5 和ConfigVo
package com.hit.mr.example;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * One configuration record, used as the map-output key of the sort job.
 *
 * <p>Serialization: {@link #write(DataOutput)} and {@link #readFields(DataInput)}
 * must emit and consume the fields in exactly the same order
 * (id, serviceName, serviceVersion, key, value).
 *
 * <p>Ordering: records sort by {@code id} in descending order. Records with the
 * same id are further ordered by serviceName, serviceVersion, key and value
 * (ascending) so that {@link #compareTo(ConfigVo)} is a valid total order and
 * returns 0 only for records whose fields are all equal, in agreement with
 * {@link #equals(Object)}.
 *
 * <p>The public no-argument constructor is required: the framework creates key
 * instances itself and then fills them through {@code readFields}.
 */
public class ConfigVo implements WritableComparable<ConfigVo> {
    private Integer id;
    private String serviceName;
    private String serviceVersion;
    private String key;
    private String value;

    /** Creates an empty record; fields are populated later via setters or {@code readFields}. */
    public ConfigVo() {
    }

    /**
     * Creates a fully populated record.
     *
     * @param id             numeric identifier, the primary sort field
     * @param serviceName    service name
     * @param serviceVersion service version
     * @param key            configuration key
     * @param value          configuration value
     */
    public ConfigVo(Integer id, String serviceName, String serviceVersion, String key, String value) {
        this.id = id;
        this.serviceName = serviceName;
        this.serviceVersion = serviceVersion;
        this.key = key;
        this.value = value;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    public String getServiceVersion() {
        return serviceVersion;
    }

    public void setServiceVersion(String serviceVersion) {
        this.serviceVersion = serviceVersion;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    /**
     * Serializes the record. All fields must be non-null: {@code writeInt} unboxes
     * {@code id} and {@code writeUTF} rejects {@code null}.
     *
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(serviceName);
        out.writeUTF(serviceVersion);
        out.writeUTF(key);
        out.writeUTF(value);
    }

    /**
     * Deserializes the record, reading the fields in the order {@link #write} wrote them.
     *
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.serviceName = in.readUTF();
        this.serviceVersion = in.readUTF();
        this.key = in.readUTF();
        this.value = in.readUTF();
    }

    @Override
    public String toString() {
        return "id = "+id+" serviceName ="+serviceName+" serviceVersion ="+serviceVersion+" key="+key+" value="+value;
    }

    /**
     * Orders records by id descending, then by the remaining fields ascending.
     *
     * <p>The previous implementation never returned 0 and reported "greater" in
     * both directions for equal ids, which violates the {@link Comparable}
     * contract and makes the sort result undefined.
     *
     * @throws NullPointerException if either record has a {@code null} id
     */
    @Override
    public int compareTo(ConfigVo o) {
        // Operands are reversed on purpose: a larger id must sort first.
        int result = o.id.compareTo(id);
        if (result == 0) {
            result = compareNullable(serviceName, o.serviceName);
        }
        if (result == 0) {
            result = compareNullable(serviceVersion, o.serviceVersion);
        }
        if (result == 0) {
            result = compareNullable(key, o.key);
        }
        if (result == 0) {
            result = compareNullable(value, o.value);
        }
        return result;
    }

    /** Two records are equal when all five fields are equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof ConfigVo)) {
            return false;
        }
        ConfigVo other = (ConfigVo) obj;
        return sameValue(id, other.id)
                && sameValue(serviceName, other.serviceName)
                && sameValue(serviceVersion, other.serviceVersion)
                && sameValue(key, other.key)
                && sameValue(value, other.value);
    }

    /**
     * Field-based hash, consistent with {@link #equals(Object)}, so that equal
     * keys hash identically wherever hash-based partitioning is used.
     */
    @Override
    public int hashCode() {
        int result = hashOf(id);
        result = 31 * result + hashOf(serviceName);
        result = 31 * result + hashOf(serviceVersion);
        result = 31 * result + hashOf(key);
        result = 31 * result + hashOf(value);
        return result;
    }

    /** Compares two strings, treating {@code null} as smaller than any non-null value. */
    private static int compareNullable(String a, String b) {
        if (a == null) {
            return b == null ? 0 : -1;
        }
        return b == null ? 1 : a.compareTo(b);
    }

    /** Null-safe equality. */
    private static boolean sameValue(Object a, Object b) {
        return a == null ? b == null : a.equals(b);
    }

    /** Null-safe hash code; {@code null} hashes to 0. */
    private static int hashOf(Object o) {
        return o == null ? 0 : o.hashCode();
    }
}
Example5 代码如下
package com.hit.mr.example;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 排序 sort
* @author zh
*
*/
public class Example5 {

    /** Highest column index read by the mapper is 6, so a line needs at least 7 columns. */
    private static final int MIN_FIELDS = 7;

    /** Counter group/name used to report input lines that were skipped. */
    private static final String COUNTER_GROUP = "Example5";
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    /**
     * Parses each comma-separated input line into a {@link ConfigVo} and emits it
     * as the key, so that the shuffle phase sorts the records.
     *
     * <p>Columns used: 0 (id), 1 (serviceName), 2 (serviceVersion), 4 (key), 6 (value).
     */
    public static class SortMap extends Mapper<Object, Text, ConfigVo, NullWritable> {
        @Override
        protected void map(Object key, Text value,
                Mapper<Object, Text, ConfigVo, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // Limit -1 keeps trailing empty columns, e.g. an empty value in column 6.
            String[] fields = value.toString().split(",", -1);
            if (fields.length < MIN_FIELDS) {
                // Count and skip instead of failing the whole task on one short line.
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            int id;
            try {
                id = Integer.parseInt(fields[0].trim());
            } catch (NumberFormatException e) {
                // Non-numeric id (e.g. a header row): count and skip.
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            ConfigVo vo = new ConfigVo(id, fields[1], fields[2], fields[4], fields[6]);
            context.write(vo, NullWritable.get());
        }
    }

    /**
     * Writes the already-sorted keys to the output.
     *
     * <p>Keys that compare as equal arrive in a single {@code reduce} call, so one
     * record is written per value; writing the key only once would silently drop
     * duplicate input records.
     */
    public static class SortReduce extends Reducer<ConfigVo, NullWritable, ConfigVo, NullWritable> {
        @Override
        protected void reduce(
                ConfigVo key,
                Iterable<NullWritable> value,
                Reducer<ConfigVo, NullWritable, ConfigVo, NullWritable>.Context context)
                throws IOException, InterruptedException {
            for (NullWritable ignored : value) {
                context.write(key, NullWritable.get());
            }
        }
    }

    /**
     * Configures and runs the sort job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: Example5 <input path> <output path>");
            System.exit(2);
        }

        // Job.getInstance() replaces the deprecated Job constructor.
        Job job = Job.getInstance();
        job.setJobName("example5");
        job.setJarByClass(Example5.class);

        job.setMapperClass(SortMap.class);
        job.setReducerClass(SortReduce.class);

        job.setMapOutputKeyClass(ConfigVo.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Declare the final output types as well; they otherwise fall back to framework defaults.
        job.setOutputKeyClass(ConfigVo.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate job failure to the caller through the process exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
稍微解释一下:ConfigVo 实现了 WritableComparable 接口,因此必须实现接口中的 write、readFields 和 compareTo 三个方法。其中 write 写入属性的顺序必须与 readFields 读取的顺序一致,compareTo 则用于排序。最后说明一下,ConfigVo 必须有无参构造方法(框架需要先创建 key 的实例,再调用 readFields 填充数据),否则 MR 作业会执行失败,具体请参考 http://blog.csdn.net/infovisthinker/article/details/45152919
能够实现排序的主要原理是:MapReduce 在把 map 的输出传送到 reduce 端的 shuffle 过程中,会按 key 进行排序;排序时会调用 key 的 compareTo 方法,从而得到有序的输出。
相关文章推荐
- Linux下的CentOS、Ubuntu、Gentoo 比较
- tomcat改utf-8
- Linux灰常重要命令—find命令
- linux误操作删除掉var(rm /var/*)目录导致的问题,及解决方法
- Linux下的文件有三个“时间”
- Mac OS安装nginx with nginx-stick-module-ng
- linux下LAMP环境的搭建
- OpenCV2简单的特征匹配
- tomcat 重新编绎
- destoon网站根目录license.txt不允许修改或删除,请检查
- weblogic在linux下启动会卡住的问题解决
- Docker 容器概念
- Xcode7.2配置OpenGL环境(包括GLTools)
- Centos 远程重装系统脚本
- VOOC还真算是OPPO的核心技术
- linux下jdk,tomcat的安装和配置
- Openv Switch 完全使用手册
- Linux学习---静态库 动态库
- cronolog切割tomcat日志以及日志导出方法
- tomcat多站点配置-window版