
Big Data IMF Legendary Action (IMF传奇行动): developing a MapReduce weather-forecast example in Hadoop

2016-02-11 09:11

1. Get the data file

0067011990999991950051507004888888889999999N9+00001+9999999999999999999999

0067011990999991950051512004888888889999999N9+00221+9999999999999999999999

0067011990999991950051518004888888889999999N9-00111+9999999999999999999999

0067011990999991949032412004888888889999999N9+01111+9999999999999999999999

0067011990999991950032418004888888880500001N9+00001+9999999999999999999999

0067011990999991950051507004888888880500001N9+00781+9999999999999999999999

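2. Metadata description (character positions are 0-based, as in Java's String.substring):

Characters 15-18, read with substring(15, 19), hold the year, e.g. 1950 or 1949;

Characters 45-50 hold the signed temperature plus its validity flag, e.g. -00111 or +00001: character 45 is the sign and characters 46-49 are the temperature digits, so -00111 is read as -11 and +01111 as 111; a temperature of 9999 marks a missing reading;

Character 50 is the validity flag and may only be 0, 1, 4, 5 or 9; the mapper discards records with any other flag.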
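The sketch below makes those offsets concrete. It slices one sample record using the same `substring`/`charAt` positions as the mapper. It is plain Java with no Hadoop dependency. The class `RecordLayout` and its method names are hypothetical and exist only for illustration. Positions are 0-based, and the end index of `substring(begin, end)` is exclusive, which is why the four-character year is `substring(15, 19)`.

```java
// Hypothetical helper: slices one fixed-width weather record using the
// same 0-based offsets as the mapper in this article.
class RecordLayout {

    // Year: indices 15..18, read as substring(15, 19)
    static String year(String line) {
        return line.substring(15, 19);
    }

    // Sign of the temperature: index 45, '+' or '-'
    static char sign(String line) {
        return line.charAt(45);
    }

    // Temperature digits: indices 46..49
    static String digits(String line) {
        return line.substring(46, 50);
    }

    // Validity flag: index 50, expected to be 0, 1, 4, 5 or 9
    static char flag(String line) {
        return line.charAt(50);
    }

    static String describe(String line) {
        return "year=" + year(line) + " sign=" + sign(line)
                + " digits=" + digits(line) + " flag=" + flag(line);
    }

    public static void main(String[] args) {
        // Third record of the sample file
        String line = "0067011990999991950051518004888888889999999N9-00111+9999999999999999999999";
        System.out.println(describe(line)); // year=1950 sign=- digits=0011 flag=1
    }
}
```

On the third sample record, this shows that `-00111` splits into the sign `-`, the digits `0011` and the flag `1`. The mapper therefore reads the temperature as -11 and treats the last digit as the validity flag.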

3. Upload the file to Hadoop HDFS

[root@master testdata]#hadoop dfs -put data.txt /temperature



[root@master testdata]#hadoop dfs -ls /temperature

DEPRECATED: Use of this script to execute hdfs command is deprecated.

Instead use the hdfs command for it.

16/02/10 18:49:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Found 1 items

-rw-r--r-- 1 root supergroup 454 2016-02-10 18:49 /temperature/data.txt



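4. Develop the weather-forecast program, which finds the lowest temperature of each year (the full source code is at the end)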

4.1 Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)

String data = value.toString(); // reads one line of the file: key is the byte offset, value is the line itself
String year = data.substring(15, 19); // extract the year
int temperature;
if ('+' == data.charAt(45)) {
    temperature = Integer.parseInt(data.substring(46, 50)); // extract the temperature, skipping the '+' sign
} else {
    temperature = Integer.parseInt(data.substring(45, 50)); // keep the '-' sign
}
String validDataFlag = data.substring(50, 51);
// Data cleaning: MISSING (9999) marks a missing reading, and the regex keeps only valid flags
if (temperature != MISSING && validDataFlag.matches("[01459]")) {
    context.write(new Text(year), new IntWritable(temperature)); // emit (year, temperature); the shuffle groups each year's temperatures for the reducer
}
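The map logic can be checked offline before a cluster is involved. Below is a minimal plain-Java mirror of the mapper, assuming the record layout described in step 2. It uses the same offsets, the same `MISSING` value of 9999 and the same `[01459]` regex, but it returns an `Optional` pair instead of calling `context.write`. `MapLogic` is a hypothetical name. This is a testing aid, not the author's code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;

// Hypothetical plain-Java mirror of the mapper: parses one record and
// returns (year, temperature), or empty if the cleaning step drops it.
class MapLogic {

    static final int MISSING = 9999;

    static Optional<Map.Entry<String, Integer>> map(String data) {
        String year = data.substring(15, 19);
        int temperature;
        if (data.charAt(45) == '+') {
            temperature = Integer.parseInt(data.substring(46, 50)); // skip '+'
        } else {
            temperature = Integer.parseInt(data.substring(45, 50)); // keep '-'
        }
        String validDataFlag = data.substring(50, 51);
        if (temperature != MISSING && validDataFlag.matches("[01459]")) {
            return Optional.of(new SimpleEntry<>(year, temperature));
        }
        return Optional.empty(); // missing reading or invalid flag
    }

    public static void main(String[] args) {
        // Fourth record of the sample file (the only 1949 reading)
        String line = "0067011990999991949032412004888888889999999N9+01111+9999999999999999999999";
        System.out.println(map(line)); // Optional[1949=111]
    }
}
```

Since Java 7, `Integer.parseInt` also accepts a leading `+`, so the separate `'+'` branch mostly matters on older JDKs. The sketch keeps it so that it stays faithful to the mapper.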



4.2 Override reduce()

protected void reduce(Text year, Iterable<IntWritable> data,
        Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    int coldestTemperature = Integer.MAX_VALUE;
    for (IntWritable item : data) {
        coldestTemperature = Math.min(coldestTemperature, item.get()); // walk the year's temperatures, keeping the lowest
    }
    context.write(year, new IntWritable(coldestTemperature)); // emit key: year, value: lowest temperature
}
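Without the Hadoop types, the reduce step is just a running minimum. The minimal sketch below uses the hypothetical class name `ReduceLogic` and hard-codes the five valid 1950 readings from the sample file as input.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java mirror of the reducer: start from
// Integer.MAX_VALUE so any real reading is smaller, then keep the minimum.
class ReduceLogic {

    static int coldest(Iterable<Integer> temperatures) {
        int coldestTemperature = Integer.MAX_VALUE;
        for (int t : temperatures) {
            coldestTemperature = Math.min(coldestTemperature, t);
        }
        return coldestTemperature;
    }

    public static void main(String[] args) {
        // Valid 1950 readings from the sample file, as emitted by the map step
        List<Integer> year1950 = Arrays.asList(0, 22, -11, 0, 78);
        System.out.println(coldest(year1950)); // -11
    }
}
```

Starting from `Integer.MAX_VALUE` guarantees that the first real reading replaces the initial value. An empty input would return `Integer.MAX_VALUE`, but Hadoop only calls `reduce` for keys that have at least one value.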


4.3 Job submission boilerplate

Job job = Job.getInstance(conf, "TemperatureComputation"); // replaces the deprecated new Job(conf) followed by setJobName()
job.setJarByClass(TemperatureComputation.class);
job.setMapperClass(TemperatureMapper.class);
// job.setCombinerClass(TemperatureReducer.class);
job.setReducerClass(TemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.out.println("job!");
FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // otherArgs comes from GenericOptionsParser in main()
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // the output directory must not already exist
System.exit(job.waitForCompletion(true) ? 0 : 1);
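The combiner line in the job setup is commented out. Taking a minimum is associative and commutative, and `TemperatureReducer` consumes and produces the same `(Text, IntWritable)` types, so the reducer could also serve as the combiner. The plain-Java sketch below illustrates the property this relies on: the minimum of per-split minimums equals the minimum over all values. The class name `CombinerCheck` and the two-way split of the 1950 readings are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical check that a min-reducer is safe as a combiner: reducing
// per-split partial minimums gives the same result as reducing everything.
class CombinerCheck {

    static int min(List<Integer> values) {
        return Collections.min(values);
    }

    // "Combine" each split locally, then "reduce" the partial results.
    static int combineThenReduce(List<List<Integer>> splits) {
        List<Integer> partials = new ArrayList<>();
        for (List<Integer> split : splits) {
            partials.add(min(split)); // combiner output for this split
        }
        return min(partials); // the reducer only sees the partial minimums
    }

    public static void main(String[] args) {
        List<Integer> all = Arrays.asList(0, 22, -11, 0, 78);
        List<List<Integer>> splits = Arrays.asList(
                Arrays.asList(0, 22, -11), // hypothetical split A
                Arrays.asList(0, 78));     // hypothetical split B
        System.out.println(min(all) == combineThenReduce(splits)); // true
    }
}
```

With this small file the job reported `number of splits:1`, so a combiner would save little here. It starts to pay off when each map task emits many temperatures for the same year, because fewer pairs then cross the network to the reducer.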



5. Running it from Eclipse ran into trouble again

Length of the args array: 2

Array contents:

hdfs://192.168.2.100:9000/temperature/data.txt

hdfs://192.168.2.100:9000/wordcount/output10

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.



After that it got stuck again!

6. So I had to package it as a jar and submit it to the cluster by hand

[root@master testdata]#hadoop jar Temperature.jar com.dtspak.hadoop.TemperatureComputation hdfs://192.168.2.100:9000/temperature/data.txt hdfs://192.168.2.100:9000/temperature/output

Length of the args array: 2

Array contents:

hdfs://192.168.2.100:9000/temperature/data.txt

hdfs://192.168.2.100:9000/temperature/output

16/02/10 19:40:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

job!

16/02/10 19:40:26 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id

16/02/10 19:40:26 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=

16/02/10 19:40:28 INFO input.FileInputFormat: Total input paths to process : 1

16/02/10 19:40:29 INFO mapreduce.JobSubmitter: number of splits:1

16/02/10 19:40:32 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1648643189_0001

16/02/10 19:40:32 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
16/02/10 19:40:32 INFO mapreduce.Job: Running job: job_local1648643189_0001

16/02/10 19:40:32 INFO mapred.LocalJobRunner: OutputCommitter set in config null

16/02/10 19:40:32 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

16/02/10 19:40:32 INFO mapred.LocalJobRunner: Waiting for map tasks

16/02/10 19:40:32 INFO mapred.LocalJobRunner: Starting task: attempt_local1648643189_0001_m_000000_0

16/02/10 19:40:33 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]

16/02/10 19:40:33 INFO mapred.MapTask: Processing split: hdfs://192.168.2.100:9000/temperature/data.txt:0+454

16/02/10 19:40:33 INFO mapreduce.Job: Job job_local1648643189_0001 running in uber mode : false

16/02/10 19:40:33 INFO mapreduce.Job: map 0% reduce 0%

16/02/10 19:40:38 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)

The results are as follows:

[root@master testdata]#hadoop dfs -cat /temperature/output/part-r-00000

DEPRECATED: Use of this script to execute hdfs command is deprecated.

Instead use the hdfs command for it.

16/02/10 19:42:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

1949 111

1950 -11

[root@master testdata]#
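This output can be cross-checked by replaying the whole job locally on the six sample records. The sketch below is a simplification under stated assumptions. A `TreeMap` stands in for Hadoop's shuffle and sort. Each result is written as key, tab, value, which is the default line format of `TextOutputFormat`. `LocalReplay` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local replay of the job: map and clean each line, group
// temperatures by year in key order (standing in for the shuffle/sort),
// then keep the minimum per year as the reducer does.
class LocalReplay {

    static final int MISSING = 9999;

    static final List<String> SAMPLE = Arrays.asList(
            "0067011990999991950051507004888888889999999N9+00001+9999999999999999999999",
            "0067011990999991950051512004888888889999999N9+00221+9999999999999999999999",
            "0067011990999991950051518004888888889999999N9-00111+9999999999999999999999",
            "0067011990999991949032412004888888889999999N9+01111+9999999999999999999999",
            "0067011990999991950032418004888888880500001N9+00001+9999999999999999999999",
            "0067011990999991950051507004888888880500001N9+00781+9999999999999999999999");

    static List<String> run(List<String> lines) {
        // Map phase plus grouping: TreeMap keeps the years sorted
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String data : lines) {
            String year = data.substring(15, 19);
            int temperature = data.charAt(45) == '+'
                    ? Integer.parseInt(data.substring(46, 50))
                    : Integer.parseInt(data.substring(45, 50));
            String validDataFlag = data.substring(50, 51);
            if (temperature != MISSING && validDataFlag.matches("[01459]")) {
                grouped.computeIfAbsent(year, k -> new ArrayList<>()).add(temperature);
            }
        }
        // Reduce phase: lowest temperature per year, formatted as key<TAB>value
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int coldest = Integer.MAX_VALUE;
            for (int t : entry.getValue()) {
                coldest = Math.min(coldest, t);
            }
            output.add(entry.getKey() + "\t" + coldest);
        }
        return output;
    }

    public static void main(String[] args) {
        run(SAMPLE).forEach(System.out::println);
    }
}
```

Running `main` prints `1949` with `111` and `1950` with `-11`, which matches the `part-r-00000` contents above. The only valid 1949 reading is 111, and -11 is the smallest of the five valid 1950 readings (0, 22, -11, 0, 78).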



7. Source code:

package com.dtspak.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

// Computes the lowest temperature recorded in each year.
public class TemperatureComputation {

    public static class TemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Temperature value that marks a missing reading
        private static final int MISSING = 9999;

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String data = value.toString(); // one line per call; key is the byte offset
            String year = data.substring(15, 19);
            int temperature;
            if ('+' == data.charAt(45)) {
                temperature = Integer.parseInt(data.substring(46, 50)); // skip the '+' sign
            } else {
                temperature = Integer.parseInt(data.substring(45, 50)); // keep the '-' sign
            }
            String validDataFlag = data.substring(50, 51);
            // Data cleaning: drop missing readings and records with an invalid flag
            if (temperature != MISSING && validDataFlag.matches("[01459]")) {
                context.write(new Text(year), new IntWritable(temperature));
            }
        }
    }

    public static class TemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text year, Iterable<IntWritable> data,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int coldestTemperature = Integer.MAX_VALUE;
            for (IntWritable item : data) {
                coldestTemperature = Math.min(coldestTemperature, item.get());
            }
            context.write(year, new IntWritable(coldestTemperature)); // year -> lowest temperature
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Strip generic Hadoop options (such as -D or -fs) and keep <in> <out>
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: TemperatureComputation <in> <out>");
            System.exit(2);
        }
        System.out.println("Length of the args array: " + otherArgs.length);
        System.out.println("Array contents:");
        for (String arg : otherArgs) {
            System.out.println(arg);
        }

        Job job = Job.getInstance(conf, "TemperatureComputation");
        job.setJarByClass(TemperatureComputation.class);
        job.setMapperClass(TemperatureMapper.class);
        // job.setCombinerClass(TemperatureReducer.class);
        job.setReducerClass(TemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.out.println("job!");
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // must not already exist
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}



8. Screenshots of the run





