MapReduce program example code
2013-08-21 15:57
447 查看
MapReduce program example code
MapReduce是google 的一项重要技术,它是一个编程模型,用以进行大数据量的计算。对于大数据量的计算,通常采用的处理手法就是并行计算。MapReduce就是一种简化并行计算的编程模型,它让那些没有多少并行计算经验的开发人员也可以开发并行应用。MapReduce程序是设计用来并行计算大规模海量数据的,这需要把工作流分划到大量的机器上去,如果组件之间可以任意共享数据,那这个模型就无法扩展到大规模集群上去了,用来保持节点间数据的同步而产生的通信开销会使得大规模集群上变得不可靠和效率低下。
MapReduce是一种模式,一种云计算的核心计算模式,一种分布式运算技术,也是简化的分布式编程模式。MapReduce主要解决问题的程序并行模型,也是开发人员拆解问题的方法。MapReduce程序用来处理并行计算大规模海量数据的,可以用在非常广泛的应用程序中,包括发布gre,分布排序,web连接图反转,每台机器的词矢量,web访问日志分析,反向索引构建,文档聚类,基于统计的机器翻译等。值得注意的是,MapReduce实现以后,它被用来重新生成google的整个索引,并取代老的adhoc程序去更新索引。MapReduce会生成大量的临时文件,为了提高效率,它会利用分布式文件系统来管理和访问这些文件。
MapReduce的名字源于这个模型的两项核心操作:map和reduce。
简单来说,map是把一组数据一对一地映射为另外一组数据,其映射的规则是由一个函数来指定的,如对[1,2,3,4]进行乘2的映射就变成了[2,4,6,8].reduce是对一组数据进行归约,这个归约的规则是由一个函数指定,如对[1,2,3,4]进行求和的归约得到的结果是10,而对它进行求积的归约结果是24。概括来说,map负责把任务分解成多个任务,reduce负责把分解后多任务处理的结果汇总起来。
MapReduce集群由普通pc构成,为无共享式架构。在处理之前,将数据集分布至各个节点。处理时,每个节点就近读取本地存储的数据处理(map),将处理后的数据进行合并(combine),排序(shuffle and sort)后再分发(至reduce节点),从而避免了大量数据的传输,提高了处理效率。无共享式架构的另一个好处是配合复制(replication)策略,集群可以具有良好的容错性,一部分节点当机对集群的正常工作不会造成影响。
Mapper针对每一个输入元素都要生成一个输出元素,reducer针对每一个输入列表都要生成一个输出元素。
Mapreduce运行在大规模集群之上,把完成一个并行计算,还需要任务调度,本地计算,洗牌过程等一系列环节共同支撑的计算的过程,把上述过程称为mapreduce的集群过程。
Mapreduce任务由一个jobtracker和多个tasktracker两类节点控制完成。
(1). Jobtracker节点
主要负责和管理tasktracker,它通常是运行在master节点上的。Jobtracker将mappers和reducers分配给空闲的tasktracker后,由tasktracker负责这些任务的并行执行。
(2). Tasktracker节点
必须运行在datanode节点上,所以说datanode既是数据的存储节点,也是计算节点。
(3). Jobtracker和tasktracker之间的关系
Jobtracker负责监控任务的运行状况,如果某个tasktracker发生故障,jobtracker就会将其负责的任务分配给其他空闲的tasktracker重新执行。
MapReduce处理大数据集的过程
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public
class WordCount {
public
static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private
final static IntWritable one =
new IntWritable(1);
private Text
word = new Text();
public
void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public
static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable
result = new IntWritable();
public
void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public
static void main(String[] args)
throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if (otherArgs.length != 2){
System.err.println("Usage: wordcount <in><out>");
System.exit(2);
}
Job job = new Job(conf,
"word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Hadoop提供了如下内容的数据类型,这些数据类型都实现了WritableComparable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储,以及进行大小比较。
BooleanWritable:标准布尔型数值
ByteWritable:单字节数值
DoubleWritable:双字节数
FloatWritable:浮点数
IntWritable:整型数
LongWritable:长整型数
Text:使用UTF8格式存储的文本
NullWritable:当<key,value>中的key或value为空时使用
Main函数调用jobconf类来对mapreduce job进行初始化,这里命名job name为word count。接着读取传入的两个参数。
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
设置job处理的map(拆分),combiner(中间结果合并)以及reduce(合并)的相关处理类。这里用reduce类来进行mao产生的中间结果合并,避免给网络数据传输产生压力。
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
设置job输出结果<key,value>中的key和value数据类型,结果是<单词,个数>,所以key设置为text类型,value设置为intwritable类型。
在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。此处设置了使用TokenizerMapper完成Map过程中的处理和使用IntSumReducer完成Combine和Reduce过程中的处理。还设置了Map过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。任务的输出和输入路径则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。完成相应任务的参数设定后,即可调用job.waitForCompletion()方法执行任务。
Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法。通过在map方法中添加两句把key值和value值输出到控制台的代码,可以发现map方法中value值存储的是文本文件中的一行(以回车符为行结束标记),而key值为该行的首字母相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成为一个个的单词,并将<word,1>作为map方法的结果输出,其余的工作都交有MapReduce框架处理。
Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。Map过程输出<key,values>中key为单个单词,而values是对应单词的计数值所组成的列表,Map的输出就是Reduce的输入,所以reduce方法只要遍历values并求和,即可得到某个单词的总次数。
1)将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value>对,如图4-1所示。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数(Windows和Linux环境会不同)。
2)将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对,如图4-2所示。
3)得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key至相同value值累加,得到Mapper的最终输出结果。如图4-3所示。
4)Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为WordCount的输出结果,如图4-4所示。
MapReduce是google 的一项重要技术,它是一个编程模型,用以进行大数据量的计算。对于大数据量的计算,通常采用的处理手法就是并行计算。MapReduce就是一种简化并行计算的编程模型,它让那些没有多少并行计算经验的开发人员也可以开发并行应用。MapReduce程序是设计用来并行计算大规模海量数据的,这需要把工作流分划到大量的机器上去,如果组件之间可以任意共享数据,那这个模型就无法扩展到大规模集群上去了,用来保持节点间数据的同步而产生的通信开销会使得大规模集群上变得不可靠和效率低下。
MapReduce是一种模式,一种云计算的核心计算模式,一种分布式运算技术,也是简化的分布式编程模式。MapReduce主要解决问题的程序并行模型,也是开发人员拆解问题的方法。MapReduce程序用来处理并行计算大规模海量数据的,可以用在非常广泛的应用程序中,包括发布gre,分布排序,web连接图反转,每台机器的词矢量,web访问日志分析,反向索引构建,文档聚类,基于统计的机器翻译等。值得注意的是,MapReduce实现以后,它被用来重新生成google的整个索引,并取代老的adhoc程序去更新索引。MapReduce会生成大量的临时文件,为了提高效率,它会利用分布式文件系统来管理和访问这些文件。
MapReduce的名字源于这个模型的两项核心操作:map和reduce。
简单来说,map是把一组数据一对一地映射为另外一组数据,其映射的规则是由一个函数来指定的,如对[1,2,3,4]进行乘2的映射就变成了[2,4,6,8].reduce是对一组数据进行归约,这个归约的规则是由一个函数指定,如对[1,2,3,4]进行求和的归约得到的结果是10,而对它进行求积的归约结果是24。概括来说,map负责把任务分解成多个任务,reduce负责把分解后多任务处理的结果汇总起来。
MapReduce集群由普通pc构成,为无共享式架构。在处理之前,将数据集分布至各个节点。处理时,每个节点就近读取本地存储的数据处理(map),将处理后的数据进行合并(combine),排序(shuffle and sort)后再分发(至reduce节点),从而避免了大量数据的传输,提高了处理效率。无共享式架构的另一个好处是配合复制(replication)策略,集群可以具有良好的容错性,一部分节点当机对集群的正常工作不会造成影响。
Mapper针对每一个输入元素都要生成一个输出元素,reducer针对每一个输入列表都要生成一个输出元素。
Mapreduce运行在大规模集群之上,把完成一个并行计算,还需要任务调度,本地计算,洗牌过程等一系列环节共同支撑的计算的过程,把上述过程称为mapreduce的集群过程。
Mapreduce任务由一个jobtracker和多个tasktracker两类节点控制完成。
(1). Jobtracker节点
主要负责和管理tasktracker,它通常是运行在master节点上的。Jobtracker将mappers和reducers分配给空闲的tasktracker后,由tasktracker负责这些任务的并行执行。
(2). Tasktracker节点
必须运行在datanode节点上,所以说datanode既是数据的存储节点,也是计算节点。
(3). Jobtracker和tasktracker之间的关系
Jobtracker负责监控任务的运行状况,如果某个tasktracker发生故障,jobtracker就会将其负责的任务分配给其他空闲的tasktracker重新执行。
MapReduce处理大数据集的过程
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public
class WordCount {
public
static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private
final static IntWritable one =
new IntWritable(1);
private Text
word = new Text();
public
void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public
static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable
result = new IntWritable();
public
void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public
static void main(String[] args)
throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if (otherArgs.length != 2){
System.err.println("Usage: wordcount <in><out>");
System.exit(2);
}
Job job = new Job(conf,
"word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Hadoop提供了如下内容的数据类型,这些数据类型都实现了WritableComparable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储,以及进行大小比较。
BooleanWritable:标准布尔型数值
ByteWritable:单字节数值
DoubleWritable:双字节数
FloatWritable:浮点数
IntWritable:整型数
LongWritable:长整型数
Text:使用UTF8格式存储的文本
NullWritable:当<key,value>中的key或value为空时使用
Main函数调用jobconf类来对mapreduce job进行初始化,这里命名job name为word count。接着读取传入的两个参数。
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
设置job处理的map(拆分),combiner(中间结果合并)以及reduce(合并)的相关处理类。这里用reduce类来进行mao产生的中间结果合并,避免给网络数据传输产生压力。
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
设置job输出结果<key,value>中的key和value数据类型,结果是<单词,个数>,所以key设置为text类型,value设置为intwritable类型。
在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。此处设置了使用TokenizerMapper完成Map过程中的处理和使用IntSumReducer完成Combine和Reduce过程中的处理。还设置了Map过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。任务的输出和输入路径则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。完成相应任务的参数设定后,即可调用job.waitForCompletion()方法执行任务。
Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法。通过在map方法中添加两句把key值和value值输出到控制台的代码,可以发现map方法中value值存储的是文本文件中的一行(以回车符为行结束标记),而key值为该行的首字母相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成为一个个的单词,并将<word,1>作为map方法的结果输出,其余的工作都交有MapReduce框架处理。
Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。Map过程输出<key,values>中key为单个单词,而values是对应单词的计数值所组成的列表,Map的输出就是Reduce的输入,所以reduce方法只要遍历values并求和,即可得到某个单词的总次数。
1)将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value>对,如图4-1所示。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数(Windows和Linux环境会不同)。
2)将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对,如图4-2所示。
3)得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key至相同value值累加,得到Mapper的最终输出结果。如图4-3所示。
4)Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为WordCount的输出结果,如图4-4所示。
相关文章推荐
- Hadoop example program code
- Java Program to print Prime numbers in Java - Example Tutorial and Code
- perl program example
- Mac系统运行MapReduce程序报错:exitCode 127
- SAP传输停止错误:Test call of transport control program (tp) ended with return code 0208!
- Design Patterns Example Code (in C++)
- C Java PHP Perl Python 的程序代码美化工具(Pretty Print Program/Source Code Beautifier)使用
- 用KEIL编写C语言代码,编译过后会出现形如:Program Size: data=13.0 xdata=0 code=417
- ibm urbancode example
- debug code example in Kernel
- SAP传输停止错误:Test call of transport control program (tp) ended with return code 0208!
- Linux线程通讯code example
- A real example of vioplot in R (sample data and code attached)
- QtCreator:during startup program exited with code 0xc0000139 QT设置环境变量
- Program&nbsp;exited&nbsp;with&nbsp;code&nbsp;****&nbsp;2
- Spring 4 MVC example with Maven - [Source Code Download]
- a example for calling java code in javascript use dwr
- Ruby by Example: Concepts and Code
- Amazon update order status api Complete example code
- A good blog about how to write an Hadoop MapReduce program in Python