您的位置:首页 > 运维架构

hadoop自定义实现排序流量统计

2016-11-22 14:29 387 查看
1.上篇文章讲到mapreduce实现简单的流量统计,但最后的结果是按手机号的字典顺序进行输出的,如果我们需要实现按总流量的大小进行排序输出,怎么办?

2.我们可以把上篇的结果文件作为mapreduce的输入,重新写一个mapreduce程序。

3.map的输出以FlowBean作为key,以NullWritable(空值)作为value。FlowBean实现WritableComparable接口,并自定义它的排序规则,这样map输出的key在传给reduce之前,框架就会按照我们自定义的规则对它进行排序。

4.FlowBean具体代码如下:

package com.zhichao.wan.flowmr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/**
 * Traffic record for one phone number: upload, download and total byte counts.
 *
 * <p>Implements {@link WritableComparable} so it can be used as a MapReduce key.
 * The natural ordering is by total traffic, largest first; ties are broken by
 * phone number, then upload, then download, so the ordering is a proper total
 * order and {@code compareTo} returns 0 only for records whose fields are all equal.
 */
public class FlowBean implements WritableComparable<FlowBean>{

    private String phoneNB;
    private long u_load;
    private long d_load;
    private long s_load;

    /** No-arg constructor; fields keep their defaults until setters or {@link #readFields} run. */
    public FlowBean(){

    }

    /**
     * Creates a record and derives the total as {@code u_load + d_load}.
     *
     * @param phoneNB phone number identifying the subscriber
     * @param u_load  upload traffic
     * @param d_load  download traffic
     */
    public FlowBean(String phoneNB, long u_load, long d_load) {
        super();
        this.phoneNB = phoneNB;
        this.u_load = u_load;
        this.d_load = d_load;
        this.s_load = u_load + d_load;
    }

    public String getPhoneNB() {
        return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
        this.phoneNB = phoneNB;
    }

    public long getU_load() {
        return u_load;
    }

    /** Sets the upload traffic; the stored total is NOT recomputed. */
    public void setU_load(long u_load) {
        this.u_load = u_load;
    }

    public long getD_load() {
        return d_load;
    }

    /** Sets the download traffic; the stored total is NOT recomputed. */
    public void setD_load(long d_load) {
        this.d_load = d_load;
    }

    public long getS_load() {
        return s_load;
    }

    public void setS_load(long s_load) {
        this.s_load = s_load;
    }

    /**
     * Serializes the record as: phone number, download, upload, total.
     * The field order must stay in sync with {@link #readFields}.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(d_load);
        out.writeLong(u_load);
        out.writeLong(s_load);
    }

    /** Restores the record, reading fields in exactly the order {@link #write} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNB = in.readUTF();
        d_load = in.readLong();
        u_load = in.readLong();
        s_load = in.readLong();
    }

    /**
     * Renders the record as tab-separated {@code download, upload, total}.
     *
     * <p>NOTE(review): download is printed before upload here; any code that parses
     * this text back should use the same column order - confirm against the readers.
     */
    @Override
    public String toString() {
        return "" + d_load + "\t" + u_load + "\t" + s_load;
    }

    /**
     * Orders records by total traffic in descending order.
     *
     * <p>Equal totals are ordered by phone number, then upload, then download. The
     * previous implementation never returned 0 and answered 1 in both directions for
     * equal totals, which breaks the {@link Comparable} contract and makes the
     * relative order of ties undefined.
     */
    @Override
    public int compareTo(FlowBean arg0) {
        // Arguments are swapped on purpose: larger totals must sort first.
        int byTotal = Long.compare(arg0.getS_load(), s_load);
        if (byTotal != 0) {
            return byTotal;
        }
        int byPhone = compareNullable(phoneNB, arg0.getPhoneNB());
        if (byPhone != 0) {
            return byPhone;
        }
        int byUpload = Long.compare(u_load, arg0.getU_load());
        if (byUpload != 0) {
            return byUpload;
        }
        return Long.compare(d_load, arg0.getD_load());
    }

    /** Two records are equal exactly when {@link #compareTo} reports 0. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof FlowBean)) {
            return false;
        }
        return compareTo((FlowBean) obj) == 0;
    }

    /** Content-based hash, consistent with {@link #equals}. */
    @Override
    public int hashCode() {
        int result = phoneNB == null ? 0 : phoneNB.hashCode();
        result = 31 * result + (int) (u_load ^ (u_load >>> 32));
        result = 31 * result + (int) (d_load ^ (d_load >>> 32));
        result = 31 * result + (int) (s_load ^ (s_load >>> 32));
        return result;
    }

