Hadoop MapReduce Program for Analyzing Airline Flight Data: Source Code
2016-08-07 18:17
1: Data source
Using US civil aviation flight data from 1987, develop a MapReduce application that computes flight statistics for that year.
Data format:
Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
1987,10,14,3,741,730,912,849,PS,1451,NA,91,79,NA,23,11,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA
1987,10,15,4,729,730,903,849,PS,1451,NA,94,79,NA,14,-1,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA
...
Note the following fields in the data:
Column 4 is the day of the week (DayOfWeek)
Column 9 is the carrier code (UniqueCarrier)
Column 10 is the flight number (FlightNum)
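The column positions above can be checked with a short, self-contained sketch (the class name FieldDemo is hypothetical, not part of the original program). Note that after split(",") the array indices are zero-based, so "column 4" is fields[3]:

```java
public class FieldDemo {
    public static void main(String[] args) {
        // First sample record from the 1987 data set
        String record = "1987,10,14,3,741,730,912,849,PS,1451,NA,91,79,NA,23,11,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA";
        String[] fields = record.split(",");
        String dayOfWeek = fields[3];              // column 4: day of week
        String flightId  = fields[8] + fields[9];  // columns 9 + 10: carrier code + flight number
        int distance = Integer.parseInt(fields[18]); // column 19: distance in miles
        System.out.println(dayOfWeek + " " + flightId + " " + distance); // prints "3 PS1451 447"
    }
}
```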
2: Goal
Write a MapReduce application that counts the number of flights on each day of the week, and the total distance flown by each flight, saving the two results in two separate text files.
3: Implementation
package org.apache.flight;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class FlightWeekDist {

    // Mapper for counting the number of flights on each day of the week
    public static class FlightNumMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text dayOfWeek = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            try {
                Integer.parseInt(fields[0]); // skip the header row, where Year is not numeric
            } catch (NumberFormatException e) {
                return;
            }
            dayOfWeek.set(fields[3]); // day of week
            context.write(dayOfWeek, one);
        }
    }

    // Mapper for summing the distance flown by each flight
    public static class FlightMilesMapper extends Mapper<Object, Text, Text, IntWritable> {
        private IntWritable miles = new IntWritable();
        private Text flightNum = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            try {
                Integer.parseInt(fields[0]); // skip the header row
            } catch (NumberFormatException e) {
                return;
            }
            flightNum.set(fields[8] + fields[9]); // carrier code + flight number
            int distance = 0;
            try {
                distance = Integer.parseInt(fields[18]); // Distance column, may be "NA"
            } catch (NumberFormatException e) {
                // treat a missing distance as 0
            }
            miles.set(distance);
            context.write(flightNum, miles);
        }
    }

    // Reducer shared by both jobs: sums all values for each key
    public static class FlightSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    // Delete the output directories if they already exist
    private static void removeOutputPath(Configuration conf, String output1, String output2)
            throws IOException {
        FileSystem hdfs = FileSystem.get(conf);
        hdfs.delete(new Path(output1), true);
        hdfs.delete(new Path(output2), true);
    }

    // Create the job that counts flights per day of the week
    private static Job createFlightNumJob(Configuration conf, String input, String output)
            throws IOException {
        Job job = Job.getInstance(conf, "Flight Numbers");
        job.setJarByClass(FlightWeekDist.class);
        job.setMapperClass(FlightNumMapper.class);
        job.setCombinerClass(FlightSumReducer.class);
        job.setReducerClass(FlightSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        return job;
    }

    // Create the job that sums the distance flown by each flight
    private static Job createFlightMilesJob(Configuration conf, String input, String output)
            throws IOException {
        Job job = Job.getInstance(conf, "Flight Miles");
        job.setJarByClass(FlightWeekDist.class);
        job.setMapperClass(FlightMilesMapper.class);
        job.setCombinerClass(FlightSumReducer.class);
        job.setReducerClass(FlightSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        return job;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Usage: FlightWeekDist <in> <out1> <out2>");
            System.exit(2);
        }
        removeOutputPath(conf, otherArgs[1], otherArgs[2]);
        Job job = createFlightNumJob(conf, otherArgs[0], otherArgs[1]);
        job.waitForCompletion(true);
        job = createFlightMilesJob(conf, otherArgs[0], otherArgs[2]);
        job.waitForCompletion(true);
    }
}
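The second job's logic can be sanity-checked locally without a cluster. Below is a minimal sketch (the class name MilesCheck is hypothetical, not part of the original program) that applies the same key construction as FlightMilesMapper and the same summation as FlightSumReducer to the two sample records:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MilesCheck {
    public static void main(String[] args) {
        // The two sample records from the data format section
        String[] records = {
            "1987,10,14,3,741,730,912,849,PS,1451,NA,91,79,NA,23,11,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA",
            "1987,10,15,4,729,730,903,849,PS,1451,NA,94,79,NA,14,-1,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA"
        };
        Map<String, Integer> totalMiles = new LinkedHashMap<>();
        for (String record : records) {
            String[] fields = record.split(",");
            String flight = fields[8] + fields[9];         // carrier + flight number, as in FlightMilesMapper
            int miles = Integer.parseInt(fields[18]);      // Distance column
            totalMiles.merge(flight, miles, Integer::sum); // same summation as FlightSumReducer
        }
        System.out.println(totalMiles); // prints "{PS1451=894}"
    }
}
```

Both sample records belong to flight PS1451 covering 447 miles each, so its total is 894.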