MapReduce的手机流量统计的案例
2016-05-15 11:15
441 查看
程序:(另外一个关于单词计数的总结:http://www.cnblogs.com/DreamDrive/p/5492572.html)
import java.io.IOException; import mapreduce.WordCountApp.WordCountMapper.WordCountReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 以文本 * hello you * hello me * 为例子. * map方法调用了两次,因为有两行 * k2 v2 键值对的数量有几个? * 有4个.有四个单词. * * 会产生几个分组? * 产生3个分组. * 有3个不同的单词. * */ public class WordCountApp { public static void main(String[] args) throws Exception { //程序在这里运行,要有驱动. Configuration conf = new Configuration(); Job job = Job.getInstance(conf,WordCountApp.class.getSimpleName()); //我们运行此程序通过运行jar包来执行.一定要有这句话. job.setJarByClass(WordCountApp.class); FileInputFormat.setInputPaths(job,args[0]); job.setMapperClass(WordCountMapper.class);//设置Map类 job.setMapOutputKeyClass(Text.class);//设置Map的key job.setMapOutputValueClass(LongWritable.class);//设置Map的value job.setReducerClass(WordCountReducer.class);//设置Reduce的类 job.setOutputKeyClass(Text.class);//设置Reduce的key Reduce这个地方只有输出的参数可以设置. 方法名字也没有Reduce关键字区别于Map job.setOutputValueClass(LongWritable.class);//设置Reduce的value. FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true);//表示结束了才退出,不结束不退出 } /** * 4个泛型的意识 * 第一个是LongWritable,固定就是这个类型,表示每一行单词的起始位置(单位是字节) * 第二个是Text,表示每一行的文本内容. * 第三个是Text,表示单词 * 第四个是LongWritable,表示单词的出现次数 */ public static class WordCountMapper extends Mapper<LongWritable, Text, Text ,LongWritable>{ Text k2 = new Text(); LongWritable v2 = new LongWritable(); //增加一个计数器,这个Map调用几次就输出对应的次数. int counter = 0; /** * key和value表示输入的信息 * 每一行文本调用一次map函数 */ @Override protected void map(LongWritable key, Text value,Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { counter = counter + 1; System.out.println("mapper 调用的次数:" + counter); //这个map方法中的Mapper的各个泛型和上面的意识是一样的,分别代表的是k1,v1,k2,v2 String line = value.toString(); System.out.println(String.format("<k1,v1>的值<"+key.get()+","+line+">")); String[] splited = line.split("\t"); for (String word : splited) { k2.set(word); v2.set(1); System.out.println(String.format("<k2,v2>的值<"+k2.toString()+","+v2.get()+">")); context.write(k2, v2);//通过context对象写出去. } } /** * 这个地方的四个泛型的意思 * 前两个泛型是对应的Map方法的后两个泛型. * Map的输出对应的是Reduce的输入. * 第一个Text是单词 * 第二个LongWritable是单词对应的次数 * 我们想输出的也是单词 和 次数 * 所以第三个和第四个的类型和第一和第二个的一样 * * 分组指的是把相同key2的value2放到一个集合中 * */ public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ LongWritable v3 = new LongWritable(); //增加一个计数器,这个Reduce调用几次就输出对应的次数. int counter = 0; /** * 每一个分组调用一次reduce函数 * 过来的k2 分别是hello you me * */ @Override protected void reduce(Text key2, Iterable<LongWritable> value2Iterable,Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { counter = counter + 1; System.out.println("reducer 调用的次数:" + counter); //第一个参数是单词,第二个是可迭代的集合. 为什么上面的LongWritable类型的对象value2变成了一个可以迭代的结合参数? //因为分组指的是把相同key2的value2放到一个集合中 long sum = 0L; for (LongWritable value2 : value2Iterable) { System.out.println(String.format("<k2,v2>的值<"+key2.toString()+","+value2.toString()+">")); sum += value2.get(); //这个value2是LongWritable类型的,不能进行+= 操作,要用get()得到其对应的java基本类型. //sum表示单词k2 在整个文本中的出现次数. } v3.set(sum); context.write(key2, v3); System.out.println(String.format("<k3,v3>的值<"+key2.toString()+","+v3.get()+">")); } } } }
三:查看结果
打包上传到Hadoop集群,然后执行命令运行.详细运行过程不再写了.........
//==============================================================================================
程序二:
/* * 一个hello文件内容如下: * hello you * hello me */ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCountApp { public static void main(String[] args) throws Exception { // 在main方法写驱动程序,把Map函数和Reduce函数组织在一起. // 搞一个对象把Map对象和Reduce对象都放在这个对象中,我们把这个对象称作Job // 两个形参,一个是Configuration对象,一个是Job的名称,这样获得了一个Job对象; Job job = Job.getInstance(new Configuration(), WordCountApp.class.getSimpleName()); // 对这个job进行设置 job.setJarByClass(WordCountApp.class);// 通过这个设置可以让框架识别你写的代码 job.setMapperClass(MyMapper.class);// 把自定义的Map类放到job中 job.setMapOutputKeyClass(Text.class);// 定义Map的key的输出类型,Map的输出是<hello,2> job.setMapOutputValueClass(LongWritable.class);// 定义Map的value的输出类型 job.setReducerClass(MyReducer.class);// 把自定义的Reducer类放到job中 job.setOutputKeyClass(Text.class);// 因为Reduce的输出是最终的数据,Reduce的输出是<hello,2> // 所以这个方法名中没有像Map对应的放发一样带有Reduce,直接就是setOutputKeyClass job.setOutputValueClass(LongWritable.class);// 定义reduce的value输出 FileInputFormat.setInputPaths(job, args[0]);// 输入指定:传入一个job地址. // 这个args[0] 就是新地址,"hdfs://192.168.0.170/hello" FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出指定 // 指定输入和输出路径可以通过在这里写死的方式,也可以通过main函数参数的形式 // 分别是args[0]和args[1] // 把job上传到yarn平台上. job.waitForCompletion(true); } /* * 对于<k1,v1>而言,每一行产生一个<k1,v1>对,<k1,v1>表示<行的起始位置,行的文本内容> * 就本例而言map函数总共调用两次,因为总共只有两行. * 正对要统计的文本内容可以知道总共两行,总共会调用两次Map函数对应产生的<k1,v1>分别是<0,hello you> * 和第二个<k1,v1>是<10,hello me> */ private static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { // 这个Mapper的泛型参数是<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 分别对应的是k1,v1,k2,v2 // 我们如下讲的k1,v1的类型是固定的. // 就本例而言,map函数会被调用2次,因为总共文本文件就只有两行. //要定义输出的k2和v2.本案例中可以分析出<k2,v2>是对文本内容的统计<hello,1><hello,1><you,1><me,1> //而且<k2,v2>的内容是和<k3,v3>中的内容是一样的. Text k2 = new Text(); LongWritable v2 = new LongWritable(); //重写父类Mapper中的map方法 @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { //通过代码或者案例分析就可以知道k1其实没有什么用出的. String line = value.toString(); String[] splited = line.split("\t");//根据制表分隔符机进行拆分.hello和me,you之间是一个制表分隔符. for (String word : splited) { k2.set(word); v2.set(1); context.write(k2, v2); //用context把k2,v2写出去,框架会写,不用我们去管. } } } private static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { //这个例子中的<k2,v2>和<k3,v3>中的k是一样的,所以这里,k2当做k3了. LongWritable v3 = new LongWritable(); @Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { //Reduce是对上面Map中的结果进行汇总的. //上面拆分出来的<k2,v2>是<hello,1><hello,1><you,1><me,1>Reduce方法中就要对其进行汇总. long sum = 0L; for(LongWritable v2:v2s){ sum = sum +v2.get();//sum是long类型,v2是LongWritable类型 //LongWritable类型转换成long类型用get()方法. //sum的值表示单词在整个文件中出现的中次数. } v3.set(sum); context.write(k2,v3); } } }
相关文章推荐
- 推荐系统的重点、难点问题
- leetcode 344: Reverse String (c#版)
- cf 346 D E
- 一个Activity的显示过程总结(三)
- c++中的 trivial destructor
- 【转载】redis安装步骤
- PHP笔记③数据类型
- 利用NSURLProtocol实现webView缓存
- JBOSS配置+修改端口号+与Tomcat比较
- 【Leetcode】Duplicate Emails
- getchar() getch() getche() gets() puts() scanf()的用法及区别
- 数据库 续
- 设计模式学习笔记之策略模式
- Android的Selector与Shape
- XenAPP6.5安装于部署(五)---Citrix AppCenter配置,发布应用
- AS使用备忘录
- 基于MATLAB的音频信号处理技术实现
- UML学习之初步总结
- hdu-5681 zxa and wifi(dp)
- RadioButton和CheckBox