    /** Compares two possibly-null strings, ordering {@code null} before any value. */
    private static int compareNullable(String left, String right) {
        if (left == null) {
            return right == null ? 0 : -1;
        }
        if (right == null) {
            return 1;
        }
        return left.compareTo(right);
    }

}


5.mapreduce程序代码如下:

package com.zhichao.wan.flowmr.sort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.zhichao.wan.flowmr.FlowBean;

/**
 * MapReduce job that re-emits per-phone traffic records ordered by the natural
 * ordering of {@link FlowBean}.
 *
 * <p>The mapper turns each tab-separated input line into a {@code FlowBean} key, the
 * shuffle sorts those keys with {@code FlowBean.compareTo}, and the reducer writes
 * them back out as {@code phone -> FlowBean}.
 */
public class FlowSortMR {

    /** Input directory used when no command-line argument is supplied. */
    private static final String DEFAULT_INPUT = "F:/hadoop/flow/output";

    /** Output directory used when no second command-line argument is supplied. */
    private static final String DEFAULT_OUTPUT = "F:/hadoop/flow/output1";

    /** Counter group under which skipped input lines are reported. */
    private static final String COUNTER_GROUP = "FlowSortMR";

    /** Parses one input line into a {@link FlowBean} key with an empty value. */
    public static class SortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{

        /**
         * Expects at least three tab-separated fields: phone number and two numeric
         * traffic values. Lines that are blank, too short, or non-numeric are skipped
         * and counted instead of failing the whole task.
         *
         * <p>NOTE(review): field 1 is passed as upload and field 2 as download, while
         * {@code FlowBean.toString()} prints download first - confirm which column
         * order the input file really uses.
         */
        @Override
        protected void map(LongWritable key, Text value,Context context)
                throws IOException, InterruptedException {
            String line = value.toString();

            String[] fileds = StringUtils.split(line,"\t");
            if (fileds == null || fileds.length < 3) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }

            String phoneNB = fileds[0];
            long u_load;
            long d_load;
            try {
                u_load = Long.parseLong(fileds[1]);
                d_load = Long.parseLong(fileds[2]);
            } catch (NumberFormatException e) {
                // A single bad record should not abort the job; count it and move on.
                context.getCounter(COUNTER_GROUP, "NON_NUMERIC_LINES").increment(1);
                return;
            }

            context.write(new FlowBean(phoneNB, u_load, d_load), NullWritable.get());
        }

    }

    /** Writes each sorted key back out as {@code phone number -> FlowBean}. */
    public static class sortReducder extends Reducer<FlowBean, NullWritable, Text, FlowBean>{

        /**
         * Emits one output record per incoming value, so records that the key
         * comparator groups together are not collapsed into a single line.
         */
        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values,Context context)
                throws IOException, InterruptedException {
            for (NullWritable ignored : values) {
                // Read the key inside the loop so each emitted record reflects the
                // key state paired with the current value.
                String phoneNB = key.getPhoneNB();
                context.write(new Text(phoneNB),
                        new FlowBean(phoneNB, key.getU_load(), key.getD_load()));
            }
        }

    }

    /**
     * Configures and runs the job.
     *
     * @param args optional {@code [inputPath, outputPath]}; missing entries fall back
     *             to the built-in default paths
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {

        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT;
        String outputPath = (args != null && args.length > 1) ? args[1] : DEFAULT_OUTPUT;

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSortMR.class);

        job.setMapperClass(SortMapper.class);
        job.setReducerClass(sortReducder.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(inputPath));

        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Propagate job success or failure to the caller via the process exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}


6.运行结果如下:

13726238888 2481    24681   27162
13726230503 2481    24681   27162
13925057413 63  11058   11121
18320173382 18  9531    9549
13502468823 102 7335    7437
13660577991 9   6960    6969
13922314466 3008    3720    6728
13560439658 5892    400 6292
84138413    4116    1432    5548
15013685858 27  3659    3686
15920133257 20  3156    3176
13602846565 12  1938    1950
15989002119 3   1938    1941
13926435656 1512    200 1712
18211575961 12  1527    1539
13560436666 954 200 1154
13480253104 180 200 380
13760778710 120 200 320
13826544101 0   200 200
13926251106 0   200 200
13719199419 0   200 200
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: