自定义数据类型,实现数据排序
2015-05-16 09:19
246 查看
代码来源吴超7天视频
输入样例:
3 1
3 3
1 1
1 2
2 2
2 1
输出样例:
1 1
1 2
2 1
2 2
3 1
3 3
输入样例:
3 1
3 3
1 1
1 2
2 2
2 1
输出样例:
1 1
1 2
2 1
2 2
3 1
3 3
package sort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; public class SortApp { static String INPUT_PATH = "hdfs://hadoop:9000/input"; static String OUT_PATH = "hdfs://hadoop:9000/out"; public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration); if(fileSystem.exists(new Path(OUT_PATH))){ fileSystem.delete(new Path(OUT_PATH),true); } Job job = new Job(configuration, SortApp.class.getSimpleName()); //1.1 指定输入文件路径 FileInputFormat.setInputPaths(job, INPUT_PATH); //指定哪个类用来格式化输入文件 job.setInputFormatClass(TextInputFormat.class); //1.2指定自定义的Mapper类 job.setMapperClass(MyMapper.class); //指定输出<k2,v2>的类型 job.setMapOutputKeyClass(NewK2.class); job.setMapOutputValueClass(LongWritable.class); //1.3 指定分区类 job.setPartitionerClass(HashPartitioner.class); job.setNumReduceTasks(1); //1.4 TODO 排序、分区 //1.5 TODO (可选)合并 //2.2 指定自定义的reduce类 job.setReducerClass(MyReducer.class); //指定输出<k3,v3>的类型 job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(LongWritable.class); //2.3 指定输出到哪里 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); //设定输出文件的格式化类 job.setOutputFormatClass(TextOutputFormat.class); //把代码提交给JobTracker执行 
job.waitForCompletion(true); } static class MyMapper extends Mapper<LongWritable, Text, NewK2,LongWritable>{ @Override protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, NewK2,LongWritable>.Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub String[] splited = value.toString().split("\t"); NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1])); LongWritable v2 = new LongWritable(Long.parseLong(splited[1])); context.write(k2, v2); } } static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{ @Override protected void reduce(NewK2 k2, Iterable<LongWritable> v2s, org.apache.hadoop.mapreduce.Reducer<NewK2, LongWritable, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub context.write(new LongWritable(k2.first), new LongWritable(k2.second)); } } /** * 问:为什么实现该类? * 答:因为原来的v2不能参与排序,把原来的k2和v2封装到一个类中,作为新的k2 * */ static class NewK2 implements WritableComparable<NewK2>{ Long first; Long second; public NewK2(){} public NewK2(long first, long second){ this.first = first; this.second = second; } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub this.first = in.readLong(); this.second = in.readLong(); } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeLong(first); out.writeLong(second); } /** * 当k2进行排序时,会调用该方法. 
* 当第一列不同时,升序;当第一列相同时,第二列升序 */ @Override public int compareTo(NewK2 o) { // TODO Auto-generated method stub //long temp = this.first - o.first; long temp = o.first - this.first; if(temp !=0 ) return (int)temp; else return (int)(this.second - o.second); } @Override public int hashCode() { // TODO Auto-generated method stub return this.first.hashCode()+this.second.hashCode(); } @Override public boolean equals(Object obj) { // TODO Auto-generated method stub if(!(obj instanceof NewK2)){ return false; } NewK2 oK2 = (NewK2)obj; return (this.first==oK2.first)&&(this.second==oK2.second); } } }
相关文章推荐
- 在SQL和ERWIN中用自定义类型、规则和默认值实现check约束从而保证数据的完整性
- WCF基础教程(四)——数据契约实现传送自定义数据类型
- struts2实现自定义数据类型转换器
- 通过SQL Server自定义数据类型实现导入数据
- 在SQL和ERWIN中用自定义类型、规则和默认值实现check约束从而保证数据的完整性
- Hadoop——自定义数据类型,实现WritableComparable, 并且 分组,排序
- Hibernate的自定义数据类型实现接口——UserType详解
- 在SQL和ERWIN中用自定义类型、规则和默认值实现check约束从而保证数据的完整性
- [Silverlight]实现到自定义类型的属性数据绑定
- 使用自定义数据类型实现评论数时间、评论总数计数(mapreduce)
- 利用抽象工厂实现自定义多数据类型接口
- 用Visio画UML类图(实现自定义数据类型)
- Hibernate的自定义数据类型实现接口——UserType详解
- Hibernate的自定义数据类型实现接口——UserType详解
- Hibernate的自定义数据类型实现接口之一——UserType详解
- 在Activity中通过Bundle传递自定义数据类型
- 自己实现的数据表格控件(dataTable),支持自定义样式和标题数据、ajax等各种自定义设置以及分页自定义
- Redis 一、数据结构与对象--五大数据类型的底层结构实现
- 面试题---实现一个函数clone,可以对JavaScript中的5种主要的数据类型(包括Number、String、Object、Array、Boolean)进行值复制。
- QVariant与自定义数据类型转换的方法