Hadoop MapReduce: Custom Sort Programming
2015-05-26 21:03
1. Problem Description
Given the input data, compute each account's total income, total expenses, and total profit, then sort the accounts by total profit in descending order; when total profits are equal, sort by total expenses in ascending order.
2. Data Format
2.1 Input Data Format
zhangsan@163.com 6000 0 2014-02-20
zhangsan@163.com 0 1000 2014-02-20
lisi@163.com 2000 1000 2014-02-20
lisi@163.com 10000 9000 2014-02-20
beibei@126.com 100 0 2014-02-20
wangwu@163.com 6000 2000 2014-02-20
2.2 Output Data Format
zhangsan@163.com 6000.0 1000.0 5000.0
wangwu@163.com 6000.0 2000.0 4000.0
lisi@163.com 12000.0 10000.0 2000.0
beibei@126.com 100.0 0.0 100.0
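As a sanity check on the formats above, each account's rows aggregate by simple addition: zhangsan@163.com's two input rows (income 6000 + 0, expenses 0 + 1000) yield income 6000.0, expenses 1000.0, profit 5000.0. A minimal plain-Java sketch of that aggregation, with no Hadoop dependency (`SumSketch` is an illustrative name, not part of the job code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SumSketch {
    // Aggregate rows of {account, income, expenses} into
    // {total income, total expenses, profit} per account.
    static Map<String, double[]> aggregate(String[][] rows) {
        Map<String, double[]> totals = new LinkedHashMap<>();
        for (String[] r : rows) {
            double[] t = totals.computeIfAbsent(r[0], a -> new double[3]);
            t[0] += Double.parseDouble(r[1]); // total income
            t[1] += Double.parseDouble(r[2]); // total expenses
            t[2] = t[0] - t[1];               // profit
        }
        return totals;
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"zhangsan@163.com", "6000", "0"},
            {"zhangsan@163.com", "0", "1000"},
        };
        double[] z = aggregate(rows).get("zhangsan@163.com");
        System.out.println(z[0] + "\t" + z[1] + "\t" + z[2]);
    }
}
```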
3. Implementation
Step 1: compute each account's total income, total expenses, and total profit, and write the result to HDFS. (By default the output is sorted lexicographically by key.)
Step 2: run a second job that applies the custom sort to that output.
Class InfoBean
```java
package edu.jianwei.hadoop.mr.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class InfoBean implements WritableComparable<InfoBean> {

    private String account;
    private double income;
    private double expenses;
    private double profit;

    public void set(String account, double income, double expenses) {
        this.account = account;
        this.income = income;
        this.expenses = expenses;
        this.profit = income - expenses;
    }

    @Override
    public String toString() {
        return this.income + "\t" + this.expenses + "\t" + this.profit;
    }

    /**
     * Serialize.
     */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(account);
        out.writeDouble(income);
        out.writeDouble(expenses);
        out.writeDouble(profit);
    }

    /**
     * Deserialize.
     */
    public void readFields(DataInput in) throws IOException {
        this.account = in.readUTF();
        this.income = in.readDouble();
        this.expenses = in.readDouble();
        this.profit = in.readDouble();
    }

    /**
     * Profit descending; on ties, expenses ascending. Note this never
     * returns 0, so two accounts with identical totals still land in
     * separate reduce groups when InfoBean is used as the key.
     */
    public int compareTo(InfoBean o) {
        if (this.profit == o.getProfit()) {
            return this.expenses > o.getExpenses() ? 1 : -1;
        }
        return this.profit > o.getProfit() ? -1 : 1;
    }

    public String getAccount() {
        return account;
    }

    public void setAccount(String account) {
        this.account = account;
    }

    public double getIncome() {
        return income;
    }

    public void setIncome(double income) {
        this.income = income;
    }

    public double getExpenses() {
        return expenses;
    }

    public void setExpenses(double expenses) {
        this.expenses = expenses;
    }

    public double getProfit() {
        return profit;
    }

    public void setProfit(double profit) {
        this.profit = profit;
    }
}
```
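The ordering rule lives entirely in compareTo: profit descending, then expenses ascending on ties. The sketch below reimplements the same rule on bare double[] records so it can be checked without any Hadoop dependency (`CompareSketch` is an illustrative name):

```java
import java.util.Arrays;

public class CompareSketch {
    // Same ordering rule as InfoBean.compareTo, on {income, expenses, profit}:
    // profit descending, then expenses ascending.
    static int compare(double[] a, double[] b) {
        if (a[2] == b[2]) {
            return a[1] > b[1] ? 1 : -1;
        }
        return a[2] > b[2] ? -1 : 1;
    }

    public static void main(String[] args) {
        double[][] beans = {
            {100, 0, 100}, {12000, 10000, 2000}, {6000, 2000, 4000}, {6000, 1000, 5000}
        };
        // Highest profit sorts first.
        Arrays.sort(beans, CompareSketch::compare);
        System.out.println(Arrays.deepToString(beans));
    }
}
```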
Class SumStep:
```java
package edu.jianwei.hadoop.mr.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SumStep {

    static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean> {

        private final Text k = new Text();
        private final InfoBean v = new InfoBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input line: account \t income \t expenses \t date
            String[] fields = value.toString().split("\t");
            String account = fields[0];
            double income = Double.parseDouble(fields[1]);
            double expenses = Double.parseDouble(fields[2]);
            k.set(account);
            v.set(account, income, expenses);
            context.write(k, v);
        }
    }

    static class SumReducer extends Reducer<Text, InfoBean, Text, InfoBean> {

        private final InfoBean v = new InfoBean();

        @Override
        protected void reduce(Text key, Iterable<InfoBean> values, Context context)
                throws IOException, InterruptedException {
            double totalIncome = 0;
            double totalExpenses = 0;
            for (InfoBean bean : values) {
                totalIncome += bean.getIncome();
                totalExpenses += bean.getExpenses();
            }
            // The account is carried by the key, so it is not needed in the value.
            v.set(null, totalIncome, totalExpenses);
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws IllegalArgumentException,
            IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SumStep.class);

        job.setMapperClass(SumMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(InfoBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
```
Class SortStep:
```java
package edu.jianwei.hadoop.mr.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortStep {

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SortStep.class);

        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(InfoBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(InfoBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

    public static class SortMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable> {

        private final InfoBean k = new InfoBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input line (output of SumStep): account \t income \t expenses \t profit
            String[] fields = value.toString().split("\t");
            String account = fields[0];
            double income = Double.parseDouble(fields[1]);
            double expenses = Double.parseDouble(fields[2]);
            k.set(account, income, expenses);
            // The bean itself is the key, so the shuffle sorts by InfoBean.compareTo.
            context.write(k, NullWritable.get());
        }
    }

    public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean> {

        private final Text k = new Text();

        @Override
        protected void reduce(InfoBean bean, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            k.set(bean.getAccount());
            context.write(k, bean);
        }
    }
}
```
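Note that SortStep never sets a comparator on the job: because InfoBean is the map output key, the framework's shuffle sorts records by InfoBean.compareTo automatically. A plain-Java simulation of the resulting order over the summed lines (`SortSketch` is an illustrative name; no Hadoop dependency):

```java
import java.util.Arrays;
import java.util.Comparator;

public class SortSketch {
    // Sort lines of "account \t income \t expenses \t profit" by profit
    // descending, then expenses ascending -- the same order InfoBean defines.
    static String[] sortByProfit(String[] lines) {
        Arrays.sort(lines, Comparator
                .comparingDouble((String s) -> Double.parseDouble(s.split("\t")[3])).reversed()
                .thenComparingDouble((String s) -> Double.parseDouble(s.split("\t")[2])));
        return lines;
    }

    public static void main(String[] args) {
        String[] summed = {   // SumStep output, lexicographic by account
            "beibei@126.com\t100.0\t0.0\t100.0",
            "lisi@163.com\t12000.0\t10000.0\t2000.0",
            "wangwu@163.com\t6000.0\t2000.0\t4000.0",
            "zhangsan@163.com\t6000.0\t1000.0\t5000.0",
        };
        for (String line : sortByProfit(summed)) {
            System.out.println(line);
        }
    }
}
```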
4. Output Results
Step 1: compute each account's total income, total expenses, and total profit, and write the result to HDFS.
1. Command
```shell
hadoop jar /root/sort.jar edu.jianwei.hadoop.mr.sort.SumStep /sort /sort/sum
```
2. Output
beibei@126.com 100.0 0.0 100.0
lisi@163.com 12000.0 10000.0 2000.0
wangwu@163.com 6000.0 2000.0 4000.0
zhangsan@163.com 6000.0 1000.0 5000.0
Step 2: apply the custom sort to the summed output.
1. Command
```shell
hadoop jar /root/sort.jar edu.jianwei.hadoop.mr.sort.SortStep /sort/sum /sort/sortRes
```
2. Output
zhangsan@163.com 6000.0 1000.0 5000.0
wangwu@163.com 6000.0 2000.0 4000.0
lisi@163.com 12000.0 10000.0 2000.0
beibei@126.com 100.0 0.0 100.0