MapReduce之二——收入支出数据处理与自定义排序
2015-06-07 12:27
429 查看
1.原始数据:trade_info.txt
2.数据bean:InfoBean
3.第一阶段mapreduce处理:SumStep(统计节余)
第一阶段统计结果:
![](http://img.blog.csdn.net/20150607122315414)
4.第二阶段mapreduce处理:SortStep(按总节余排序)
第二阶段统计结果:
1.原始数据:trade_info.txt(字段以制表符分隔)
帐号	收入	支出	时间
zhangsan@163.com	6000	0	2014-02-20
lisi@163.com	2000	0	2014-02-20
lisi@163.com	0	100	2014-02-20
zhangsan@163.com	3000	0	2014-02-20
wangwu@126.com	9000	0	2014-02-20
wangwu@126.com	0	200	2014-02-20
2.数据bean:InfoBean
public class InfoBean implements WritableComparable<InfoBean>{ private String account; private double in; private double out; private double surplus; public void set(String account, double in, double out) { this.account = account; this.in = in; this.out = out; this.surplus = in - out; } public double getSurplus() { return surplus; } public void setSurplus(double surplus) { this.surplus = surplus; } public String getAccount() { return account; } public void setAccount(String account) { this.account = account; } public double getIn() { return in; } public void setIn(double in) { this.in = in; } public double getOut() { return out; } public void setOut(double out) { this.out = out; } @Override public String toString() { return this.in+"\t"+this.out+"\t"+this.surplus; } public void write(DataOutput out) throws IOException { out.writeUTF(account); out.writeDouble(in); out.writeDouble(this.out); out.writeDouble(surplus); } public void readFields(DataInput in) throws IOException { this.account = in.readUTF(); this.in = in.readDouble(); this.out = in.readDouble(); this.surplus = in.readDouble(); } //降序 public int compareTo(InfoBean arg0) { return this.surplus >= arg0.surplus ? -1 : 1; } }
3.第一阶段mapreduce处理:SumStep(统计节余)
/**
 * First-stage job: sums income and expense per account from tab-separated trade records
 * ({@code account \t in \t out \t date}).
 *
 * <p>Each output line has the form {@code account \t in \t out \t surplus}.
 */
public class SumStep {

    /**
     * Configures and runs the summing job.
     *
     * @param args {@code args[0]} is the input path; {@code args[1]} is the output path
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: SumStep <input path> <output path>");
            System.exit(2);
        }
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJobName("Trade-Info");
        job.setJarByClass(SumStep.class);

        job.setMapperClass(SumMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(InfoBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Report job failure through the exit code, so a driver script can stop
        // before running SortStep on missing or partial output.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /** Emits (account, InfoBean) for every trade record line. */
    public static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean> {
        // Reused across map() calls; Hadoop serializes them on write.
        private Text k = new Text();
        private InfoBean v = new InfoBean();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, InfoBean>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // A blank line (e.g. a trailing newline) would otherwise fail on fields[1].
            if (line.trim().isEmpty()) {
                return;
            }
            String[] fields = line.split("\t");
            String account = fields[0];
            double in = Double.parseDouble(fields[1]);
            double out = Double.parseDouble(fields[2]);
            k.set(account);
            v.set(account, in, out);
            context.write(k, v);
        }
    }

    /** Sums income and expense of all records for one account. */
    public static class SumReducer extends Reducer<Text, InfoBean, Text, InfoBean> {
        private InfoBean v = new InfoBean();

        @Override
        protected void reduce(Text key, Iterable<InfoBean> values,
                Reducer<Text, InfoBean, Text, InfoBean>.Context context)
                throws IOException, InterruptedException {
            double inSum = 0;
            double outSum = 0;
            for (InfoBean infoBean : values) {
                inSum += infoBean.getIn();
                outSum += infoBean.getOut();
            }
            // The account travels as the output key; InfoBean.toString() omits it anyway.
            v.set("", inSum, outSum);
            context.write(key, v);
        }
    }
}
第一阶段统计结果:
4.第二阶段mapreduce处理:SortStep(按总节余排序)
/**
 * Second-stage job: reads {@code SumStep} output ({@code account \t in \t out \t surplus}) and
 * writes the same records sorted by {@link InfoBean#compareTo(InfoBean)}.
 */
public class SortStep {

    /**
     * Configures and runs the sorting job.
     *
     * @param args {@code args[0]} is the input path (SumStep output); {@code args[1]} is the output path
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: SortStep <input path> <output path>");
            System.exit(2);
        }
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(SortStep.class);
        job.setJobName("SortStepJob");

        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(InfoBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(InfoBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Report job failure through the process exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * MapReduce custom sort: the bean to be sorted is emitted as the map output key (key2).
     * The bean implements {@code WritableComparable<Bean>}, and its {@code compareTo} defines
     * the sort order.
     *
     * @author root
     */
    public static class SortMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable> {
        // Reused across map() calls; Hadoop serializes it on write.
        private InfoBean k = new InfoBean();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, InfoBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            String account = fields[0];
            double in = Double.parseDouble(fields[1]);
            double out = Double.parseDouble(fields[2]);
            k.set(account, in, out);
            context.write(k, NullWritable.get());
        }
    }

    /** Writes every sorted record back out as (account, InfoBean). */
    public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean> {
        Text k = new Text();

        @Override
        protected void reduce(InfoBean k1, Iterable<NullWritable> v2s,
                Reducer<InfoBean, NullWritable, Text, InfoBean>.Context context)
                throws IOException, InterruptedException {
            // Write once per grouped value rather than once per group. If keys ever compare
            // equal, Hadoop refills the reused k1 as the iterator advances, so every record
            // is still emitted instead of only the first one.
            for (NullWritable ignored : v2s) {
                k.set(k1.getAccount());
                context.write(k, k1);
            }
        }
    }
}
第二阶段统计结果:
相关文章推荐
- Linux系统结构 详解
- Spring集成ORM框架之JDBC的使用
- Spring中$Proxy4 cannot be cast to错误
- Delphi获取身份证号码验证码算法
- 《鸟哥的Linux私房菜》第零章 计算机概论
- Tomcat目录及配置文件解析
- iOS开发之xib技巧介绍
- lua
- haproxy
- 网页版微信和微信公共号扫码登陆原理分析
- 第十三周项目--链表类
- JavaScript2种构造函数创建对象的模式以及继承的实现
- 海尔的另类转型:内部只有平台主小微主创客
- 黑马程序员 面向对象(一)
- BZOJ 1664: [Usaco2006 Open]County Fair Events 参加节日庆祝( dp )
- Servlet的配置参数load-on-startup参数理解
- Unity3D游戏开发之自由视角下的角色控制
- 【hdoj 1312】Red and Black
- 事件监听
- 以朋友身份,说说猝然离世的桑德伯格丈夫是个什么样的人