您的位置:首页 > 运维架构

hadoop2.x入门:编写mapreduce对气象数据集求每日最高气温和最低气温

2017-07-23 19:30 441 查看

1.下载气象数据集

气象数据集下载地址为:

- ftp://ftp.ncdc.noaa.gov/pub/data/noaa

我们下载国内的气象数据,使用下面命令进行下载

wget -D --accept-regex=REGEX -P data -r -c ftp://ftp.ncdc.noaa.gov/pub/data/noaa/2017/5*[/code] 
国内气象站ID区间为
50001-59998
详细的可以在《1951—2007年中国地面气候资料日值数据集台站信息》中查看,不过应该不全。另外《StationIDs_Global_1509》中提供了世界各国气象站编号范围。

2.解压数据集,并保存在文本文件中

7月23号下载的,数据量为79w行,大小为182MB。所以即使年底也不过200w行。

[grid@tiny01 ~]$ zcat data/ftp.ncdc.noaa.gov/pub/data/noaa/2017/5*.gz > data.txt


在这里>表示输出重定向符

我们查看气象数据集:

0169501360999992017010100004+52130+122520FM-12+043399999V0202201N0010102600199003700199-03271-03631102641ADDAA124160092AJ199999999999999GA1081+026001101GA2999+999999101GA3999+999999101GE19MSL   +99999+99999GF108991081999026001999999MA1999999096571MD1210041+0301REMSYN004BUFR


对数据格式进行解释

1-4     0169
5-10    501360      # USAF weather station identifier
11-15   99999       # WBAN weather station identifier
<
4000
span class="hljs-number">16-23   20170101    # 记录日期
24-27   0000        # 记录时间
28      4
29-34   +52130      # 纬度(1000倍)
35-41   +122520     # 经度(1000倍)
42-46   FM-12
47-51   +0433      # 海拔(米)
52-56   99999
57-60   V020
61-63   220         # 风向
64      1           # 质量代码
65      N
66-69   0010
70      1
71-75   02600       # 云高(米)
76      1
77      9
78      9
79-84   003700      # 能见距离(米)
85      1
86      9
87      9
88-92   -0327       # 空气温度(摄氏度*10)
93      1
94-98   -0363       # 露点温度(摄氏度*10)
99      1
100-104 10264       # 大气压力
105     1


其中第5-10位表示气象站编号:501360(取前五位),查表可得对应的是黑龙江漠河。我们主要分析的是月份:16-21位和空气温度:88-92位的极值关系。

3. 编写MapReduce程序

Mapper程序

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;

@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String data = line.substring(15, 21);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus
// signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(data), new IntWritable(airTemperature));
}
}
}


Reducer程序

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}


M-R job

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

public class MaxTemperature extends Configured implements Tool {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err
.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
Configuration conf = new Configuration();
conf.set("mapred.jar", "MaxTemperature.jar");
Job job = Job.getInstance(conf);
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

@Override
public int run(String[] arg0) throws Exception {
// TODO Auto-generated method stub
return 0;
}
}


注意设置
conf.set("mapred.jar", "MaxTemperature.jar");
第二个参数为即将打成的jar包的名称


4.编译java文件,打成jar包

此编译命令为:

[grid@tiny01 myclass]$ javac -classpath $HADOOP_HOME/share/hadoop/common/hadoop-common-2.7.2.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.2.jar:$HADOOP_HOME/share/hadoop/common/lib/commons-cli-1.2.jar *.java
[grid@tiny01 myclass]$ jar cvf MaxTemperature.jar *.class
[grid@tiny01 myclass]$ ll
total 28
-rw-rw-r--. 1 grid grid 1413 Jul  3 16:45 MaxTemperature.class
-rw-rw-r--. 1 grid grid 3085 Jul  9 19:04 MaxTemperature.jar
-rw-rw-r--. 1 grid grid  949 Jun 30 15:49 MaxTemperature.java
-rw-rw-r--. 1 grid grid 1876 Jul  3 16:45 MaxTemperatureMapper.class
-rw-rw-r--. 1 grid grid  953 Jun 30 15:37 MaxTemperatureMapper.java
-rw-rw-r--. 1 grid grid 1687 Jul  3 16:45 MaxTemperatureReducer.class
-rw-rw-r--. 1 grid grid  553 Jun 30 15:47 MaxTemperatureReducer.java


这里的classpath和之前的hadoop版本有所区别,需要按照新的设置方法,这一点网上很少提及!(注意Hadoop不同版本,包不一样)

5.将数据上传至hdfs上

[grid@tiny01 ~]$ hadoop fs -put data.txt /data.txt


6. 运行该程序

[grid@tiny01 ~]$ hadoop jar MaxTemperature.jar MaxTemperature /data.txt /out

