您的位置:首页 > 其它

mapreduce框架设计思想,wordcount程序原理与实现

2017-10-08 22:12 1081 查看
1、mapreduce框架设计思想

mapreduce结构

一个完整的mapreduce程序在分布式运行时有三类实例进程:

1、MRAppMaster:负责整个程序的过程调度及状态协调

2、mapTask:负责map阶段的整个数据处理流程

3、ReduceTask:负责reduce阶段的整个数据处理流程

运行流程:以wordcount(单词统计)为例



分析:

假如要统计三个文件中每个单词出现的次数

将文件上传到hdfs后,以图为例假设每个文件被切分为了4个block存在了4台机器上,mapreduce application master假设在第四台机器上,它来进行map task和reduce task的调度,如图示,每台机器上运行一个map task程序,每个map task分别计算自己所在机器上的三个block中每个单词出现的次数,最后每个map task将读到的a-h开头的单词与次数提交到第一台机器的reduce task,h-p开头的提交到第二台机器的reduce task,p-z开头的提交到第三天机器的reduce task,最后输出到三个hdfs文件。而对于调度问题都是由mapreduce application master来解决。

计算机制可以做如下思考。

1、每个map task 按行读取block中的数据,每行按照空格分开,然后存入到一个hashmap中,key为单词,value为数量,每读到相同的单词,value+1,读到以前没读过的单词,则新增key,value。

2、等到自己这台机器上的三个block都读完后,再创建三个hashmap将a-h的单词放到第一个hashmap,h-p开头的放到第二个hashmap,p-z开头的放到第三个hashmap。最后提交到对应的三个reduce task。

注:这样说只是为了更简单易懂,实际上存在一些差异的,比如是不是a-h就一定交给第一个reduce task,所有a开头的都会交给第一个reduce task?答案是否定的,实际上提交给哪个是根据每个单词的hashcode以及reduce task的数量决定的(参考 HashPartitioner类)。比如application单词以及出现次数就会交给第一个reduce task,而angry就可能交给第二个reduce task了。

为了方便理解下面就写个wordcount程序。

2、wordcount程序实现

1、编写map方法

/**
* KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,Long,但是在hadoop中有自己的
* 更精简的序列化接口(Seria会将类结构都序列化,而实际我们只需要序列化数据),所以不直接用Long,而用LongWritable
* VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String,同上,用Text
* KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,String,这里是单词
* VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer
* @author 12706
*
*/
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
/*
* map阶段的业务逻辑就写在自定义的map()方法中
* maptask会对每一行输入数据调用一次我们自定义的map()方法
*/
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//将maptask传给我们的文本内容先转换成String
String line = value.toString();
//根据空格将这一行切分成单词
String[] words = line.split(" ");

//将单词输出为<单词,1>
for (String word : words) {
//将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,
//以便于相同单词会到相同的reduce task
context.write(new Text(word), new IntWritable(1));
}
}
}


2、编写reduce方法

/**
* KEYIN VALUEIN对应mapper输出的KEYOUT KEYOUT类型对应
* KEYOUT,VALUEOUT:是自定义reduce逻辑处理结果的输出数据类型
* KEYOUT:是单词
* VALUEOUT:是总次数
* @author 12706
*
*/
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

/*
* <angry,1><angry,1><angry,1><angry,1><angry,1><angry,1>
* <banana,1><banana,1><banana,1><banana,1><banana,1>
* ...
* <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1>
* 入参key,是一组相同单词kv的key
* 每个单词统计调一次,换一组再调用一次
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {

int count = 0;
//      Iterator<IntWritable> iterator = values.iterator();
//      while(iterator.hasNext()){
//          IntWritable next = iterator.next();
//      }
for (IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count));

}
}


解释:

map()方法中我们设置的时候单词做为key,1作为value写出,比如

/**
* 相当于一个yarn集群的客户端
* 需要在此封装mr程序的相关运行参数,指定jar包
* 最后提交给yarn
* @author 12706
*
*/
public class WordcountDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

//指定本程序的jar包所在的本地路径
job.setJarByClass(WordcountDriver.class);

//指定本业务job要使用的mapper,reducer业务类
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
//虽然指定了泛型,以防框架使用第三方的类型
//指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

//指定最终输出的数据的kv类型(这里就是指reducer输出类型)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

//指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定输出结果目录
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//将job中配置的相关参数以及job所用的java类所在的jar包,提交给yarn去运行
//      job.submit();
boolean b = job.waitForCompletion(true);
//这个方法是用来结束当前正在运行中的java虚拟机。如果status是非零参数,那么表示是非正常退出。
System.exit(b?0:1);
}
}


4、进行单词统计测试

将工程打成jar包上传到linux(hadoop集群启动起来)

[root@mini2 hadoop-2.6.4]# hadoop fs -mkdir /wordcount/input
注:/wordcount/output目录不能创建
[root@mini2 hadoop-2.6.4]# hadoop fs -put LICENSE.txt NOTICE.txt README.txt /wordcount/input
[root@mini2 ~]# hadoop jar wordcount.jar com.itheima.hadoop.mapreduce.WordcountDriver /wordcount/input /wordcount/output


到这里程序就跑完了,可以查看/wordcount/output目录结构

[root@mini2 ~]# hadoop fs -ls /wordcount/output
-rw-r--r--   2 root supergroup          0 2017-10-09 05:06 /wordcount/output/_SUCCESS
-rw-r--r--   2 root supergroup       8989 2017-10-09 05:06 /wordcount/output/part-r-00000


说明只跑了一个reduce task,因为默认就是这样,可以自己设置数量。

再查看单词统计结果

[root@mini2 ~]# hadoop fs -cat /wordcount/output/part-r-00000
...
within  8
without 6
work    5
work,   2
work.   1
works   1
worldwide,      2
writing 1
writing,        3
written 3
you     3
your    5


3、wordcount原理

wordcount运行过程分析图



1、程序启动后,根据客户端提交的job能得到待处理的数据的信息,比如要切片的数量(默认就是blocksize,每个切片会分配一个 maptask实例去运行),程序jar包,配置参数等。mapreduce application master会根据这些信息获得map task实例数量。然后向集群中申请响应数量的map task进程。

2、每个map task会利用InputFormat去读取分给自己的切片的数据信息,读取到的内容以(key,value)(没读一行调用一次map()方法)形式传到我们自己重写的map()方法里进行处理。

3、将map()方法中的输出结果传到输出收集器(缓存?OutputCollector)中。

4、当map task执行完后输出收集器会将信息写入到一个输出文件中,这个文件是分区(有几个reduce task就有几个区)的。

5、mapreduce application master监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)。

6、reduce task进程启动之后,根据mapreduce application master告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: