MapReduce之气温计算
2017-05-27 11:27
190 查看
Step 1:
导入Hadoop和MapReduce的所有jar包
Step 2:WeatherMapper
Step 3:Shuffle阶段自定义Group
Step 4:shuffle阶段自定义key
Step 5:shuffle阶段自定义partition
Step 6:shuffle阶段自定义排序
Step 7:shuffle阶段自定义combiner
Step 8:weather之reducer阶段
Step 9:MapReduce之Main——RunJob
Step 1:导入Hadoop和MapReduce的所有jar包
Step 2:WeatherMapper
public class WeatherMapper extends Mapper<Text, Text, MyKey, Text>{ @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { try { // 1950-10-01 12:21:02 37c SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date =sdf.parse(key.toString()); Calendar c =Calendar.getInstance(); c.setTime(date); int year =c.get(Calendar.YEAR); int month =c.get(Calendar.MONTH); int temp =Integer.parseInt(value.toString().substring(0, value.toString().lastIndexOf("c"))); MyKey mykey = new MyKey(); mykey.setYear(year); mykey.setMonth(month); mykey.setTemp(temp); context.write(mykey, value); } catch (Exception e) { e.printStackTrace(); } } }
Step 3:Shuffle阶段自定义Group
/**
 * Comparator that treats two {@link MyKey} instances as equal when they share
 * the same year and month; the temperature is ignored.
 */
public class MyGroup extends WritableComparator {

    /** Registers {@link MyKey} as the key class and asks the base class to create instances. */
    public MyGroup() {
        super(MyKey.class, true);
    }

    /**
     * Orders by year, then by month.
     *
     * @param a first key, must be a {@link MyKey}
     * @param b second key, must be a {@link MyKey}
     * @return negative, zero or positive following the usual comparator contract
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey left = (MyKey) a;
        MyKey right = (MyKey) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }
        return Integer.compare(left.getMonth(), right.getMonth());
    }
}
Step 4:shuffle阶段自定义key
/**
 * Composite map-output key of the weather job: year, month and temperature.
 *
 * <p>The natural order is ascending by year, then month, then temperature.
 * {@link #equals(Object)} and {@link #hashCode()} are consistent with
 * {@link #compareTo(MyKey)}.
 */
public class MyKey implements WritableComparable<MyKey> {

    private int year;
    private int month;
    private int temp;

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getTemp() {
        return temp;
    }

    public void setTemp(int temp) {
        this.temp = temp;
    }

    /**
     * Restores the fields in the order written by {@link #write(DataOutput)}:
     * year, month, temperature.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.temp = in.readInt();
    }

    /** Serializes year, month and temperature as three consecutive ints. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(temp);
    }

    /** Compares by year, then month, then temperature, all ascending. */
    @Override
    public int compareTo(MyKey o) {
        int byYear = Integer.compare(this.year, o.getYear());
        if (byYear != 0) {
            return byYear;
        }
        int byMonth = Integer.compare(this.month, o.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }
        // The fields are ints, so compare them as ints instead of widening to double.
        return Integer.compare(this.temp, o.getTemp());
    }

    /** Two keys are equal when year, month and temperature all match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof MyKey)) {
            return false;
        }
        MyKey other = (MyKey) obj;
        return year == other.year && month == other.month && temp == other.temp;
    }

    /** Hash over the same three fields used by {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        int result = year;
        result = 31 * result + month;
        result = 31 * result + temp;
        return result;
    }

    /** Tab-separated "year, month, temperature" representation. */
    @Override
    public String toString() {
        return year + "\t" + month + "\t" + temp;
    }
}
Step 5:shuffle阶段自定义partition
/**
 * Partitions map output by year so that all records of one year reach the same
 * reduce task.
 */
public class MyPartitioner extends HashPartitioner<MyKey, Text> {

    /** Year that is mapped to partition 0. */
    private static final int BASE_YEAR = 1949;

    /**
     * Returns the partition for the key's year.
     *
     * @param key            composite key whose year selects the partition
     * @param value          unused
     * @param numReduceTasks number of partitions
     * @return a value in {@code [0, numReduceTasks)}
     */
    @Override
    public int getPartition(MyKey key, Text value, int numReduceTasks) {
        int partition = (key.getYear() - BASE_YEAR) % numReduceTasks;
        // Java's % keeps the sign of the dividend; years before BASE_YEAR would
        // otherwise produce a negative (invalid) partition index.
        return partition < 0 ? partition + numReduceTasks : partition;
    }
}
Step 6:shuffle阶段自定义排序
/**
 * Comparator for {@link MyKey}: year ascending, month ascending, then
 * temperature descending.
 */
public class MySort extends WritableComparator {

    /** Registers {@link MyKey} as the key class and asks the base class to create instances. */
    public MySort() {
        super(MyKey.class, true);
    }

    /**
     * @param a first key, must be a {@link MyKey}
     * @param b second key, must be a {@link MyKey}
     * @return negative, zero or positive following the usual comparator contract
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey left = (MyKey) a;
        MyKey right = (MyKey) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }

        int byMonth = Integer.compare(left.getMonth(), right.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }

        // Operands are swapped to obtain the descending temperature order.
        return Integer.compare(right.getTemp(), left.getTemp());
    }
}
Step 7:shuffle阶段自定义combiner
/**
 * Combiner that forwards only the leading values of each key group and drops
 * the rest.
 */
public class WeatherCombiner extends Reducer<MyKey, Text, MyKey, Text> {

    /**
     * Writes the first values of the group unchanged under the same key.
     *
     * @param key     key of the current group
     * @param values  values of the group, in the order delivered by the framework
     * @param context task context used to emit the output pairs
     * @throws IOException          if writing fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(MyKey key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // NOTE(review): at most four values are emitted per group; confirm whether
        // a "top three" was intended.
        int remaining = 4;
        for (Text value : values) {
            context.write(key, value);
            if (--remaining == 0) {
                break;
            }
        }
    }
}
Step 8:weather之reducer阶段
/**
 * Reducer that prints the leading values of each key group as
 * "year TAB month TAB value" lines.
 */
public class WeatherReducer extends Reducer<MyKey, Text, Text, NullWritable> {

    /**
     * Emits one output line per forwarded value; the output value is always
     * {@link NullWritable}.
     *
     * @param key     key of the current group
     * @param values  values of the group, in the order delivered by the framework
     * @param context task context used to emit the output lines
     * @throws IOException          if writing fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(MyKey key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // NOTE(review): at most four lines are emitted per group; confirm whether
        // a "top three" was intended.
        int remaining = 4;
        for (Text value : values) {
            String line = key.getYear() + "\t" + key.getMonth() + "\t" + value;
            context.write(new Text(line), NullWritable.get());
            if (--remaining == 0) {
                break;
            }
        }
    }
}
Step 9:MapReduce之Main——RunJob
public class RunJob { public static void main(String[] args) { try { int numReduceTasks = Integer.valueOf(args[0]); Configuration conf = new Configuration(); FileSystem fs = FileSystem.newInstance(conf); Job job = Job.getInstance(); job.setJarByClass(RunJob.class); job.setJobName("weather"); job.setInputFormatClass(KeyValueTextInputFormat.class); job.setMapOutputKeyClass(MyKey.class); job.setMapOutputValueClass(Text.class); job.setMapperClass(WeatherMapper.class); job.setReducerClass(WeatherReducer.class); // job.setCombinerClass(WeatherCombiner.class); job.setNumReduceTasks(numReduceTasks); job.setGroupingComparatorClass(MyGroup.class); job.setPartitionerClass(MyPartitioner.class); job.setSortComparatorClass(MySort.class); // job.setCombinerClass(cls); FileInputFormat.addInputPath(job, new Path("/data/weather")); Path output = new Path("/weather"); if(fs.exists(output)){ fs.delete(output, true); } FileOutputFormat.setOutputPath(job, output); boolean flag = job.waitForCompletion(true); if(flag){ System.out.println("Job finished !"); } } catch (Exception e) { e.printStackTrace(); } } }
相关文章推荐
- mapreduce实例,计算最高气温
- hadoop mapreduce 计算平均气温的代码,绝对原创
- hadoop mapreduce 计算平均气温的代码,绝对原创
- Hadoop MapReduce计算框架
- Hadoop之Avro mapreduce最高气温程序
- 大数据图数据库之MapReduce用于图计算
- MapReduce端的二次排序以及对移动计算而不是移动数据的理解
- 3、 分布式计算模型MapReduce
- 用 MapReduce 解决与云计算相关的 Big Data 问题
- hadoop之魂--mapreduce计算框架,让收集的数据产生价值
- 云计算学习笔记005---Hadoop HDFS和MapReduce 架构浅析
- 用mapreduce计算wordCount和手机流量统计程序运行过程
- 大数据时代之hadoop(五):hadoop 分布式计算框架(MapReduce)
- MapReduce并行计算框架介绍
- 分布式计算框架Mapreduce
- mapreduce剖析气象站平均气温
- Java MapReduce 基本计算操作实现实战
- 1/10计算资源,1/3耗时,Spark颠覆MapReduce保持的排序记录
- 【MapReduce】经常使用计算模型具体解释
- 1/10计算资源,1/3耗时,Spark颠覆MapReduce保持的排序记录