hadoop2.5.2学习09--MR之统计每月最高三个温度
2017-02-07 22:02
567 查看
1、 本案例的需求
数据为每5秒统计一次的温度数据,我需要求取每个月份的最高三个温度数据类型格式:
2、解决思路
设定多个reduce每年的数据都很多,如果按照默认情况处理,统计性能是非常慢(因为默认只有一个reduce),所以我们需要重新分配reduceTask,将一年的数据交给一个reduceTask处理,
分区
那个数据交给哪个reduceTask处理是有Patitioner决定(patition对每个map输出的数据分配置一个分区号这个分区号决定map输出数据送到那个reudeTask),
自定义分区
由于我们是将一年的数据交给一个reduce处理,但是默认分区是按照key.hashCode()的值 模 reduceTask数量得到分区号,所以我们需要重写分区,
自定义排序
由于我们是要每月最该的三个温度,所以需要对温度进行排序,所以在洗牌(shuffler)过程中自定义sort,
自定义分组
分组的目的:是按照实际的需求,将数据分成一个个组, 传给reduceTask,我们的需求是统计每年每月温度最高的三个,如果一组数据就是这一年的数据,我们对着一年的数据进行统计,是很复杂的,如果我们将每月的数据分成一个组,这样就会方便多了, 默认的分组是按照key是否相同进行分组,所以我们要自定义分组
自定义key
默认的partition是根据key的hashcode模reduceTask数量,得到分区号
默认的排序是根据key的字典排序
默认的分组是根据key相同,进行比较进行分组
这几个都与key与联系, 所以我们需要影响这些步骤的因素添加到key中,
根据上面分析,partition与年有关,sort与温度有关,分组和月份有关
总结:所以key中需要包含year, month, T
3、程序流程
3.1、自定义key
在上面分析, key包含year, month, T,在map输出和reduce的输入,都会发生溢写数据到disk上, map到reduce可能跨网络传输,所以对key进行序列化(持久化)与反序列化。
由于分组需要判断key是否是同一个对象, 所以需要重写equals();
在hadoop是序列化和反序列化,需要实现Writable,同时比价是否是同一个对象,需要实现Comparable。所以hadoop中提供一个接口WritableComparable,提供我们所需的。
下面为代码:
package com.chb.myWeather; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /** 自定义key: 包含 year,month, T map输出,和reduce的输入都会将数据溢写道磁盘, 而map到reduce 会跨网络传输,所以需要持久化(序列化) / public class MyKey implements WritableComparable<MyKey>{ private int year; private int month; private double t; ... getter/setter... ... //反序列化 @Override public void readFields(DataInput arg0) throws IOException { this.year = arg0.readInt(); this.month = arg0.readInt(); this.t = arg0.readDouble(); } //序列化 @Override public void write(DataOutput arg0) throws IOException { arg0.writeInt(year); arg0.writeInt(month); arg0.writeDouble(t); } /** 判断是否为同一个对象, 该对象最为map输出的key / public int compareTo(MyKey o) { int c1 = Integer.compare(this.getYear(), o.getYear()); if (c1 == 0) { int c2 = Integer.compare(this.getMonth(), o.getMonth()); if (c2 == 0){ return Double.compare(this.getT(), o.getT()); }else { return c2; } }else { return c1; } } }
3.2、Mapper
输入:默认的map输入的是一行行的数据, 按照每行数据字母的序号为键(LongWritable), 行数据为值(Text)
按照默认情况,我们需要对每行需要进行切割
1925-11-23 15:23:33 23c 时间YYYY-MM-dd空格HH:mm:ss制表符温度
我们自定自定义输入的key和value, 按照制表符切割每行数据
左边为key, 右边为value
那么我们不需要进行切割
输出:
mapper的输出是使用自定key,
因为输出key中包含year,month,T我们所需的数据, 所以输出的值为空
下面为Mapper
package com.chb.myWeather; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MyMapper extends Mapper<Text, Text, MyKey, NullWritable> { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { try { Date date = sdf.parse(key.toString()); Calendar c = Calendar.getInstance(); c.setTime(date); int year = c.get(Calendar.YEAR); int month =c.get(Calendar.MONTH); //获取温度 Double t = Double.parseDouble(value.toString().substring(0, value.toString().length()-1)); MyKey myKey = new MyKey(); myKey.setYear(year); myKey.setMonth(month); myKey.setT(t); //输出 context.write(myKey, null); } catch (Exception e) { e.printStackTrace(); } } }
3.3、Shuffler的第一步Partitioner
默认的partitio是根据key的hashcode模reduceTask的数据,现在我们需要将一年的数据交给一个reduce处理, 所以需要按照年份分对应的reduceTask。这些reduceTask可以并行计算,性能就提高了,一年一个分区, 我们数据有多少年这是已知的, 对应reduceTasknums
这个分区号与year关联, 通过(yearm-最小年份)%reduceTasknums
partitiner的输出和map的输出一致
由于每个map的输出数据都会执行一次getPartition(), 数据量太大,所以该方法不能设置太复杂。
下面为自定Partitioner
package com.chb.myWeather; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; public class MyPartitioner extends HashPartitioner<MyKey, NullWritable>{ @Override public int getPartition(MyKey key, NullWritable value, int numReduceTasks) { return (key.getYear()-1949)%numReduceTasks; } }
3.4、shuffler中的Sort
默认情况下, shuffle中的sort是按照key的字典排序,而我们需要求一个月的前三个温度,这势必会对温度排序,所以我们自定义排序,按照温度的排序,输出的结果就是按照温度排序, 这样我们不用再对温度进行排序, 直接去reduce每组数据的输出数据的前三个。Sort代码:
package com.chb.myWeather; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class MySort extends WritableComparator{ //将自定义的key类加载进行 public MySort() { super(MyKey.class, true); } /** * 比较两个对象的温度 */ @Override public int compare(WritableComparable a, WritableComparable b) { MyKey k1 = (MyKey)a; MyKey k2 = (MyKey)b; int r1 = Integer.compare(k1.getYear(), k2.getYear()); if (r1 == 0 ) { int r2 = Integer.compare(k2.getMonth(), k2.getMonth()); if (r2 == 0) { //前面加一个- 。因为我们是按照温度的降序排序 return -Double.compare(k1.getT(),k2.getT()); }else { return r2; } }else { return r1; } } }
3.5、自定义分组
默认的分区是按照key十分相同,进行分组,如果使用默认分组, 那么就死将一年数据作为一个分组,情况麻烦, 所以我们将按照月份分组,每组输出前三个即可。分组代码:
package com.chb.myWeather; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * 自定义分组, * 默认的分组是按照key是否相同, */ public class MyGroup extends WritableComparator{ public MyGroup() { super(MyKey.class, true); } /** * 由于分组是按照月份分组,所以在比较是只要比较月份是否相同, * 其他略去,做数据过滤 */ public int compare(WritableComparable a, WritableComparable b) { MyKey k1 = (MyKey)a; MyKey k2 = (MyKey)b; int r1 = Integer.compare(k1.getYear(), k2.getYear()); if (r1 == 0) { return Integer.compare(k1.getMonth(), k2.getMonth()); }else { return r1; } } }
3.6、ReudceTask
package com.chb.myWeather; import java.io.IOException; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MyReducer extends Reducer<MyKey, DoubleWritable, Text, NullWritable>{ @Override protected void reduce(MyKey key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException { //由于分组是按照月份,自定义排序是按照温度降序 //所以shuffler执行完的数据,是一个月数据,按照温度的降序排序, //所以我们只要获取前三个值,就是一个月份的最高三个温度 int num = 0; for (DoubleWritable dw : values) { String msg = key.getYear()+"年"+key.getMonth()+"月"+"\t"+dw.toString(); //写数据 context.write(new Text(msg), null); if (num==3) { break; } } } }
3.7、执行任务类
代码:package com.chb.myWeather; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; public class RunJob { public static void main(String[] args) { Configuration conf = new Configuration(); try { FileSystem fs = FileSystem.get(conf); Job job = Job.getInstance(); job.setJarByClass(RunJob.class); job.setJar(""); //设置自定义 job.setMapperClass(MyMapper.class); job.setPartitionerClass(MyPartitioner.class); job.setSortComparatorClass(MySort.class); job.setGroupingComparatorClass(MyGroup.class); job.setReducerClass(MyReducer.class); //设置Mappper的键值对类型 job.setMapOutputKeyClass(MyKey.class); job.setMapOutputValueClass(DoubleWritable.class); //设置reduce数据量 job.setNumReduceTasks(3); //设置输入格式类型 job.setInputFormatClass(KeyValueTextInputFormat.class); //设置输入路径 FileInputFormat.addInputPath(job, new Path("/user/chb/input/weather")); //设置输出 Path out = new Path("/user/chb/output/weather"); if (fs.exists(out)) { fs.delete(out); } boolean f = job.waitForCompletion(true); if (f) { System.out.println("任务执行完成。。。"); } } catch (Exception e) { e.printStackTrace(); } } }
相关文章推荐
- hadoop2.5.2学习10--MR之统计每月最高三个温度01
- hadoop2.5.2学习10--MR之统计每月最高三个温度02
- hadoop2.5.2学习12-MR之PageRank01
- hadoop2.5.2学习14--MR之协同过滤天猫推荐
- hadoop2.5.2学习11-MR之好友推荐1
- hadoop2.5.2学习12-MR之PageRank
- hadoop2.5.2学习13-MR之引入第三方Jar
- Hadoop2.5.2学习02--MR执行环境的类型
- hadoop2.5.2学习13-MR之新浪微博TF-IDF算法简介
- hadoop - hadoop2.6 分布式 - 简单实例学习 - 统计某年的最高温度和按年份将温度从高到底排序
- hadoop2.5.2学习12-MR之PageRank02
- hadoop2.5.2学习11-MR之好友推荐2
- Hadoop2.5.2学习01--mapreduce统计单词数
- hadoop2.5.2学习13-MR之新浪微博-DF的实现
- hadoop2.5.2学习及实践笔记(五)—— HDFS shell命令行常见操作
- hadoop MR 统计分析日志脚本一例
- hadoop生态系统学习之路(二)如何编写MR以及运行测试
- hadoop学习之路(二)hadoop基本概念原理以及单词统计任务源码分析
- hadoop学习笔记(三)——WIN7+eclipse+hadoop2.5.2部署
- hadoop2.6温度统计mapreduce程序