Hadoop: custom-sorted traffic statistics
2016-11-22 14:29
1. The previous post implemented simple traffic statistics with MapReduce, but the final results were output in dictionary order of the phone numbers. What if we need the output sorted by total traffic instead?
2. We can take the previous job's result file as the input and write a new MapReduce program.
3. The map output uses FlowBean as the key and NullWritable as the value. Because FlowBean implements the WritableComparable interface, we can define its sort rule ourselves; when the framework sorts the map output keys before handing them to the reducer, the records are therefore ordered by our custom rule.
4. The complete FlowBean code is as follows:
package com.zhichao.wan.flowmr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNB;
    private long u_load;
    private long d_load;
    private long s_load;

    // Hadoop needs a no-arg constructor to instantiate the bean during deserialization
    public FlowBean() {
    }

    public FlowBean(String phoneNB, long u_load, long d_load) {
        this.phoneNB = phoneNB;
        this.u_load = u_load;
        this.d_load = d_load;
        this.s_load = u_load + d_load;
    }

    public String getPhoneNB() {
        return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
        this.phoneNB = phoneNB;
    }

    public long getU_load() {
        return u_load;
    }

    public void setU_load(long u_load) {
        this.u_load = u_load;
    }

    public long getD_load() {
        return d_load;
    }

    public void setD_load(long d_load) {
        this.d_load = d_load;
    }

    public long getS_load() {
        return s_load;
    }

    public void setS_load(long s_load) {
        this.s_load = s_load;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(d_load);
        out.writeLong(u_load);
        out.writeLong(s_load);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // fields must be read back in exactly the order they were written
        phoneNB = in.readUTF();
        d_load = in.readLong();
        u_load = in.readLong();
        s_load = in.readLong();
    }

    @Override
    public String toString() {
        return "" + d_load + "\t" + u_load + "\t" + s_load;
    }

    @Override
    public int compareTo(FlowBean other) {
        // Descending order of total flow; break ties on phone number so that
        // different phones with the same total remain distinct keys and
        // compareTo still satisfies its contract (returns 0 only for equal keys).
        if (s_load != other.getS_load()) {
            return s_load > other.getS_load() ? -1 : 1;
        }
        return phoneNB.compareTo(other.getPhoneNB());
    }
}
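Before wiring FlowBean into a job, the ordering can be sanity-checked locally, since WritableComparable extends Comparable. Below is a minimal sketch (the class name and sample values are made up for illustration) that sorts a few beans with Collections.sort, which uses the compareTo above:

package com.zhichao.wan.flowmr;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Quick local check of FlowBean's ordering: beans should come out
// in descending order of total flow (s_load).
public class FlowBeanSortCheck {
    public static void main(String[] args) {
        List<FlowBean> beans = new ArrayList<FlowBean>();
        beans.add(new FlowBean("13500000001", 100, 200));   // total 300
        beans.add(new FlowBean("13500000002", 5000, 4000)); // total 9000
        beans.add(new FlowBean("13500000003", 10, 20));     // total 30

        Collections.sort(beans); // uses FlowBean.compareTo

        for (FlowBean b : beans) {
            System.out.println(b.getPhoneNB() + "\t" + b);
        }
        // Expected order: 13500000002, 13500000001, 13500000003
    }
}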
5. The MapReduce program code is as follows:
package com.zhichao.wan.flowmr.sort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.zhichao.wan.flowmr.FlowBean;

public class FlowSortMR {

    public static class SortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line: phone number followed by the flow fields, tab-separated
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");

            String phoneNB = fields[0];
            long u_load = Long.parseLong(fields[1]);
            long d_load = Long.parseLong(fields[2]);

            // FlowBean is the key, so the framework sorts records by our compareTo
            context.write(new FlowBean(phoneNB, u_load, d_load), NullWritable.get());
        }
    }

    public static class SortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean> {

        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keys arrive already sorted; just re-emit phone number and flow fields
            String phoneNB = key.getPhoneNB();
            context.write(new Text(phoneNB), new FlowBean(phoneNB, key.getU_load(), key.getD_load()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSortMR.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path("F:/hadoop/flow/output"));
        FileOutputFormat.setOutputPath(job, new Path("F:/hadoop/flow/output1"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
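One point worth noting: MapReduce only guarantees sort order within a single reduce task, so a globally sorted output file relies on the job running with one reducer (which happens to be the default). To make that explicit rather than rely on the default, you can pin the reducer count in main():

// Force a single reducer so the final output file is globally sorted,
// not just sorted within each partition.
job.setNumReduceTasks(1);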
6. The run produces the following result:
13726238888	2481	24681	27162
13726230503	2481	24681	27162
13925057413	63	11058	11121
18320173382	18	9531	9549
13502468823	102	7335	7437
13660577991	9	6960	6969
13922314466	3008	3720	6728
13560439658	5892	400	6292
84138413	4116	1432	5548
15013685858	27	3659	3686
15920133257	20	3156	3176
13602846565	12	1938	1950
15989002119	3	1938	1941
13926435656	1512	200	1712
18211575961	12	1527	1539
13560436666	954	200	1154
13480253104	180	200	380
13760778710	120	200	320
13826544101	0	200	200
13926251106	0	200	200
13719199419	0	200	200
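As expected, the records come out in descending order of total flow. If you want to verify the ordering mechanically rather than by eye, a small local check like the sketch below works; it assumes the default output file name part-r-00000 under the output directory:

import java.io.BufferedReader;
import java.io.FileReader;

// Reads the job output and flags any line whose total flow (last column)
// is larger than the previous line's, i.e. breaks the descending order.
public class SortOrderCheck {
    public static void main(String[] args) throws Exception {
        BufferedReader reader = new BufferedReader(
                new FileReader("F:/hadoop/flow/output1/part-r-00000"));
        long prev = Long.MAX_VALUE;
        String line;
        while ((line = reader.readLine()) != null) {
            String[] fields = line.split("\t");
            long total = Long.parseLong(fields[3]); // last column is total flow
            if (total > prev) {
                System.out.println("Out of order: " + line);
            }
            prev = total;
        }
        reader.close();
        System.out.println("Check finished.");
    }
}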