Big Data IMF Legend Action: developing a MapReduce weather example on Hadoop
2016-02-11 09:11
1. Get the data file
0067011990999991950051507004888888889999999N9+00001+9999999999999999999999
0067011990999991950051512004888888889999999N9+00221+9999999999999999999999
0067011990999991950051518004888888889999999N9-00111+9999999999999999999999
0067011990999991949032412004888888889999999N9+01111+9999999999999999999999
0067011990999991950032418004888888880500001N9+00001+9999999999999999999999
0067011990999991950051507004888888880500001N9+00781+9999999999999999999999
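2. Metadata (record layout):
Characters 15 to 18 (0-based, i.e. data.substring(15, 19) in the code) hold the year, e.g. 1950 or 1949;
Character 45 is the sign of the temperature and characters 46 to 49 are its digits; in the field -00111, for example, the temperature is -0011, i.e. -11;
Character 50 is the quality code (the trailing 1 in -00111 and +00001); only records whose quality code is 0, 1, 4, 5 or 9 are kept, and a temperature of +9999 marks a missing reading.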
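To make the layout concrete, here is a minimal plain-Java sketch (no Hadoop dependency) that decodes one record using the positions described above. The class WeatherRecord and its parse method are hypothetical names for illustration only. Unlike the mapper below, it parses sign and digits in one call with Integer.parseInt(substring(45, 50)), which assumes Java 7 or later (older JDKs reject a leading '+').

```java
// Hypothetical helper for illustration (not part of the original job):
// decodes one fixed-width record; positions are 0-based, substring ends are exclusive.
final class WeatherRecord {
    final String year;       // characters 15..18
    final int temperature;   // sign at 45, digits at 46..49
    final char quality;      // character 50

    private WeatherRecord(String year, int temperature, char quality) {
        this.year = year;
        this.temperature = temperature;
        this.quality = quality;
    }

    static WeatherRecord parse(String line) {
        if (line.length() < 51) {
            throw new IllegalArgumentException("record too short: " + line.length() + " chars");
        }
        String year = line.substring(15, 19);
        // "-0011" or "+0022": Integer.parseInt accepts a leading '+' or '-' (Java 7+)
        int temperature = Integer.parseInt(line.substring(45, 50));
        char quality = line.charAt(50);
        return new WeatherRecord(year, temperature, quality);
    }

    public static void main(String[] args) {
        WeatherRecord r = parse(
            "0067011990999991950051518004888888889999999N9-00111+9999999999999999999999");
        System.out.println(r.year + " " + r.temperature + " " + r.quality); // 1950 -11 1
    }
}
```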
3. Upload the file to HDFS
[root@master testdata]#hadoop dfs -put data.txt /temperature
[root@master testdata]#hadoop dfs -ls /temperature
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
16/02/10 18:49:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 1 items
-rw-r--r-- 1 root supergroup 454 2016-02-10 18:49 /temperature/data.txt
4. Write the weather MapReduce program (the full source code is at the end)
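4.1 Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context):
```java
String data = value.toString();                 // key is the byte offset of the line, value is the line itself
if (data.length() < 51) {
    return;                                     // skip lines too short to hold all fields
}
String year = data.substring(15, 19);           // year
int temperature;
if ('+' == data.charAt(45)) {
    temperature = Integer.parseInt(data.substring(46, 50)); // temperature, without the '+'
} else {
    temperature = Integer.parseInt(data.substring(45, 50)); // keep the '-' sign
}
String validDataFlag = data.substring(50, 51);  // quality code
// Data cleaning with a regular expression: drop missing readings (MiSSING = 9999)
// and records whose quality code is not one of 0, 1, 4, 5, 9
if (temperature != MiSSING && validDataFlag.matches("[01459]")) {
    // Emit one (year, temperature) pair; the framework later groups all temperatures of a year
    context.write(new Text(year), new IntWritable(temperature));
}
```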
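The mapper's filtering can be tried without a cluster. The following sketch, assuming only the JDK, replays the map logic above on the six sample lines from step 1. MapPhaseSketch, mapLine and mapAll are hypothetical names, and a Map.Entry merely stands in for the (Text, IntWritable) pair that context.write would emit.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Plain-Java replay of the map logic (hypothetical names, no Hadoop classes).
final class MapPhaseSketch {
    private static final int MISSING = 9999; // same sentinel as MiSSING in the mapper

    // Returns the (year, temperature) pair the mapper would emit, or null if the line is dropped.
    static Map.Entry<String, Integer> mapLine(String data) {
        if (data.length() < 51) {
            return null; // too short to hold the quality code
        }
        String year = data.substring(15, 19);
        int temperature = data.charAt(45) == '+'
                ? Integer.parseInt(data.substring(46, 50))
                : Integer.parseInt(data.substring(45, 50));
        String validDataFlag = data.substring(50, 51);
        if (temperature != MISSING && validDataFlag.matches("[01459]")) {
            return new SimpleEntry<>(year, temperature);
        }
        return null;
    }

    // Applies mapLine to every line and collects the emitted pairs in input order.
    static List<Map.Entry<String, Integer>> mapAll(List<String> lines) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (String line : lines) {
            Map.Entry<String, Integer> pair = mapLine(line);
            if (pair != null) {
                emitted.add(pair);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "0067011990999991950051507004888888889999999N9+00001+9999999999999999999999",
            "0067011990999991950051512004888888889999999N9+00221+9999999999999999999999",
            "0067011990999991950051518004888888889999999N9-00111+9999999999999999999999",
            "0067011990999991949032412004888888889999999N9+01111+9999999999999999999999",
            "0067011990999991950032418004888888880500001N9+00001+9999999999999999999999",
            "0067011990999991950051507004888888880500001N9+00781+9999999999999999999999");
        // Prints 1950=0, 1950=22, 1950=-11, 1949=111, 1950=0, 1950=78 (one per line)
        for (Map.Entry<String, Integer> pair : mapAll(sample)) {
            System.out.println(pair);
        }
    }
}
```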
4.2 Override reduce:
```java
protected void reduce(Text year, Iterable<IntWritable> data,
        Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    int coldestTemperature = Integer.MAX_VALUE;
    // Iterate over all temperatures recorded for this year and keep the smallest
    for (IntWritable item : data) {
        coldestTemperature = Math.min(coldestTemperature, item.get());
    }
    context.write(year, new IntWritable(coldestTemperature)); // key: year, value: lowest temperature
}
```
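Between map and reduce, the framework sorts the emitted pairs and groups all values with the same key, so reduce is called once per year. Below is a rough plain-Java sketch of that shuffle-plus-reduce, assuming as input the six pairs the mapper emits for the sample data. The TreeMap only imitates the sorted grouping (the real shuffle is distributed across machines), and ReducePhaseSketch is a hypothetical name.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java imitation of shuffle + reduce (hypothetical names, no Hadoop classes).
final class ReducePhaseSketch {
    static Map.Entry<String, Integer> pair(String year, int temperature) {
        return new SimpleEntry<>(year, temperature);
    }

    // "Shuffle": group values by key; TreeMap keeps keys sorted like the framework's sort.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            groups.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return groups;
    }

    // Same logic as TemperatureReducer.reduce: keep the minimum value for one key.
    static int reduce(Iterable<Integer> temperatures) {
        int coldestTemperature = Integer.MAX_VALUE;
        for (int t : temperatures) {
            coldestTemperature = Math.min(coldestTemperature, t);
        }
        return coldestTemperature;
    }

    // Calls reduce once per key, in key order.
    static Map<String, Integer> run(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : shuffle(pairs).entrySet()) {
            result.put(group.getKey(), reduce(group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // The pairs the mapper emits for the six sample lines of step 1
        List<Map.Entry<String, Integer>> pairs = Arrays.asList(
            pair("1950", 0), pair("1950", 22), pair("1950", -11),
            pair("1949", 111), pair("1950", 0), pair("1950", 78));
        System.out.println(run(pairs)); // {1949=111, 1950=-11}
    }
}
```

Because every group contains at least one value, the Integer.MAX_VALUE starting point never reaches the output.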
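4.3 Job submission boilerplate:
```java
// conf and otherArgs come from main(); see the full source code at the end
Job job = Job.getInstance(conf, "TemperatureComputation"); // replaces the deprecated new Job(conf)
job.setJarByClass(TemperatureComputation.class);            // locate the jar that contains this class
job.setMapperClass(TeperatureMapper.class);
// job.setCombinerClass(TemperatureReducer.class);          // optional: a minimum is safe to combine
job.setReducerClass(TemperatureReducer.class);
job.setOutputKeyClass(Text.class);                           // year
job.setOutputValueClass(IntWritable.class);                  // lowest temperature
System.out.println("job!");
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // the output directory must not exist yet
System.exit(job.waitForCompletion(true) ? 0 : 1);
```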
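The commented-out setCombinerClass line hints at a combiner. Because taking a minimum is associative and commutative, the reducer's logic could presumably be reused as a combiner (for example job.setCombinerClass(TemperatureReducer.class), whose input and output types match). The plain-Java sketch below, with a hypothetical split of the sample pairs across two map tasks and the hypothetical name CombinerSketch, checks that combining each task's output first gives the same result as reducing everything at once.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java check that a min-combiner does not change the result (hypothetical names, no Hadoop).
final class CombinerSketch {
    static Map.Entry<String, Integer> pair(String year, int temperature) {
        return new SimpleEntry<>(year, temperature);
    }

    // "Keep the minimum per year": the reducer's logic, usable as combiner and as reducer.
    static Map<String, Integer> minPerKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> mins = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            mins.merge(p.getKey(), p.getValue(), Integer::min);
        }
        return mins;
    }

    // Runs the combiner on each map task's output, then the reducer on everything that was shuffled.
    static Map<String, Integer> withCombiner(List<List<Map.Entry<String, Integer>>> mapTaskOutputs) {
        List<Map.Entry<String, Integer>> shuffled = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> taskOutput : mapTaskOutputs) {
            shuffled.addAll(minPerKey(taskOutput).entrySet());
        }
        return minPerKey(shuffled);
    }

    public static void main(String[] args) {
        // Hypothetical split of the six sample pairs across two map tasks
        List<Map.Entry<String, Integer>> task1 = Arrays.asList(pair("1950", 0), pair("1950", 22), pair("1950", -11));
        List<Map.Entry<String, Integer>> task2 = Arrays.asList(pair("1949", 111), pair("1950", 0), pair("1950", 78));
        List<Map.Entry<String, Integer>> all = new ArrayList<>(task1);
        all.addAll(task2);
        System.out.println(minPerKey(all));                            // {1949=111, 1950=-11}
        System.out.println(withCombiner(Arrays.asList(task1, task2))); // {1949=111, 1950=-11}
    }
}
```

The same shortcut would not work for, say, an average temperature, since the average of per-task averages is generally not the overall average.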
5. Running it from Eclipse runs into trouble again
args array length: 2
Array contents:
hdfs://192.168.2.100:9000/temperature/data.txt
hdfs://192.168.2.100:9000/wordcount/output10
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
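After that it hangs again!!! (The three log4j warnings only mean that no log4j configuration, such as a log4j.properties file, was found on the Eclipse classpath, so Hadoop's own progress messages are not printed; they do not by themselves explain the hang.)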
6. So I packaged it as a jar and submitted it to the cluster by hand
[root@master testdata]#hadoop jar Temperature.jar com.dtspak.hadoop.TemperatureComputation hdfs://192.168.2.100:9000/temperature/data.txt hdfs://192.168.2.100:9000/temperature/output
args array length: 2
Array contents:
hdfs://192.168.2.100:9000/temperature/data.txt
hdfs://192.168.2.100:9000/temperature/output
16/02/10 19:40:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
job!
16/02/10 19:40:26 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/10 19:40:26 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/02/10 19:40:28 INFO input.FileInputFormat: Total input paths to process : 1
16/02/10 19:40:29 INFO mapreduce.JobSubmitter: number of splits:1
16/02/10 19:40:32 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1648643189_0001
16/02/10 19:40:32 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
16/02/10 19:40:32 INFO mapreduce.Job: Running job: job_local1648643189_0001
16/02/10 19:40:32 INFO mapred.LocalJobRunner: OutputCommitter set in config null
16/02/10 19:40:32 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
16/02/10 19:40:32 INFO mapred.LocalJobRunner: Waiting for map tasks
16/02/10 19:40:32 INFO mapred.LocalJobRunner: Starting task: attempt_local1648643189_0001_m_000000_0
16/02/10 19:40:33 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
16/02/10 19:40:33 INFO mapred.MapTask: Processing split: hdfs://192.168.2.100:9000/temperature/data.txt:0+454
16/02/10 19:40:33 INFO mapreduce.Job: Job job_local1648643189_0001 running in uber mode : false
16/02/10 19:40:33 INFO mapreduce.Job: map 0% reduce 0%
16/02/10 19:40:38 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
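The job ID job_local1648643189_0001 and the LocalJobRunner messages above indicate that, although it was launched with hadoop jar, the job ran in Hadoop's local mode rather than on YARN; this typically happens when mapreduce.framework.name is not set to yarn in the client configuration. The result is as follows: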
[root@master testdata]#hadoop dfs -cat /temperature/output/part-r-00000
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
16/02/10 19:42:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
1949 111
1950 -11
[root@master testdata]#
7. Source code:
```java
package com.dtspak.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class TemperatureComputation {

    // Emits (year, temperature) for every valid record
    public static class TeperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final int MiSSING = 9999; // +9999 marks a missing reading

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String data = value.toString(); // key = byte offset of the line, value = the line
            if (data.length() < 51) {
                return; // skip malformed (too short) lines
            }
            String year = data.substring(15, 19);
            int temperature;
            if ('+' == data.charAt(45)) {
                temperature = Integer.parseInt(data.substring(46, 50));
            } else {
                temperature = Integer.parseInt(data.substring(45, 50));
            }
            String validDataFlag = data.substring(50, 51);
            // Data cleaning: drop missing readings and records with an invalid quality code
            if (temperature != MiSSING && validDataFlag.matches("[01459]")) {
                context.write(new Text(year), new IntWritable(temperature));
            }
        }
    }

    // Keeps the lowest temperature seen for each year
    public static class TemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text year, Iterable<IntWritable> data,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int coldestTemperature = Integer.MAX_VALUE;
            for (IntWritable item : data) {
                coldestTemperature = Math.min(coldestTemperature, item.get());
            }
            context.write(year, new IntWritable(coldestTemperature));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Strip generic Hadoop options (-D, -fs, ...) and keep <in> <out>
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: TemperatureComputation <in> <out>");
            System.exit(2);
        }
        System.out.println("args array length: " + otherArgs.length);
        System.out.println("Array contents:");
        for (String arg : otherArgs) {
            System.out.println(arg);
        }
        System.out.println();

        Job job = Job.getInstance(conf, "TemperatureComputation"); // replaces the deprecated new Job(conf)
        job.setJarByClass(TemperatureComputation.class);
        job.setMapperClass(TeperatureMapper.class);
        // job.setCombinerClass(TemperatureReducer.class); // optional: a minimum is safe to combine
        job.setReducerClass(TemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.out.println("job!");
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
8. Screenshots of the run
![](http://img.blog.csdn.net/20160211091032150?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
![](http://img.blog.csdn.net/20160211091041275?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)