Hadoop Study Notes 2 -- MapReduce
2017-09-27 10:01
1. MapReduce is a programming model for data processing; MapReduce programs are inherently parallel.
2. A MapReduce job runs in two phases: the map phase and the reduce phase. Each phase takes key/value pairs as input and produces key/value pairs as output. The programmer defines two functions: a map function and a reduce function.
3. MapReduce data processing flow (a rough key/value trace is sketched below):
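As an informal illustration (the offsets and temperatures below are made up for this note, not data from the original post), the key/value pairs of the max-temperature example move through a job roughly like this:

    map input:    (0,   "...1950...+0022...")  ->  map output: (1950, 22)
    map input:    (106, "...1950...-0011...")  ->  map output: (1950, -11)
    map input:    (212, "...1949...+0111...")  ->  map output: (1949, 111)
    shuffle/sort: group map output by key      ->  (1949, [111]), (1950, [22, -11])
    reduce:       take the maximum per key     ->  (1949, 111), (1950, 22)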
4. MapReduce implementation with the old Java API:
4.1 The map function
package com.hadoop.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Mapper<LongWritable, Text, Text, IntWritable>
 * The Mapper type parameters are: input key, input value, output key, output value.
 */
public class MaxTemperatureMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        // The record is fixed-width text: the year sits at offsets 15-18
        String year = line.substring(15, 19);
        int airtemperature;
        // The temperature field may carry an explicit '+' sign, which parseInt cannot handle
        if (line.charAt(87) == '+') {
            airtemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airtemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        // Skip missing readings (9999) and records whose quality code is suspect
        if (airtemperature != 9999 && quality.matches("[01459]")) {
            output.collect(new Text(year), new IntWritable(airtemperature));
        }
    }
}
4.2 The reduce function:
package com.hadoop.reducer;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * Reducer<Text, IntWritable, Text, IntWritable>
 * The type parameters are: input key, input value, output key, output value.
 * The reduce input types must match the map output types.
 */
public class MaxTemperatureReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int maxvalue = Integer.MIN_VALUE;
        // Walk over all temperatures for this year and keep the largest one
        while (values.hasNext()) {
            maxvalue = Math.max(maxvalue, values.next().get());
        }
        output.collect(key, new IntWritable(maxvalue));
    }
}
4.3 The driver (main) class
package com.hadoop.test;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

import com.hadoop.mapper.MaxTemperatureMapper;
import com.hadoop.reducer.MaxTemperatureReducer;

public class MaxTemperature {
    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        // The JobConf object forms the job specification and controls how the whole job runs
        JobConf conf = new JobConf(MaxTemperature.class);
        conf.setJobName("Max temperature");
        // Input data path
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        // Output data path (the directory must not exist yet, otherwise the job fails)
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setMapperClass(MaxTemperatureMapper.class);
        conf.setReducerClass(MaxTemperatureReducer.class);
        // Output key/value classes; without setOutputKeyClass the default key class
        // (LongWritable) would not match the Text keys emitted by the mapper
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        JobClient.runJob(conf);
    }
}
4.4 Execution: compile the classes (or package them into a jar), then run the following commands in the Hadoop environment:
$ export HADOOP_CLASSPATH=build/classes    # HADOOP_CLASSPATH adds the application classes to the classpath
$ hadoop MaxTemperature input/sample.txt output    # MaxTemperature is the class run in the JVM; the last two arguments are the input and output paths passed to main()
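If the compiled classes are packaged into a jar instead (the jar name below is just a placeholder), the same job can be launched with hadoop jar, passing the fully qualified driver class:

$ hadoop jar max-temperature.jar com.hadoop.test.MaxTemperature input/sample.txt output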
5. MapReduce implementation with the new Java API
5.1 The new-API mapper
package com.hadoop.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper<LongWritable, Text, Text, IntWritable>
 * The type parameters are: input key, input value, output key, output value.
 */
public class NewMaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);
        int airtemperature;
        // The temperature field may carry an explicit '+' sign
        if (line.charAt(87) == '+') {
            airtemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airtemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        // Skip missing readings (9999) and suspect quality codes
        if (airtemperature != 9999 && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airtemperature));
        }
    }
}
5.2 The new-API reducer
package com.hadoop.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer<Text, IntWritable, Text, IntWritable>
 * The type parameters are: input key, input value, output key, output value.
 * The reduce input types must match the map output types.
 */
public class NewMaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxvalue = Integer.MIN_VALUE;
        // Keep the largest temperature seen for this year
        for (IntWritable val : values) {
            maxvalue = Math.max(maxvalue, val.get());
        }
        context.write(key, new IntWritable(maxvalue));
    }
}
5.3 The main method (driver)
package com.hadoop.test;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.hadoop.mapper.NewMaxTemperatureMapper;
import com.hadoop.reducer.NewMaxTemperatureReducer;

public class NewMaxTemperature {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.err.println("Usage: NewMaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        Job job = new Job();
        // Used to locate the jar containing the job classes
        job.setJarByClass(NewMaxTemperature.class);
        // Input data path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Output data path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(NewMaxTemperatureMapper.class);
        job.setReducerClass(NewMaxTemperatureReducer.class);
        // Output key/value classes; without setOutputKeyClass the default key class
        // (LongWritable) would not match the Text keys emitted by the mapper
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
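A side note: on more recent Hadoop 2.x releases the no-argument Job constructor used above is deprecated. A minimal adjustment (assuming such a release) is to create the job through the Job.getInstance factory method; the rest of the driver stays the same:

import org.apache.hadoop.conf.Configuration;
// ...
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Max temperature (new API)");
job.setJarByClass(NewMaxTemperature.class);
// remaining driver code unchanged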
6. Data flow
7. The combiner function:
The output of the combiner function is fed to the reduce function as its input. The combiner is an optimization and does not change the final output.
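A small worked example with made-up readings (not data from the original post) shows why this holds for the max-temperature job: taking a maximum is associative and commutative, so taking a per-mapper maximum first and then the overall maximum gives the same result as one maximum over everything.

    map 1 output:  (1950, 0), (1950, 20), (1950, 10)   -- combiner -->  (1950, 20)
    map 2 output:  (1950, 25), (1950, 15)              -- combiner -->  (1950, 25)
    reduce input:  (1950, [20, 25])                    -- reduce ---->  (1950, 25)

which equals max(0, 20, 10, 25, 15) = 25, the answer without a combiner.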
7.1 Java implementation: the combiner is defined using the Reducer interface
package com.hadoop.test;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

import com.hadoop.mapper.MaxTemperatureMapper;
import com.hadoop.reducer.MaxTemperatureReducer;

public class MaxTemperature {
    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        // The JobConf object forms the job specification and controls how the whole job runs
        JobConf conf = new JobConf(MaxTemperature.class);
        conf.setJobName("Max temperature");
        // Input data path
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        // Output data path
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setMapperClass(MaxTemperatureMapper.class);
        // Combiner function; the argument is a class implementing the Reducer interface
        conf.setCombinerClass(MaxTemperatureReducer.class);
        conf.setReducerClass(MaxTemperatureReducer.class);
        // Output key/value classes; the key class must be set to match the mapper output
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        JobClient.runJob(conf);
    }
}
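With the new API, the same optimization applies: a minimal sketch (assuming the NewMaxTemperature driver from section 5.3) is a single extra call on the Job object, reusing the reducer class as the combiner:

    job.setCombinerClass(NewMaxTemperatureReducer.class);  // combiner output must be valid reduce input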