Hadoop Study Notes (2) ---- MapReduce

1. MapReduce is a programming model for data processing. MapReduce programs are inherently parallel.

2. A MapReduce job runs in two phases: a map phase and a reduce phase. Each phase takes key/value pairs as input and produces key/value pairs as output. The programmer supplies two functions: the map function and the reduce function.
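
For the max-temperature example used throughout these notes, the key/value flow looks like this (map input keys are byte offsets into the file, temperatures are in tenths of a degree Celsius, and the sample values are illustrative):

map input:     (0, "...1950...+0022..."), (106, "...1950...-0011..."), (212, "...1949...+0111...")
map output:    ("1950", 22), ("1950", -11), ("1949", 111)
reduce input:  ("1949", [111]), ("1950", [22, -11])
reduce output: ("1949", 111), ("1950", 22)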

3. MapReduce data processing flow (in outline; the original figure is omitted):

input -> input splits -> map -> sort/shuffle by key -> reduce -> output

4. Java implementation with the old API (org.apache.hadoop.mapred):

4.1 The map function

package com.hadoop.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Mapper<LongWritable, Text, Text, IntWritable>: the four type parameters of the
 * Mapper interface are the input key, input value, output key, and output value.
 */
public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {

        String line = value.toString();

        // The year sits at offsets 15-18 of the fixed-width NCDC record
        String year = line.substring(15, 19);

        // The signed temperature (tenths of a degree Celsius) sits at offsets 87-91;
        // Integer.parseInt does not accept a leading '+', so skip it
        int airTemperature;
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }

        // 9999 marks a missing reading; only quality codes 0, 1, 4, 5, 9 are trusted
        String quality = line.substring(92, 93);
        if (airTemperature != 9999 && quality.matches("[01459]")) {
            output.collect(new Text(year), new IntWritable(airTemperature));
        }
    }
}
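
The magic numbers in the substring calls come from the fixed-width NCDC weather record layout. As a quick standalone sanity check (illustrative only; MapperParseDemo is not part of the job), the following builds a synthetic 93-character line and runs the same parsing logic:

public class MapperParseDemo {

    public static void main(String[] args) {
        // Build a synthetic fixed-width line: '0' everywhere except the fields we care about
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 93; i++) sb.append('0');
        sb.replace(15, 19, "1950");  // year at offsets 15-18
        sb.replace(87, 92, "-0011"); // signed temperature in tenths of a degree Celsius
        sb.replace(92, 93, "1");     // quality code
        String line = sb.toString();

        // Same logic as MaxTemperatureMapper.map()
        String year = line.substring(15, 19);
        int airTemperature = (line.charAt(87) == '+')
                ? Integer.parseInt(line.substring(88, 92))
                : Integer.parseInt(line.substring(87, 92));
        String quality = line.substring(92, 93);

        System.out.println(year + " -> " + airTemperature + " (quality " + quality + ")");
        // prints: 1950 -> -11 (quality 1)
    }
}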


4.2 The reduce function:

package com.hadoop.reducer;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * Reducer<Text, IntWritable, Text, IntWritable>: the type parameters are the
 * input key, input value, output key, and output value.
 *
 * Note that the reduce function's input types must match the map function's output types.
 */
public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {

        // Scan all temperatures recorded for this year and keep the maximum
        int maxValue = Integer.MIN_VALUE;
        while (values.hasNext()) {
            maxValue = Math.max(maxValue, values.next().get());
        }
        output.collect(key, new IntWritable(maxValue));
    }
}


4.3 The driver (main) class

package com.hadoop.test;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

import com.hadoop.mapper.MaxTemperatureMapper;
import com.hadoop.reducer.MaxTemperatureReducer;

public class MaxTemperature {

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        // The JobConf object specifies the job configuration and controls how the whole job runs
        JobConf conf = new JobConf(MaxTemperature.class);
        conf.setJobName("Max temperature");

        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(MaxTemperatureMapper.class);
        conf.setReducerClass(MaxTemperatureReducer.class);
        // Declare both output types; without setOutputKeyClass the default key
        // type (LongWritable) would not match the Text keys the reducer emits
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        JobClient.runJob(conf);
    }
}

4.4 Execution: build the jar package, then run the following commands in a Hadoop environment

$ export HADOOP_CLASSPATH=build/classes    # HADOOP_CLASSPATH adds the application's class files to Hadoop's classpath

$ hadoop MaxTemperature input/sample.txt output    # MaxTemperature is the class the JVM runs; the two remaining arguments are the input and output paths passed to main
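
Once the job completes, each reduce task writes one result file into the output directory; with the old API these files are named part-00000, part-00001, and so on:

$ cat output/part-00000    # or "hadoop fs -cat output/part-00000" if the output is on HDFS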

5. Java implementation with the new API (org.apache.hadoop.mapreduce)

5.1 New-API Mapper

package com.hadoop.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper<LongWritable, Text, Text, IntWritable>: the four type parameters of the
 * (new-API) Mapper class are the input key, input value, output key, and output value.
 */
public class NewMaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();

        // Same fixed-width parsing as the old-API mapper above
        String year = line.substring(15, 19);

        int airTemperature;
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }

        String quality = line.substring(92, 93);
        if (airTemperature != 9999 && quality.matches("[01459]")) {
            // The new API writes output through the Context instead of an OutputCollector
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}
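
As an aside, the new-API mapper can be unit-tested without a cluster. A minimal sketch using the Apache MRUnit library (an extra test dependency, not used elsewhere in these notes) might look like this:

package com.hadoop.mapper;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class NewMaxTemperatureMapperTest {

    @Test
    public void parsesValidRecord() throws Exception {
        // Synthetic 93-character NCDC-style line: year 1950, temperature -1.1 C, quality 1
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 93; i++) sb.append('0');
        sb.replace(15, 19, "1950");
        sb.replace(87, 92, "-0011");
        sb.replace(92, 93, "1");

        // Feed one record through the mapper and assert on its output pair
        new MapDriver<LongWritable, Text, Text, IntWritable>()
                .withMapper(new NewMaxTemperatureMapper())
                .withInput(new LongWritable(0), new Text(sb.toString()))
                .withOutput(new Text("1950"), new IntWritable(-11))
                .runTest();
    }
}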


5.2 New-API Reducer

package com.hadoop.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer<Text, IntWritable, Text, IntWritable>: the type parameters are the
 * input key, input value, output key, and output value.
 *
 * Note that the reduce function's input types must match the map function's output types.
 */
public class NewMaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        // The new API exposes the values as an Iterable, so a for-each loop works
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable val : values) {
            maxValue = Math.max(maxValue, val.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}


5.3 The main method

package com.hadoop.test;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.hadoop.mapper.NewMaxTemperatureMapper;
import com.hadoop.reducer.NewMaxTemperatureReducer;

public class NewMaxTemperature {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.err.println("Usage: NewMaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        Job job = new Job();
        // Hadoop locates the job jar by looking for the jar that contains this class
        job.setJarByClass(NewMaxTemperature.class);
        // Input path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Output path (the directory must not already exist)
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(NewMaxTemperatureMapper.class);
        job.setReducerClass(NewMaxTemperatureReducer.class);
        // Declare both output types; without setOutputKeyClass the default key
        // type (LongWritable) would not match the Text keys the reducer emits
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
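
One note on the driver: the no-argument new Job() constructor was deprecated in later Hadoop releases. On Hadoop 2.x and later the idiomatic replacement is the static factory; a sketch of just the changed lines (the rest of the driver is identical):

// requires an extra import: org.apache.hadoop.conf.Configuration
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Max temperature");
job.setJarByClass(NewMaxTemperature.class);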


6. Data flow

Hadoop divides the input into fixed-size splits and runs one map task per split. Map output is partitioned and sorted by key; each reduce task then fetches its partition from every map task (the shuffle) and writes its result to HDFS. [original figures omitted]


7. The combiner function:

The combiner runs on the map side, and its output becomes the reduce function's input. A combiner is purely an optimization: it cuts the amount of intermediate data transferred from map tasks to reduce tasks, and because Hadoop may invoke it zero, one, or many times, it must not change the final result.
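
For example, in the max-temperature job the reducer itself can serve as the combiner, because taking a maximum is commutative and associative:

max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25 = max(0, 20, 10, 25, 15)

A mean, by contrast, cannot be pre-aggregated this way:

mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15, but mean(0, 20, 10, 25, 15) = 14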

7.1 Java implementation: the combiner is defined using the Reducer interface

package com.hadoop.test;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

import com.hadoop.mapper.MaxTemperatureMapper;
import com.hadoop.reducer.MaxTemperatureReducer;

public class MaxTemperature {

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        // The JobConf object specifies the job configuration and controls how the whole job runs
        JobConf conf = new JobConf(MaxTemperature.class);
        conf.setJobName("Max temperature");

        // Input path
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        // Output path
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(MaxTemperatureMapper.class);
        // The combiner is set to a class implementing the Reducer interface; here the
        // reducer itself doubles as the combiner, since max is safe to pre-aggregate
        conf.setCombinerClass(MaxTemperatureReducer.class);
        conf.setReducerClass(MaxTemperatureReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        JobClient.runJob(conf);
    }
}