17/07/24 00:13:20 INFO client.RMProxy: Connecting to ResourceManager at tiny01/192.168.1.101:8032
17/07/24 00:13:21 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/07/24 00:13:22 INFO input.FileInputFormat: Total input paths to process : 1
17/07/24 00:13:23 INFO mapreduce.JobSubmitter: number of splits:2
17/07/24 00:13:23 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1500807860144_0002
17/07/24 00:13:24 INFO impl.YarnClientImpl: Submitted application application_1500807860144_0002
17/07/24 00:13:24 INFO mapreduce.Job: The url to track the job: http://tiny01:8088/proxy/application_1500807860144_0002/ 17/07/24 00:13:24 INFO mapreduce.Job: Running job: job_1500807860144_0002
17/07/24 00:13:44 INFO mapreduce.Job: Job job_1500807860144_0002 running in uber mode : false
17/07/24 00:13:44 INFO mapreduce.Job:  map 0% reduce 0%
17/07/24 00:14:49 INFO mapreduce.Job:  map 20% reduce 0%
17/07/24 00:14:52 INFO mapreduce.Job:  map 33% reduce 0%
17/07/24 00:14:55 INFO mapreduce.Job:  map 50% reduce 0%
17/07/24 00:16:02 INFO mapreduce.Job:  map 51% reduce 0%
17/07/24 00:16:05 INFO mapreduce.Job:  map 54% reduce 0%
17/07/24 00:16:08 INFO mapreduce.Job:  map 57% reduce 0%
17/07/24 00:16:11 INFO mapreduce.Job:  map 60% reduce 0%
17/07/24 00:16:14 INFO mapreduce.Job:  map 62% reduce 0%
17/07/24 00:16:20 INFO mapreduce.Job:  map 65% reduce 0%
17/07/24 00:16:40 INFO mapreduce.Job:  map 69% reduce 0%
17/07/24 00:16:42 INFO mapreduce.Job:  map 73% reduce 0%
17/07/24 00:16:44 INFO mapreduce.Job:  map 83% reduce 0%
17/07/24 00:16:46 INFO mapreduce.Job:  map 100% reduce 0%
17/07/24 00:17:22 INFO mapreduce.Job:  map 100% reduce 100%
17/07/24 00:17:30 INFO mapreduce.Job: Job job_1500807860144_0002 completed successfully
17/07/24 00:17:32 INFO mapreduce.Job: Counters: 50
File System Counters
FILE: Number of bytes read=10226664
FILE: Number of bytes written=20805407
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=190690631
HDFS: Number of bytes written=77
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Killed map tasks=1
Launched map tasks=3
Launched reduce tasks=1
Data-local map tasks=3
Total time spent by all maps in occupied slots (ms)=383699
Total time spent by all reduces in occupied slots (ms)=143422
Total time spent by all map tasks (ms)=383699
Total time spent by all reduce tasks (ms)=143422
Total vcore-milliseconds taken by all map tasks=383699
Total vcore-milliseconds taken by all reduce tasks=143422
Total megabyte-milliseconds taken by all map tasks=392907776
Total megabyte-milliseconds taken by all reduce tasks=146864128
Map-Reduce Framework
Map input records=789998
Map output records=786666
Map output bytes=8653326
Map output materialized bytes=10226670
Input split bytes=184
Combine input records=0
Combine output records=0
Reduce input groups=7
Reduce shuffle bytes=10226670
Reduce input records=786666
Reduce output records=7
Spilled Records=1573332
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=2436
CPU time spent (ms)=8470
Physical memory (bytes) snapshot=415924224
Virtual memory (bytes) snapshot=6170849280
Total committed heap usage (bytes)=267198464
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=190690447
File Output Format Counters
Bytes Written=77


查看结果

[grid@tiny01 ~]$ hadoop fs -cat /out/part-r-00000
201701  307
201702  350
201703  375
201704  399
201705  426
201706  444
201707  485


由于这里的气温是摄氏度的10倍,所以看起来很大。

我们来检查一下:

[grid@tiny01 ~]$ hadoop fs -copyToLocal /out/part-r-00000 result.txt
[grid@tiny01 ~]$ awk '{print $1".{66}+0"$2"1"}' result.txt |xargs -i grep --color=auto {} sample.txt | awk -v FS="" '{print substr($0,5,5),substr($0,16,6),substr($0,88,6)}'
59158 201704 +03071
59997 201701 +03071
56966 201702 +03501
56966 201703 +03751
56966 201704 +03991
51573 201705 +04261
51573 201706 +04441
51573 201706 +04441
51573 201707 +04851


正则表达式不会写,就将就着看吧,第一条是因为正则表达式匹配的问题,因此这条数据不算。但是其他条都吻合,我们可以看看这几个气象站:

51573:新疆吐鲁番

56966:云南元江

59997:没找到

测试成功!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: