Hadoop之Avro mapreduce最高气温程序
2017-07-12 15:53
429 查看
一、代码
1.pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zqh.hadoopProject</groupId>
    <artifactId>hadoopProject</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-mapred</artifactId>
            <version>1.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <!-- FIX: removed trailing whitespace inside <version> ("1.8.2 ");
                 Maven does not reliably trim it and resolution can fail. -->
            <version>1.8.2</version>
        </dependency>
    </dependencies>
    <!-- Bundle all referenced jars into a single executable jar using the
         maven-assembly-plugin. -->
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <!-- Replace with the class containing the jar's main method -->
                            <mainClass>hadoop.main.AvroGenericMaxTemperature</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <!-- Identifier for this execution; used to match/merge
                             executions during POM inheritance. -->
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <!-- Bind the goal to the package phase so the merged jar
                             is produced when packaging. -->
                        <phase>package</phase>
                        <goals>
                            <!-- Run assembly:single during package -->
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2. AvroGenericMaxTemperature.java
package hadoop.main;
// cc AvroGenericMaxTemperature MapReduce program to find the maximum temperature, creating Avro output

import hadoop.util.NcdcRecordParser;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.*;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

//vv AvroGenericMaxTemperature
/**
 * MapReduce job (old {@code org.apache.hadoop.mapred} API + Avro 1.x
 * {@code avro-mapred}) that finds, per year, the weather record with the
 * highest air temperature and writes it as Avro output conforming to
 * {@link #SCHEMA}.
 */
public class AvroGenericMaxTemperature extends Configured implements Tool {

    /** Output record schema: (year:int, temperature:int, stationId:string). */
    private static final Schema SCHEMA = new Schema.Parser().parse(
        "{" +
        " \"type\": \"record\"," +
        " \"name\": \"WeatherRecord\"," +
        " \"doc\": \"A weather reading.\"," +
        " \"fields\": [" +
        " {\"name\": \"year\", \"type\": \"int\"}," +
        " {\"name\": \"temperature\", \"type\": \"int\"}," +
        " {\"name\": \"stationId\", \"type\": \"string\"}" +
        " ]" +
        "}"
    );

    /**
     * Maps one raw NCDC line (delivered as an Avro UTF-8 string) to a
     * (year, WeatherRecord) pair; lines with invalid temperatures are dropped.
     */
    public static class MaxTemperatureMapper
            extends AvroMapper<Utf8, Pair<Integer, GenericRecord>> {
        private NcdcRecordParser parser = new NcdcRecordParser();
        // Reused across map() calls to avoid a per-record allocation.
        // NOTE(review): this assumes the collector serializes the record
        // eagerly inside collect(), so mutating it on the next call is safe —
        // confirm against the avro-mapred version in use.
        private GenericRecord record = new GenericData.Record(SCHEMA);

        @Override
        public void map(Utf8 line,
                AvroCollector<Pair<Integer, GenericRecord>> collector,
                Reporter reporter) throws IOException {
            parser.parse(line.toString());
            if (parser.isValidTemperature()) {
                record.put("year", parser.getYearInt());
                record.put("temperature", parser.getAirTemperature());
                record.put("stationId", parser.getStationId());
                collector.collect(
                    new Pair<Integer, GenericRecord>(parser.getYearInt(), record));
            }
        }
    }

    /** Emits, for each year key, the record with the maximum "temperature". */
    public static class MaxTemperatureReducer
            extends AvroReducer<Integer, GenericRecord, GenericRecord> {

        @Override
        public void reduce(Integer key, Iterable<GenericRecord> values,
                AvroCollector<GenericRecord> collector,
                Reporter reporter) throws IOException {
            GenericRecord max = null;
            for (GenericRecord value : values) {
                if (max == null ||
                    (Integer) value.get("temperature") > (Integer) max.get("temperature")) {
                    // Copy the candidate rather than keeping the reference:
                    // the code defensively clones here because 'value' may be
                    // reused by the framework across iterations.
                    max = newWeatherRecord(value);
                }
            }
            collector.collect(max);
        }

        /** Returns a field-by-field copy of {@code value} against {@link #SCHEMA}. */
        private GenericRecord newWeatherRecord(GenericRecord value) {
            GenericRecord record = new GenericData.Record(SCHEMA);
            record.put("year", value.get("year"));
            record.put("temperature", value.get("temperature"));
            record.put("stationId", value.get("stationId"));
            return record;
        }
    }

    /**
     * Configures and runs the job.
     * args[0] = input path, args[1] = output path.
     *
     * @return 0 on success, -1 on a usage error
     */
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
                getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        JobConf conf = new JobConf(getConf(), getClass());
        conf.setJobName("Max temperature");
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        // Input is text read as Avro strings; map output is Pair<int, record>;
        // final output is the WeatherRecord schema.
        AvroJob.setInputSchema(conf, Schema.create(Schema.Type.STRING));
        AvroJob.setMapOutputSchema(conf,
            Pair.getPairSchema(Schema.create(Schema.Type.INT), SCHEMA));
        AvroJob.setOutputSchema(conf, SCHEMA);
        conf.setInputFormat(AvroUtf8InputFormat.class);
        AvroJob.setMapperClass(conf, MaxTemperatureMapper.class);
        AvroJob.setReducerClass(conf, MaxTemperatureReducer.class);
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // String[] strs = new String[2]; // local test paths
        // strs[0] = "hdfs://localhost:9000/ncdc/sample";
        // strs[1] = "hdfs://localhost:9000/ncdc/output";
        int exitCode = ToolRunner.run(new AvroGenericMaxTemperature(), args);
        System.exit(exitCode);
    }
}
3. NcdcRecordParser.java
package hadoop.util;

import org.apache.hadoop.io.Text;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Parses one fixed-width NCDC weather record line into its station id,
 * observation date, year, air temperature, and quality-flag fields.
 *
 * <p>Not thread-safe: parse state is held in instance fields, and the shared
 * static {@code DATE_FORMAT} ({@link SimpleDateFormat}) is itself not
 * thread-safe. Use one parser instance per task/thread.
 *
 * Created by zhuqiuhui on 2017/7/11.
 */
public class NcdcRecordParser {

    /** Sentinel value the NCDC format uses for a missing temperature. */
    private static final int MISSING_TEMPERATURE = 9999;

    // NOTE(review): SimpleDateFormat is not thread-safe; safe only while this
    // parser is confined to a single thread.
    private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmm");

    private String stationId;
    private String observationDateString;
    private String year;
    private String airTemperatureString;
    private int airTemperature;
    private boolean airTemperatureMalformed;
    private String quality;

    /**
     * Parses a raw record line. Field positions follow the NCDC fixed-width
     * layout (station id at 4-15, timestamp at 15-27, signed temperature at
     * 87-92, quality flag at 92-93).
     *
     * @param record one fixed-width NCDC line; must be at least 93 chars
     */
    public void parse(String record) {
        stationId = record.substring(4, 10) + "-" + record.substring(10, 15);
        observationDateString = record.substring(15, 27);
        year = record.substring(15, 19);
        airTemperatureMalformed = false;
        // Remove leading plus sign as parseInt doesn't like them
        if (record.charAt(87) == '+') {
            airTemperatureString = record.substring(88, 92);
            airTemperature = Integer.parseInt(airTemperatureString);
        } else if (record.charAt(87) == '-') {
            airTemperatureString = record.substring(87, 92);
            airTemperature = Integer.parseInt(airTemperatureString);
        } else {
            // BUG FIX: the original code fell through to an unconditional
            // Integer.parseInt(airTemperatureString) after this if/else, which
            // threw NullPointerException on the first malformed record and
            // re-parsed a stale value from the previous record on later ones.
            // Mark the temperature malformed and skip parsing entirely.
            airTemperatureMalformed = true;
            airTemperatureString = null;
        }
        quality = record.substring(92, 93);
    }

