Getting started with Hadoop 2.x: writing a MapReduce job to find the daily highest and lowest temperatures in a weather dataset
2017-07-23 19:30
1. Download the weather dataset
The weather dataset can be downloaded from: ftp://ftp.ncdc.noaa.gov/pub/data/noaa
We only download the data for China, using the following command:
wget -P data -r -c 'ftp://ftp.ncdc.noaa.gov/pub/data/noaa/2017/5*'
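Weather station IDs in China fall in the range 50001-59998, which is why the download pattern above selects files whose names start with 5. The details can be looked up in 《1951—2007年中国地面气候资料日值数据集台站信息》 (station information for the 1951-2007 China surface climate daily dataset), although that list is probably incomplete. In addition, 《StationIDs_Global_1509》 gives the station ID ranges for countries around the world.
2. Unpack the dataset and save it to a text file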
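The data was downloaded on July 23. It contains about 790,000 lines and is 182 MB in size, so even at the end of the year it should not exceed roughly 2 million lines.
[grid@tiny01 ~]$ zcat data/ftp.ncdc.noaa.gov/pub/data/noaa/2017/5*.gz > data.txt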
Here > is the output redirection operator: it writes the decompressed output of zcat into data.txt.
Let's look at one record from the weather dataset:
0169501360999992017010100004+52130+122520FM-12+043399999V0202201N0010102600199003700199-03271-03631102641ADDAA124160092AJ199999999999999GA1081+026001101GA2999+999999101GA3999+999999101GE19MSL +99999+99999GF108991081999026001999999MA1999999096571MD1210041+0301REMSYN004BUFR
The record format is explained below (column range, value in the sample record, meaning):
1-4      0169
5-10     501360    # USAF weather station identifier
11-15    99999     # WBAN weather station identifier
16-23    20170101  # observation date
24-27    0000      # observation time
28       4
29-34    +52130    # latitude (degrees x 1000)
35-41    +122520   # longitude (degrees x 1000)
42-46    FM-12
47-51    +0433     # elevation (meters)
52-56    99999
57-60    V020
61-63    220       # wind direction
64       1         # quality code
65       N
66-69    0010
70       1
71-75    02600     # cloud height (meters)
76       1
77       9
78       9
79-84    003700    # visibility distance (meters)
85       1
86       9
87       9
88-92    -0327     # air temperature (degrees Celsius x 10)
93       1
94-98    -0363     # dew point temperature (degrees Celsius x 10)
99       1
100-104  10264     # atmospheric pressure
105      1
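To make the column table concrete, here is a minimal plain-Java sketch that slices the sample record by those 1-based column ranges. The class name NcdcRecordSketch and the helper field() are made up for illustration and are not part of the job in this article; the record literal is the first 105 columns of the sample shown above.

```java
// Sketch only: slice one fixed-width record by the 1-based column ranges in the table.
class NcdcRecordSketch {
    // First 105 columns of the sample record from this article.
    static final String SAMPLE =
        "0169501360999992017010100004+52130+122520FM-12+043399999V0202201N0010"
        + "102600199003700199-03271-03631102641";

    // Hypothetical helper: 1-based inclusive column range -> substring.
    static String field(String line, int from, int to) {
        return line.substring(from - 1, to);
    }

    // Parses a signed numeric field, dropping an explicit leading '+'.
    static int signed(String raw) {
        return Integer.parseInt(raw.startsWith("+") ? raw.substring(1) : raw);
    }

    static String stationId(String line) { return field(line, 5, 10); }
    static String date(String line) { return field(line, 16, 23); }
    static int elevationMeters(String line) { return signed(field(line, 47, 51)); }
    static int airTemperatureTenths(String line) { return signed(field(line, 88, 92)); }
    static char temperatureQuality(String line) { return line.charAt(92); } // column 93

    public static void main(String[] args) {
        System.out.println("station   = " + stationId(SAMPLE));
        System.out.println("date      = " + date(SAMPLE));
        System.out.println("elevation = " + elevationMeters(SAMPLE) + " m");
        System.out.println("air temp  = " + airTemperatureTenths(SAMPLE) / 10.0 + " C");
        System.out.println("quality   = " + temperatureQuality(SAMPLE));
    }
}
```

Note that substring() uses 0-based, end-exclusive indices, so column range 88-92 becomes substring(87, 92).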
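Columns 5-10 hold the weather station ID, 501360 here (take the first five digits, 50136). Looking it up in the station table shows that it is Mohe, Heilongjiang. What we mainly analyze is the extreme air temperature (columns 88-92) for each month (columns 16-21).
3. Write the MapReduce program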
Mapper program:
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Value the dataset uses for a missing temperature reading
    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Columns 16-21: year and month (yyyyMM), used as the output key
        String yearMonth = line.substring(15, 21);
        // Columns 88-92: signed air temperature in tenths of a degree Celsius
        int airTemperature;
        if (line.charAt(87) == '+') {
            // parseInt doesn't like leading plus signs
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        // Column 93: quality code of the temperature reading
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(yearMonth), new IntWritable(airTemperature));
        }
    }
}
Reducer program:
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Keep the largest temperature seen for this key (one key per month)
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}
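The filtering and per-month aggregation done by the mapper and reducer can be tried out locally before involving a cluster. Below is a minimal plain-Java sketch under stated assumptions: the class name MonthlyExtremesSketch, the helper record() and the input records are all synthetic and only follow the column layout described earlier. Unlike the job in this article, the sketch also tracks the minimum, as one possible way to cover the lowest temperature as well.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch only: the mapper's filter plus a min/max reduction, without Hadoop.
class MonthlyExtremesSketch {
    static final int MISSING = 9999;

    // Returns yyyyMM -> {min, max}, both in tenths of a degree Celsius.
    static Map<String, int[]> monthlyExtremes(List<String> lines) {
        Map<String, int[]> extremes = new TreeMap<>();
        for (String line : lines) {
            String yearMonth = line.substring(15, 21);   // columns 16-21
            String raw = line.substring(87, 92);         // columns 88-92
            int t = Integer.parseInt(raw.charAt(0) == '+' ? raw.substring(1) : raw);
            String quality = line.substring(92, 93);     // column 93
            if (t == MISSING || !quality.matches("[01459]")) {
                continue;                                // same filter as the mapper
            }
            int[] minMax = extremes.get(yearMonth);
            if (minMax == null) {
                extremes.put(yearMonth, new int[] {t, t});
            } else {
                minMax[0] = Math.min(minMax[0], t);
                minMax[1] = Math.max(minMax[1], t);
            }
        }
        return extremes;
    }

    // Hypothetical helper: builds a synthetic record with only date, temperature and quality set.
    static String record(String yyyymmdd, String temp, char quality) {
        StringBuilder sb = new StringBuilder("000000000000000"); // columns 1-15
        sb.append(yyyymmdd);                                     // columns 16-23
        while (sb.length() < 87) {
            sb.append('9');                                      // filler up to column 87
        }
        return sb.append(temp).append(quality).toString();       // columns 88-93
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            record("20170101", "-0327", '1'),
            record("20170115", "+0307", '1'),
            record("20170120", "+9999", '1'),   // missing reading, skipped
            record("20170203", "+0350", '1'),
            record("20170210", "+0400", '2'));  // rejected quality code, skipped
        for (Map.Entry<String, int[]> e : monthlyExtremes(lines).entrySet()) {
            System.out.println(e.getKey() + " min=" + e.getValue()[0] + " max=" + e.getValue()[1]);
        }
    }
}
```

In a real job, emitting both extremes would mean changing the reducer's output value type or writing two jobs; the sketch sidesteps that by returning an int pair.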
M-R job:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

public class MaxTemperature extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        // Name of the jar that will be built in step 4
        conf.set("mapred.jar", "MaxTemperature.jar");

        Job job = Job.getInstance(conf);
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        // Unused stub: main() submits the job directly and never goes through ToolRunner
        return 0;
    }
}
Note the call conf.set("mapred.jar", "MaxTemperature.jar"); the second argument is the name of the jar that is about to be built.
4. Compile the Java files and build the jar
The commands to compile and package are:
[grid@tiny01 myclass]$ javac -classpath $HADOOP_HOME/share/hadoop/common/hadoop-common-2.7.2.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.2.jar:$HADOOP_HOME/share/hadoop/common/lib/commons-cli-1.2.jar *.java
[grid@tiny01 myclass]$ jar cvf MaxTemperature.jar *.class
[grid@tiny01 myclass]$ ll
total 28
-rw-rw-r--. 1 grid grid 1413 Jul  3 16:45 MaxTemperature.class
-rw-rw-r--. 1 grid grid 3085 Jul  9 19:04 MaxTemperature.jar
-rw-rw-r--. 1 grid grid  949 Jun 30 15:49 MaxTemperature.java
-rw-rw-r--. 1 grid grid 1876 Jul  3 16:45 MaxTemperatureMapper.class
-rw-rw-r--. 1 grid grid  953 Jun 30 15:37 MaxTemperatureMapper.java
-rw-rw-r--. 1 grid grid 1687 Jul  3 16:45 MaxTemperatureReducer.class
-rw-rw-r--. 1 grid grid  553 Jun 30 15:47 MaxTemperatureReducer.java
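The classpath here differs from the one used with earlier Hadoop versions and has to be set the new way, which is rarely mentioned online! (Note that the jar files differ between Hadoop versions.)
5. Upload the data to HDFS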
[grid@tiny01 ~]$ hadoop fs -put data.txt /data.txt
6. Run the program
[grid@tiny01 ~]$ hadoop jar MaxTemperature.jar MaxTemperature /data.txt /out
17/07/24 00:13:20 INFO client.RMProxy: Connecting to ResourceManager at tiny01/192.168.1.101:8032
17/07/24 00:13:21 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/07/24 00:13:22 INFO input.FileInputFormat: Total input paths to process : 1
17/07/24 00:13:23 INFO mapreduce.JobSubmitter: number of splits:2
17/07/24 00:13:23 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1500807860144_0002
17/07/24 00:13:24 INFO impl.YarnClientImpl: Submitted application application_1500807860144_0002
17/07/24 00:13:24 INFO mapreduce.Job: The url to track the job: http://tiny01:8088/proxy/application_1500807860144_0002/
17/07/24 00:13:24 INFO mapreduce.Job: Running job: job_1500807860144_0002
17/07/24 00:13:44 INFO mapreduce.Job: Job job_1500807860144_0002 running in uber mode : false
17/07/24 00:13:44 INFO mapreduce.Job: map 0% reduce 0%
17/07/24 00:14:49 INFO mapreduce.Job: map 20% reduce 0%
17/07/24 00:14:52 INFO mapreduce.Job: map 33% reduce 0%
17/07/24 00:14:55 INFO mapreduce.Job: map 50% reduce 0%
17/07/24 00:16:02 INFO mapreduce.Job: map 51% reduce 0%
17/07/24 00:16:05 INFO mapreduce.Job: map 54% reduce 0%
17/07/24 00:16:08 INFO mapreduce.Job: map 57% reduce 0%
17/07/24 00:16:11 INFO mapreduce.Job: map 60% reduce 0%
17/07/24 00:16:14 INFO mapreduce.Job: map 62% reduce 0%
17/07/24 00:16:20 INFO mapreduce.Job: map 65% reduce 0%
17/07/24 00:16:40 INFO mapreduce.Job: map 69% reduce 0%
17/07/24 00:16:42 INFO mapreduce.Job: map 73% reduce 0%
17/07/24 00:16:44 INFO mapreduce.Job: map 83% reduce 0%
17/07/24 00:16:46 INFO mapreduce.Job: map 100% reduce 0%
17/07/24 00:17:22 INFO mapreduce.Job: map 100% reduce 100%
17/07/24 00:17:30 INFO mapreduce.Job: Job job_1500807860144_0002 completed successfully
17/07/24 00:17:32 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=10226664
        FILE: Number of bytes written=20805407
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=190690631
        HDFS: Number of bytes written=77
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Killed map tasks=1
        Launched map tasks=3
        Launched reduce tasks=1
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=383699
        Total time spent by all reduces in occupied slots (ms)=143422
        Total time spent by all map tasks (ms)=383699
        Total time spent by all reduce tasks (ms)=143422
        Total vcore-milliseconds taken by all map tasks=383699
        Total vcore-milliseconds taken by all reduce tasks=143422
        Total megabyte-milliseconds taken by all map tasks=392907776
        Total megabyte-milliseconds taken by all reduce tasks=146864128
    Map-Reduce Framework
        Map input records=789998
        Map output records=786666
        Map output bytes=8653326
        Map output materialized bytes=10226670
        Input split bytes=184
        Combine input records=0
        Combine output records=0
        Reduce input groups=7
        Reduce shuffle bytes=10226670
        Reduce input records=786666
        Reduce output records=7
        Spilled Records=1573332
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=2436
        CPU time spent (ms)=8470
        Physical memory (bytes) snapshot=415924224
        Virtual memory (bytes) snapshot=6170849280
        Total committed heap usage (bytes)=267198464
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=190690447
    File Output Format Counters
        Bytes Written=77
View the result:
[grid@tiny01 ~]$ hadoop fs -cat /out/part-r-00000
201701 307
201702 350
201703 375
201704 399
201705 426
201706 444
201707 485
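The temperatures here are stored as ten times the value in degrees Celsius, which is why they look so large; for example, 485 means 48.5 degrees Celsius.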
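As a small illustration of that scaling, the sketch below converts job output lines back to degrees Celsius. The class name TenthsToCelsiusSketch is made up, and the code assumes each line is "yyyyMM", whitespace, then an integer, as in the result listing above.

```java
import java.util.Locale;

// Sketch only: turn "yyyyMM <tenths>" result lines into degrees Celsius.
class TenthsToCelsiusSketch {
    static double toCelsius(int tenths) {
        return tenths / 10.0;
    }

    // Assumes the key and the value are separated by whitespace (space or tab).
    static String convertLine(String line) {
        String[] parts = line.trim().split("\\s+");
        int tenths = Integer.parseInt(parts[1]);
        return String.format(Locale.ROOT, "%s %.1f", parts[0], toCelsius(tenths));
    }

    public static void main(String[] args) {
        String[] result = {"201701 307", "201704 399", "201707 485"};
        for (String line : result) {
            System.out.println(convertLine(line));
        }
    }
}
```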
Let's verify the result:
[grid@tiny01 ~]$ hadoop fs -copyToLocal /out/part-r-00000 result.txt
[grid@tiny01 ~]$ awk '{print $1".{66}+0"$2"1"}' result.txt |xargs -i grep --color=auto {} sample.txt | awk -v FS="" '{print substr($0,5,5),substr($0,16,6),substr($0,88,6)}'
59158 201704 +03071
59997 201701 +03071
56966 201702 +03501
56966 201703 +03751
56966 201704 +03991
51573 201705 +04261
51573 201706 +04441
51573 201706 +04441
51573 201707 +04851
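I am not good at writing regular expressions, so please make do with this one. The first row only shows up because of how the regular expression matched, so it does not count. All the other rows agree with the job output. We can look up the stations involved:
51573: Turpan, Xinjiang
56966: Yuanjiang, Yunnan
59997: not found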
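A pattern handed to grep can match anywhere in a line, which is one way a row from the wrong month may slip through. One alternative is to compare the fixed columns directly. The following is a minimal sketch under stated assumptions: the class name ColumnCheckSketch and the helper record() are made up, and the two records are synthetic (the station IDs come from the listing above, the dates are invented).

```java
// Sketch only: check month and temperature at their fixed columns instead of using a regex.
class ColumnCheckSketch {
    // True if columns 16-21 equal yearMonth and columns 88-92 equal the given tenths.
    static boolean matches(String line, String yearMonth, int tenths) {
        if (line.length() < 92) {
            return false; // too short to contain the temperature field
        }
        String raw = line.substring(87, 92);
        int t = Integer.parseInt(raw.charAt(0) == '+' ? raw.substring(1) : raw);
        return line.startsWith(yearMonth, 15) && t == tenths;
    }

    // Hypothetical helper: synthetic record with only station, date, temperature and quality set.
    static String record(String station, String yyyymmdd, String temp, char quality) {
        StringBuilder sb = new StringBuilder("0000");  // columns 1-4
        sb.append(station).append("99999");            // columns 5-10 and 11-15
        sb.append(yyyymmdd);                           // columns 16-23
        while (sb.length() < 87) {
            sb.append('9');                            // filler up to column 87
        }
        return sb.append(temp).append(quality).toString();
    }

    public static void main(String[] args) {
        String april = record("591580", "20170401", "+0307", '1');
        String january = record("599970", "20170115", "+0307", '1');
        // Only the January record should be accepted for key 201701 with value 307.
        System.out.println("april   -> " + matches(april, "201701", 307));
        System.out.println("january -> " + matches(january, "201701", 307));
    }
}
```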
The test succeeded!