
Writing a small MapReduce program to total data traffic

The program sums each phone number's total upstream and downstream traffic from tab-separated log lines, using a custom Writable (FlowBean) as the map output value. The main (driver) code is as follows:

package cn.itcast.bigdata.mr.flowsum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {

    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // convert the line's contents to a String
            String line = value.toString();
            // split into fields
            String[] fields = line.split("\t");
            // extract the phone number
            String phoneNbr = fields[1];
            // extract the upstream and downstream traffic
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long dFlow = Long.parseLong(fields[fields.length - 2]);

            context.write(new Text(phoneNbr), new FlowBean(upFlow, dFlow));
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        // <183323,bean1><183323,bean2><183323,bean3><183323,bean4>.......
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long sum_upFlow = 0;
            long sum_dFlow = 0;

            // iterate over all beans, accumulating their upstream and downstream traffic separately
            for (FlowBean bean : values) {
                sum_upFlow += bean.getUpFlow();
                sum_dFlow += bean.getdFlow();
            }

            FlowBean resultBean = new FlowBean(sum_upFlow, sum_dFlow);
            context.write(key, resultBean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        /*conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "mini1");*/
        Job job = Job.getInstance(conf);

        /*job.setJar("/home/hadoop/wc.jar");*/
        // specify the local path of the jar containing this program
        job.setJarByClass(FlowCount.class);

        // specify the mapper/reducer classes this job uses
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // specify the key/value types of the mapper's output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // specify the key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // specify the directory of the job's raw input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // specify the directory for the job's output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job's configuration, plus the jar containing the job's classes, to YARN to run
        /*job.submit();*/
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
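To make the expected input concrete, here is a minimal sketch (not part of the original program) that runs the mapper's parsing logic on a single hypothetical log line. The sample line is made up, but it matches what the mapper assumes: tab-separated fields, the phone number at index 1, and the upstream/downstream byte counts in the third- and second-to-last fields.

package cn.itcast.bigdata.mr.flowsum;

// Hypothetical demo class, not part of the original post.
public class FieldLayoutDemo {
    public static void main(String[] args) {
        // made-up sample line in the assumed layout
        String line = "1363157985066\t13726230503\t00-FD-07-A4-72-B8:CMCC\t"
                + "120.196.100.82\t24\t27\t2481\t24681\t200";
        String[] fields = line.split("\t");
        String phoneNbr = fields[1];                             // "13726230503"
        long upFlow = Long.parseLong(fields[fields.length - 3]); // 2481
        long dFlow = Long.parseLong(fields[fields.length - 2]);  // 24681
        // FlowBean.toString() appends the computed total: 2481 + 24681 = 27162
        System.out.println(phoneNbr + "\t" + new FlowBean(upFlow, dFlow));
        // prints: 13726230503	2481	24681	27162
    }
}

Once packaged into a jar, the job is submitted (for example with hadoop jar) with the input directory as args[0] and the output directory as args[1]; each output line is a phone number followed by FlowBean.toString().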
The FlowBean code is as follows:
package cn.itcast.bigdata.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private long upFlow;
    private long dFlow;
    private long sumFlow;

    // deserialization instantiates the bean reflectively via the no-arg
    // constructor, so one must be defined explicitly
    public FlowBean() {}

    public FlowBean(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getdFlow() {
        return dFlow;
    }

    public void setdFlow(long dFlow) {
        this.dFlow = dFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Serialization method
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization method
     * Note: fields must be read back in exactly the order they were written
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + dFlow + "\t" + sumFlow;
    }
}
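Since the whole point of FlowBean is that Hadoop can serialize it between the map and reduce phases, a minimal round-trip sketch (again hypothetical, not from the original post) can verify that write() and readFields() agree: if the fields were read back in a different order than they were written, the reconstructed values would be scrambled.

package cn.itcast.bigdata.mr.flowsum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical demo class: serialize a FlowBean to bytes and read it back,
// roughly what Hadoop does when shuffling map output to the reducers.
public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(2481, 24681);

        // serialize: write() emits upFlow, dFlow, sumFlow as three longs (24 bytes)
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // deserialize: Hadoop reflectively calls the no-arg constructor, then readFields()
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy); // 2481	24681	27162
    }
}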
