初级案例
2015-11-09 21:16
295 查看
Hadoop MapReduce 手机流量统计
1)分析业务需求:
用户使用手机上网,存在流量的消耗。流量包括两部分:其一是上行流量(发送),其二是下行流量(接收)。每种流量在网络传输过程中,有两种形式说明:包的大小,流量的大小。使用手机上网,以手机号为唯一标识进行记录。这个记录包括很多信息,需要的信息字段:实际需要的字段:
手机号码,上行数据包数,下行数据包数,上行总流量,下行总流量
2)自定义数据类型(五个字段)
DataWritable 实现 WritableComparable 接口。3)分析MapReduce写法
分析MapReduce写法,哪些业务逻辑在Map阶段执行,那些业务逻辑在Reduce阶段执行。Map阶段:从文件中获取数据,抽取需要的五个字段,输出的key为手机号码,输出的value为数据流量的类型DataWritable对象。
Reduce阶段:将相同手机号码的value中的数据流量进行相加,得出手机流量的总数(数据包和数据流量)。输出到文件中,以制表符分开。
[code]package org.apache.hadoop.mapreduce.app; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class DataTotalMapReduce { // Mapper Class static class DataTotalMapper extends Mapper<LongWritable, Text, Text, DataWritable> { private Text mapOutputKey = new Text(); private DataWritable dataWritable = new DataWritable(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String lineValue = value.toString(); // 数据分割 split String[] strs = lineValue.split("\t"); // 从strs中获取我们需要的几个字段 String phoneNum = strs[1]; int upPackNum = Integer.valueOf(strs[6]); int downPackNum = Integer.valueOf(strs[7]); int upPayLoad = Integer.valueOf(strs[8]); int downPayLoad = Integer.valueOf(strs[9]); // 设置map输出的key/value mapOutputKey.set(phoneNum); dataWritable.set(upPackNum, upPayLoad, downPackNum, downPayLoad); // 设置map输出 context.write(mapOutputKey, dataWritable); } } // Reducer Class static class DataTotalReducer extends Reducer<Text, DataWritable, Text, DataWritable> { private DataWritable dataWritable = new DataWritable(); public void reduce(Text key, Iterable<DataWritable> values, Context context) throws IOException, InterruptedException { int upPackNum = 0; int downPackNum = 0; int upPayLoad = 0; int downPayLoad = 0; // 循环 for (DataWritable data : values) { upPackNum += data.getUpPackNum(); downPackNum += data.getDownPackNum(); upPayLoad += data.getUpPayLoad(); downPayLoad += data.getDownPayLoad(); } // 设置输出 dataWritable.set(upPackNum, upPayLoad, downPackNum, downPayLoad); // 设置reduce/job输出 context.write(key, dataWritable); } } // Driver Code public int run(String[] args) throws Exception { // get conf Configuration conf = new Configuration(); // create job Job job = new Job(conf, DataTotalMapReduce.class.getSimpleName()); // set job job.setJarByClass(DataWritable.class); // 1)input Path inputDir = new Path(args[0]); FileInputFormat.addInputPath(job, inputDir); // 2)map job.setMapperClass(DataTotalMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(DataWritable.class); // 3)reduce job.setReducerClass(DataTotalReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DataWritable.class); // 4)output Path outputDir = new Path(args[1]); FileOutputFormat.setOutputPath(job, outputDir); // submit job boolean isSuccess = job.waitForCompletion(true); // return status return isSuccess ? 0 : 1; } // run mapreduce public static void main(String[] args) throws Exception { // set args args = new String[] { // input path // output path }; // run job int status = new DataTotalMapReduce().run(args); // exit System.exit(status); } }
相关文章推荐
- JAVA反射系列之Method,java.lang.reflect.Method的使用
- Python/scikit-learn机器学习库(特征选取)
- Angular JS 学习 -- 服务Service
- 大道至简第六章读后感
- LightOJ 1294 - Positive Negative Sign (规律)
- SOJ 3300_Stockholm Coins
- SOJ 3300_Stockholm Coins
- JAVA中使用AES加密
- 搭建Erlang语言开发环境(文本编辑器+Erlang OTP,不是绑定IDE的环境)
- ionic tab选项卡置于顶部
- css布局 : 居中问题
- 课堂作业
- CI
- 二叉树的镜像
- 面向对象程序设计(一)—— 创建型模式之抽象工厂模式
- 如何查看 oracle 官方文档
- [转]程序员练手小项目
- 把ListView和GridView与AppBar实现上下联动
- 数据结构实验之二叉树一:树的同构
- 链队列的算法操作