您的位置:首页 > 其它

分析统计单词数例子MapReduce执行过程 .

2014-08-08 12:36 429 查看
转自:http://www.linuxidc.com/Linux/2014-01/95386p2.htm

资源文件file.txt

hello Hadoop

hello word

this is my first hadoop program

分析:一个文档中每行的单词通过空格分割后获取,经过map阶段可以将所有的单词整理成如下形式:

key:hello value:1

key:hadoop value:1

key:hello value:1

key:word value:1

key:this value:1

key:is value:1

key:my value:1

key:first value:1

key:hadoop value:1

key:program value:1

经过hadoop整理后以如下形式输入到Reduce中:

key:hello value:{1,1}

key:hadoop value: {1,1}

key:word value:{1}

key:this value: {1}

key:is value:{1}

key:myvalue: {1}

key:first value:{1}

key:program value:{1}

所以Reduce接受的时候是以Iterable<IntWritable> values最为值。在Reduce中我们就可以将value中的值迭代相加就可以得出该单词出现的次数。

实现:

package com.bwzy.hadoop;

import java.io.File;

import java.io.IOException;

import java.util.Iterator;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.*;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

public static class Map

extends

org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable> {

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

public void map(LongWritable key,Text value,Context context){

String line = value.toString();

StringTokenizer tokenizer = new StringTokenizer(line);//读取每一行数据,并将该行数据以空格分割(StringTokenizer默认是以空格分割字符串)

while (tokenizer.hasMoreTokens()) {

word.set(tokenizer.nextToken());

try {

context.write(word, one);//输出给Reduce

} catch (IOException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

public void reduce(Text key, Iterable<IntWritable> values,

Context context) throws IOException, InterruptedException{

int sum = 0;

//合并每个单词出现的次数

for (IntWritable val : values) {

sum+=val.get();

}

context.write(key, new IntWritable(sum));

}

}

public static void main(String[] args) throws Exception {

int ret = ToolRunner.run(new WordCount(), args);

System.exit(ret);

}

@Override

public int run(String[] arg0) throws Exception {

Job job = new Job(getConf());

job.setJobName("wordcount");

job.setOutputKeyClass(Text.class);//key--设置输出格式

job.setOutputValueClass(IntWritable.class);//value--设置输出格式

job.setMapperClass(Map.class);

job.setReducerClass(Reduce.class);

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.setInputPaths(job, new Path(arg0[0]));

FileOutputFormat.setOutputPath(job, new Path(arg0[1]));

boolean success = job.waitForCompletion(true);

return success?0:1;

}

}

运行:

1:将程序打包

选中打包的类-->右击-->Export-->java-->JAR file--填入保存路径-->完成

2:将jar包拷贝到hadoop的目录下。(因为程序中用到来hadoop的jar包)

3:将资源文件(存储这单词的文件,假设在/home/user/Document/file1.txt)上传到定义的hdfs目录下

创建hdfs目录命令(在hadoop已经成功启动的前提下):hadoop fs -mkdir /自定义/自定义/input

上传本地资源文件到hdfs上:hadop fs -put -copyFromLocal /home/user/Document/file1.txt /自定义/自定义/input

4:运行MapReduce程序:

hadoop jar /home/user/hadoop-1.0.4/WordCount.jar com.bwzy.hadoop.WordCount /自定义/自定义/input /自定义/自定义/output

说明:hadoop运行后会自动创建/自定义/自定义/output目录,在该目录下会有两个文件,其中一个文件中存放来MapReduce运行的结果。如果重新运行该程序,需要将/自定义/自定义/output目录删除,否则系统认为该结果已经存在了。

5:运行的结果为

hello 2

hadoop 2

word 1

this 1

is 1

my 1

first 1

program 1

本篇文章来源于 Linux公社网站(www.linuxidc.com) 原文链接:http://www.linuxidc.com/Linux/2014-01/95386p2.htm


详解MapReduce算法

http://blog.sina.com.cn/s/blog_7eb42b5a0100uwtg.html

map()函数把输入数据进行切割(比如分为M块)之后,分布到不同的机器上执行(例如前面介绍的单词统计例子,可以把每一个文件分配到一台机器上执行)。Reduce()函数通过产生的键key(例如可以根据某种分区函数(比如hash(key)
mod R),R的值和分区函数都是由用户指定)将map()的结果集分成R块,然后分别在R台机器上执行。

图2.15是MapReduce算法示意图。当用户程序调用MapReduce函数时,就会引起如下的操作。





(1) MapReduce函数库首先把输入文件分成M块,每块大概16MB到64MB。接着在集群的机器上执行处理程序。

如图2.14所示,MapReduce算法运行过程中有一个主控程序,称为master。主控程序会产生很多作业程序,称为worker。并且把M个map任务和R个reduce任务分配给这些worker,让它们去完成。

(2) 被分配了map任务的worker读取并处理相关的输入(这里的输入是指已经被切割的输入小块splite)。它处理输入的数据,并且将分析出的键/值(key/value)对传递给用户定义的reduce()函数。map()函数产生的中间结果键/值(key/value)对暂时缓冲到内存。

(3) map()函数缓冲到内存的中间结果将被定时刷写到本地硬盘,这些数据通过分区函数分成R个区。这些中间结果在本地硬盘的位置信息将被发送回master,然后这个master负责把这些位置信息传送给reduce()函数的worker。

(4) 当master通知了reduce()函数的worker关于中间键/值(key/value)对的位置时,worker调用远程方法从map()函数的worker机器的本地硬盘上读取缓冲的中间数据。当reduce()函数的worker读取到了所有的中间数据,它就使用这些中间数据的键(key)进行排序,这样可以使得相同键(key)的值都在一起。如果中间结果集太大了,那么就需要使用外排序。

(5) reduce()函数的worker根据每一个中间结果的键(key)来遍历排序后的数据,并且把键(key)和相关的中间结果值(value)集合传递给reduce()函数。reduce()函数的worker最终把输出结果存放在master机器的一个输出文件中。

(6) 当所有的map任务和reduce任务都已经完成后,master激活用户程序。在这时,MapReduce返回用户程序的调用点。

(7) 当以上步骤成功结束以后,MapReduce的执行数据存放在总计R个输出文件中(每个输出文件都是由reduce任务产生的,这些文件名是用户指定的)。通常,用户不需要将这R个输出文件合并到一个文件,他们通常把这些文件作为输入传递给另一个MapReduce调用,或者用另一个分布式应用来处理这些文件,并且这些分布式应用把这些文件看成为输入文件由于分区(partition)成为的多个块文件。

分析MapReduce执行过程+统计单词数例子

转自:/article/7848635.html

MapReduce 运行的时候,会通过 Mapper 运行的任务读取 HDFS 中的数据文件,然后调用自己的方法,处理数据,最后输出。Reducer 任务会接收 Mapper 任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到 HDFS 的文件中。整个流程如图




Mapper任务的执行过程

每个 Mapper 任务是一个 java 进程,它会读取 HDFS 中的文件,解析成很多的键值对,经过我们覆盖的 map 方法处理后, 转换为很多的键值对再输出。 整个 Mapper 任务的处理过程又可以分为以下几个阶段



把 Mapper 任务的运行过程分为六个阶段。

第一阶段是把输入文件按照一定的标准分片(InputSplit),每个输入片的大小是固定的。默认情况下,输入片(InputSplit)的大小与数据块(Block)的大小是相同的。每一个输入片由一个 Mapper 进程处理。这里的三个输入片,会有三个 Mapper 进程处理。
第二阶段是对输入片中的记录按照一定的规则解析成键值对。 有个默认规则是把每一行文本内容解析成键值对。 “键”是每一行的起始位置(单位是字节), “值”是本行的文本内容。
第三阶段是调用 Mapper 类中的 map 方法。 第二阶段中解析出来的每一个键值对, 调用一次 map 方法。如果有 1000 个键值对,就会调用 1000 次 map 方法。每一次调用 map 方法会输出零个或者多个键值对。
第四阶段是按照一定的规则对第三阶段输出的键值对进行分区。比较是基于键进行的。比如我们的键表示省份(如北京、上海、山东等),那么就可以按照不同省份进行分区,同一个省份的键值对划分到一个区中。默认是只有一个。分区的数量就是
Reducer 任务运行的数量。默认只有一个 Reducer 任务。
第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,直接输出到本地的
linux 文件中。
第六阶段是对数据进行归约处理,也就是 reduce 处理。键相等的键值对会调用一次reduce 方法。经过这一阶段,数据量会减少。归约后的数据输出到本地的 linxu 文件中。


Reducer任务的执行过程

每个 Reducer 任务是一个 java 进程。Reducer 任务接收 Mapper 任务的输出,归约处理后写入到 HDFS 中,可以分为如图所示的几个阶段



第一阶段是 Reducer 任务会主动从 Mapper 任务复制其输出的键值对。 Mapper 任务可能会有很多,因此 Reducer 会复制多个 Mapper 的输出。
第二阶段是把复制到 Reducer 本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。
第三阶段是对排序后的键值对调用 reduce 方法。 键相等的键值对调用一次 reduce 方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到 HDFS 文件中。

在整个 MapReduce 程序的开发过程中,我们最大的工作量是覆盖 map 函数和覆盖reduce 函数。

键值对的编号理解

在对 Mapper 任务、Reducer 任务的分析过程中,会看到很多阶段都出现了键值对,读者容易混淆,所以这里对键值对进行编号,方便大家理解键值对的变化情况。如图



对于 Mapper 任务输入的键值对,定义为 key1 和 value1。在 map 方法中处理后,输出的键值对,定义为 key2 和 value2。reduce 方法接收 key2 和 value2,处理后,输出 key3 和 value3。在下文讨论键值对时,可能把 key1 和 value1 简写为<k1,v1>,key2 和value2 简写为<k2,v2>,key3 和 value3 简写为<k3,v3>。


举例:单词计数

统计指定文件中的所有单词的出现次数。

内容很简单,两行文本,每行的单词中间使用空格区分。word.txt

hello you
hello world
分析思路:最直观的想法是使用数据结构 Map。解析文件中出现的每个单词,用单词作为 key,出现次数作为 value。 这个思路没有问题,但是在大数据环境下就不行了。我们需要使用MapReduce来做。 根据Mapper任务和Reducer任务的运行阶段, 我们知道在Mapper任务的第二阶段是把文件的每一行转化成键值对,那么第三阶段的 map 方法就能取得每一行文本内容,我们可以在
map 方法统计本行文本中单词出现的次数,把每个单词的出现次数作为新的键值对输出。在 Reducer 任务的第二阶段会对 Mapper 任务输出的键值对按照键进行排序,键相等的键值对会调用一次 reduce 方法。在这里, “键”就是单词, “值”就是出现次数。因此可以在 reduce 方法中对单词的不同行中的所有出现次数相加,结果就是该单词的总的出现次数。最后把这个结果输出。
覆盖 map 方法

[java] view
plaincopyprint?





package mapreduce2;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

//key2 表示该行中的单词

final Text key2 = new Text();

//value2 表示单词在该行中的出现次数

final IntWritable value2 = new IntWritable(1);

//key 表示文本行的起始位置

//value 表示文本行

protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {

final String[] splited = value.toString().split(" ");

for (String word : splited) {

key2.set(word);

//把key2、value2写入到context中

context.write(key2, value2);

}

};

}

map 方法的第二个形参是行文本内容,是我们关心的。核心代码是把行文本内容按照空格拆分,把每个单词作为新的键,数值 1作为新的值,写入到上下文 context 中。在这里,因为输出的是每个单词,所以出现次数是常量 1。如果一行文本中包括两个 hello,会输出两次<hello,1>。
覆盖 reduce 方法

[java] view
plaincopyprint?





package mapreduce2;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

//value3表示单词出现的总次数

final IntWritable value3 = new IntWritable(0);

/**

* key 表示单词

* values 表示map方法输出的1的集合

* context 上下文对象

*/

protected void reduce(Text key, java.lang.Iterable<IntWritable> values, Context context) throws java.io.IOException, InterruptedException {

int sum = 0;

for (IntWritable count : values) {

sum += count.get();

}

//执行到这里,sum表示该单词出现的总次数

//key3表示单词,是最后输出的key

final Text key3 = key;

//value3表示单词出现的总次数,是最后输出的value

value3.set(sum);

context.write(key3, value3);

};

}

Reducer 类的四个泛型依次是<k2,v2,k3,v3>,要注意 reduce 方法的第二个参数是 java.lang.Iterable 类型,迭代的是 v2。也就是 k2 相同的 v2 都可以迭代出来。
驱动代码,如下

[java] view
plaincopyprint?





package mapreduce2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class WordCountApp {

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

//输入路径

final String INPUT_PATH = "hdfs://hadoop:9000/word.txt";

//输出路径,必须是不存在的

final String OUTPUT_PATH = "hdfs://hadoop:9000/output4";

//创建一个job对象,封装运行时需要的所有信息

final Job job = new Job(new Configuration(), "WordCountApp");

//如果需要打成jar运行,需要下面这句

//job.setJarByClass(WordCountApp.class);

//告诉job执行作业时输入文件的路径

FileInputFormat.setInputPaths(job, INPUT_PATH);

//设置把输入文件处理成键值对的类

job.setInputFormatClass(TextInputFormat.class);

//设置自定义的Mapper类

job.setMapperClass(MyMapper.class);

//设置map方法输出的k2、v2的类型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

//设置对k2分区的类

job.setPartitionerClass(HashPartitioner.class);

//设置运行的Reducer任务的数量

job.setNumReduceTasks(1);

//设置自定义的Reducer类

job.setReducerClass(MyReducer.class);

//设置reduce方法输出的k3、v3的类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

//告诉job执行作业时的输出路径

FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

//指明输出的k3类型

job.setOutputKeyClass(Text.class);

//指明输出的v3类型

job.setOutputValueClass(IntWritable.class);

//让作业运行,直到运行结束,程序退出

job.waitForCompletion(true);

}

}

在以上代码中,我们创建了一个 job 对象,这个对象封装了我们的任务,可以提交到Hadoop 独立运行。最后一句 job.waitForCompletion(true),表示把 job 对象提交给 Hadoop 运行,直到作业运行结束后才可以。
直接运行main方法即可;
运行之前记得将word.txt 上传到hdfs:hadoop fs --put word.txt / ;和将进程打开start-all.sh 运行结束后可以可以查看:
hadoop fs -text output4/part-r-00000 ;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: