编写mapreduce统计数据流量的小程序
2016-12-13 16:51
531 查看
主代码如下:
package cn.itcast.bigdata.mr.flowsum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce job that sums the upstream and downstream traffic per phone number.
 *
 * <p>Each input line is split on tab characters. The phone number is read from
 * the second field, the upstream flow from the third-from-last field and the
 * downstream flow from the second-from-last field. The reducer adds up all
 * values that share a phone number and emits one {@link FlowBean} per key.
 */
public class FlowCount {

    /** Emits {@code <phone number, FlowBean(upFlow, dFlow)>} for every input line. */
    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        /** Counter group/name used to report input lines that could not be parsed. */
        private static final String COUNTER_GROUP = "FlowCount";
        private static final String MALFORMED_COUNTER = "MALFORMED_LINES";

        /**
         * Smallest field count for which index 1, {@code length - 3} and
         * {@code length - 2} all exist.
         */
        private static final int MIN_FIELDS = 3;

        /**
         * Parses one input line and writes the phone number with its traffic figures.
         *
         * @param key     input key supplied by the framework (not used here)
         * @param value   one line of the input file
         * @param context used to emit the output pair and to update counters
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Convert the line to a String and split it into fields.
            String line = value.toString();
            String[] fields = line.split("\t");

            // A line with too few fields would raise ArrayIndexOutOfBoundsException
            // below and fail the whole task; skip it and record it in a counter.
            if (fields.length < MIN_FIELDS) {
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            // Phone number.
            String phoneNbr = fields[1];

            // Upstream and downstream flow, addressed from the end of the line.
            long upFlow;
            long dFlow;
            try {
                upFlow = Long.parseLong(fields[fields.length - 3]);
                dFlow = Long.parseLong(fields[fields.length - 2]);
            } catch (NumberFormatException e) {
                // Non-numeric traffic fields: skip the line instead of failing the task.
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            context.write(new Text(phoneNbr), new FlowBean(upFlow, dFlow));
        }
    }

    /** Sums the traffic of all beans that share one phone number. */
    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        /**
         * Receives all beans for one key, for example
         * {@code <183323,bean1> <183323,bean2> <183323,bean3> ...}, and writes a
         * single bean holding the summed upstream and downstream flow.
         *
         * @param key     the phone number
         * @param values  every bean emitted by the mappers for this phone number
         * @param context used to emit the aggregated result
         */
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long sumUpFlow = 0;
            long sumDFlow = 0;

            // Walk over all beans and accumulate upstream and downstream flow separately.
            for (FlowBean bean : values) {
                sumUpFlow += bean.getUpFlow();
                sumDFlow += bean.getdFlow();
            }

            FlowBean resultBean = new FlowBean(sumUpFlow, sumDFlow);
            context.write(key, resultBean);
        }
    }

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: FlowCount <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        // Optional explicit cluster settings:
        // conf.set("mapreduce.framework.name", "yarn");
        // conf.set("yarn.resourcemanager.hostname", "mini1");
        Job job = Job.getInstance(conf);

        // Alternative: point at a fixed jar file.
        // job.setJar("/home/hadoop/wc.jar");

        // Locate the jar of this program through the class it contains.
        job.setJarByClass(FlowCount.class);

        // Mapper and reducer classes used by this job.
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // Key/value types of the mapper output.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Key/value types of the final output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Directory holding the raw input files.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Directory that receives the results.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the configured job together with its jar and wait for it to finish.
        // (job.submit() would submit without waiting.)
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
FlowBean代码如下:
package cn.itcast.bigdata.mr.flowsum; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; public class FlowBean implements Writable{ private long upFlow; private long dFlow; private long sumFlow; //反序列化时,需要反射调用空参构造函数,所以要显示定义一个 public FlowBean(){} public FlowBean(long upFlow, long dFlow) { this.upFlow = upFlow; this.dFlow = dFlow; this.sumFlow = upFlow + dFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getdFlow() { return dFlow; } public void setdFlow(long dFlow) { this.dFlow = dFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } /** * 序列化方法 */ @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(dFlow); out.writeLong(sumFlow); } /** * 反序列化方法 * 注意:反序列化的顺序跟序列化的顺序完全一致 */ @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); dFlow = in.readLong(); sumFlow = in.readLong(); } @Override public String toString() { return upFlow + "\t" + dFlow + "\t" + sumFlow; } }
相关文章推荐
- Hadoop2.4.1 简单的用户手机流量统计的MapReduce程序(一)
- Hadoop2.4.1 简单的用户手机流量统计的MapReduce程序(二)
- 第二个MapReduce程序----flowcount(流量统计,自定义排序,自定义分区)
- 使用MapReduce计算框架统计CDN日志IP数、流量等数据
- 一脸懵逼学习Hadoop中的序列化机制——流量求和统计MapReduce的程序开发案例——流量求和统计排序
- 编写mapreduce程序实例——数据去重
- 用jfreechat编写图形化统计数据程序(实例)
- 用mapreduce计算wordCount和手机流量统计程序运行过程
- 用jfreechat编写图形化统计数据程序(实例)
- Hadoop2.4.1 简单的用户手机流量统计的MapReduce程序(三)
- 在hadoop上进行编写mapreduce程序,统计关键词在text出现次数
- 用jfreechat编写图形化统计数据程序
- Hadoop2.4.1 简单的用户手机流量统计的MapReduce程序总结
- 在hadoop上进行编写mapreduce程序,统计关键词在text出现次数
- MapReduce之一——上网流量数据统计
- 用jfreechat编写图形化统计数据程序(实例)
- 编写第一个MapReduce程序—— 统计气温
- 商店销售统计,每天有一个折扣价格,一次购10件以上者可以享受9.8折优惠,已知三个销售员的销售情况,运用静态数据成员和静态成员函数编写程序
- Linux下使用Eclipse编写MapReduce程序的配置