    /** Convenience overload for Hadoop {@link Text} values. */
    public void parse(Text record) {
        parse(record.toString());
    }

    /**
     * @return true when the temperature parsed cleanly, is not the missing
     *         sentinel, and the quality flag is one of 0, 1, 4, 5, 9
     */
    public boolean isValidTemperature() {
        return !airTemperatureMalformed && airTemperature != MISSING_TEMPERATURE
            && quality.matches("[01459]");
    }

    public boolean isMalformedTemperature() {
        return airTemperatureMalformed;
    }

    public boolean isMissingTemperature() {
        return airTemperature == MISSING_TEMPERATURE;
    }

    public String getStationId() {
        return stationId;
    }

    /**
     * @return the observation timestamp parsed as yyyyMMddHHmm
     * @throws IllegalArgumentException if the timestamp field is unparseable
     */
    public Date getObservationDate() {
        try {
            // FIX: removed leftover debug System.out.println of the raw
            // timestamp string.
            return DATE_FORMAT.parse(observationDateString);
        } catch (ParseException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public String getYear() {
        return year;
    }

    public int getYearInt() {
        return Integer.parseInt(year);
    }

    /** @return temperature in tenths of a degree Celsius (signed). */
    public int getAirTemperature() {
        return airTemperature;
    }

    public String getAirTemperatureString() {
        return airTemperatureString;
    }

    public String getQuality() {
        return quality;
    }
}
4. 数据文件 sample
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999 0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999 0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999 0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999 0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
二、运行过程
1、执行:mvn clean -U install 把工程打成 jar 包(依赖于 maven 插件 maven-assembly-plugin,参考 pom.xml,其中指定了 jar 运行的主类)。
2、执行:hadoop jar hadoopProject-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://localhost:9000/ncdc/sample hdfs://localhost:9000/ncdc/output
其中:
hadoopProject-1.0-SNAPSHOT-jar-with-dependencies.jar:生成的jar包
hdfs://localhost:9000/ncdc/sample:hadoop 上的测试文件(与第 2 步命令中的输入路径一致)
hdfs://localhost:9000/ncdc/output:输出位置
三、遇到的问题
(1)Exception in thread "main" java.io.IOException: Mkdirs failed to create /var/folders/................./META-INF/license。解决方法:直接删除掉 jar 包中的该文件,执行命令:zip -d hadoopProject-1.0-SNAPSHOT-jar-with-dependencies.jar META-INF/LICENSE 即可。
(2)Error: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
其中显示内容如下:
17/07/12 15:13:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/12 15:13:24 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/07/12 15:13:25 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/07/12 15:13:26 INFO mapred.FileInputFormat: Total input files to process : 1
17/07/12 15:13:26 INFO mapreduce.JobSubmitter: number of splits:2
17/07/12 15:13:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1499777740530_0007
17/07/12 15:13:26 INFO impl.YarnClientImpl: Submitted application application_1499777740530_0007
17/07/12 15:13:26 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1499777740530_0007/ 17/07/12 15:13:26 INFO mapreduce.Job: Running job: job_1499777740530_0007
17/07/12 15:13:32 INFO mapreduce.Job: Job job_1499777740530_0007 running in uber mode : false
17/07/12 15:13:32 INFO mapreduce.Job: map 0% reduce 0%
17/07/12 15:13:36 INFO mapreduce.Job: Task Id : attempt_1499777740530_0007_m_000000_0, Status : FAILED
Error: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
17/07/12 15:13:36 INFO mapreduce.Job: Task Id : attempt_1499777740530_0007_m_000001_0, Status : FAILED
Error: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
17/07/12 15:13:40 INFO mapreduce.Job: Task Id : attempt_1499777740530_0007_m_000000_1, Status : FAILED
Error: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
17/07/12 15:13:40 INFO mapreduce.Job: Task Id : attempt_1499777740530_0007_m_000001_1, Status : FAILED
Error: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
17/07/12 15:13:44 INFO mapreduce.Job: Task Id : attempt_1499777740530_0007_m_000000_2, Status : FAILED
Error: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
17/07/12 15:13:45 INFO mapreduce.Job: Task Id : attempt_1499777740530_0007_m_000001_2, Status : FAILED
Error: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
17/07/12 15:13:49 INFO mapreduce.Job: map 50% reduce 100%
17/07/12 15:13:50 INFO mapreduce.Job: map 100% reduce 100%
17/07/12 15:13:50 INFO mapreduce.Job: Job job_1499777740530_0007 failed with state FAILED due to: Task failed task_1499777740530_0007_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0
17/07/12 15:13:50 INFO mapreduce.Job: Counters: 17
Job Counters
Failed map tasks=7
Killed map tasks=1
Killed reduce tasks=1
Launched map tasks=8
Other local map tasks=6
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=19264
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=19264
Total time spent by all reduce tasks (ms)=0
Total vcore-milliseconds taken by all map tasks=19264
Total vcore-milliseconds taken by all reduce tasks=0
Total megabyte-milliseconds taken by all map tasks=19726336
Total megabyte-milliseconds taken by all reduce tasks=0
Map-Reduce Framework
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Exception in thread "main" java.io.IOException: Job failed!
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:873)
at hadoop.main.AvroGenericMaxTemperature.run(AvroGenericMaxTemperature.java:101)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
at hadoop.main.AvroGenericMaxTemperature.main(AvroGenericMaxTemperature.java:109)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:234)
at org.apache.hadoop.util.RunJar.main(RunJar.java:148)
上述内容即是说,找不到 org.apache.avro.generic.GenericData.createDatumWriter 方法。后来查了下 Avro 的版本,发现默认引入的是 1.7.2,其中没有该方法(该方法从 1.7.5 版本才出现),于是重新引入了 1.8.2 版本,一切正常!参考 1.8.2 版本的依赖:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
四、输出结果
使用 avro-tools 工具来查看(brew install avro-tools 即可安装):avro-tools tojson part-00000.avro
其输出(含 log4j 警告)如下:
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{"year":1949,"temperature":111,"stationId":"012650-99999"}
{"year":1950,"temperature":22,"stationId":"011990-99999"}
相关文章推荐
- hadoop2.x入门:编写mapreduce对气象数据集求每日最高气温和最低气温
- hadoop hdfs搭建 mapreduce环境搭建 wordcount程序简单注释
- 如何在Hadoop上编写MapReduce程序
- Hadoop示例程序之单词统计MapReduce
- Hadoop MapReduce程序中解决第三方jar包问题--终极解决方案
- Eclipse下运行hadoop自带的mapreduce程序--wordcount
- 如何在Hadoop上编写MapReduce程序
- Hadoop初级入门 MapReduce程序执行集群执行
- hadoop学习笔记-5-最高气温示例MaxTemperature
- Hadoop学习全程记录——在Eclipse中运行第一个MapReduce程序
- 如何使用Python为Hadoop编写一个简单的MapReduce程序(这个人T字还有好几篇精华的可以看)
- hadoop 0.23 YARN分布式程序的编写 (Hadoop MapReduce Next Generation - Writing YARN Applications)
- 如何在Hadoop上编写MapReduce程序
- 用PHP编写Hadoop的MapReduce程序
- Hadoop示例程序之单词统计MapReduce
- Hadoop学习笔记2--第一个Mapreduce程序
- 用PHP编写Hadoop的MapReduce程序
- hadoop MapReduce程序 不包含Reduce的设置
- 黑马程序员--基于Hadoop新版API的MapReduce程序
- 如何使用Python为Hadoop编写一个简单的MapReduce程序