
The MapReduce Programming Model and a WordCount Implementation


Core Idea

Divide and conquer

map: process each part of the data independently

reduce: merge the partial results

The Programming Model

MapReduce is a distributed computing model for processing massive datasets.

It abstracts the entire parallel computation into two functions:

map: applies the specified operation to each element of a list of independent elements; because the elements are independent, this work is highly parallelizable.

reduce: merges the elements of a list into a combined result.

A simple MapReduce program only has to specify map(), reduce(), the input, and the output; the framework takes care of everything else.

Data Flow

input (key-value) -> map (key-value) -> reduce (key-value) -> output (key-value)

Throughout the whole pipeline, data is passed from stage to stage as key-value pairs.

Writing the WordCount Program

In MapReduce, the map and reduce functions follow this general form:

map: (K1, V1) -> list(K2, V2)

reduce: (K2, list(V2)) -> list(K3, V3)
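
For WordCount in particular, with Hadoop's writable types filled in (this matches the code further below):

map: (LongWritable offset, Text line) -> list(Text word, IntWritable 1)
reduce: (Text word, list(IntWritable counts)) -> list(Text word, IntWritable sum)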

The Mapper base class:

// Extend the Mapper class and override this method:
protected void map(KEYIN key, VALUEIN value,
        Context context) throws IOException, InterruptedException {
}


The Reducer base class:

// Extend the Reducer class and override this method:
protected void reduce(KEYIN key, Iterable<VALUEIN> values,
        Context context) throws IOException, InterruptedException {
}


Walking Through a WordCount Run

1. In a MapReduce program all data travels as key-value pairs, so when the input file is read, its contents are converted into key-value form: each record becomes <line offset, line text>, with the text of one line as the value and that line's byte offset within the file as the key. The map method therefore receives inKey = line offset and inValue = line text.
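
As a minimal illustration, assume a hypothetical two-line input file containing "hadoop hdfs" and "hadoop mapreduce". The mapper would then be invoked with:

<0, "hadoop hdfs">
<12, "hadoop mapreduce">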

2. The data map emits is also in key-value form. After receiving a <line offset, line text> pair, map splits the line on whitespace and emits one pair per token, with the word itself as the key and the word's count as the value (always 1 at this stage). The map output looks like this:
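
Continuing the running example:

<hadoop, 1>
<hdfs, 1>
<hadoop, 1>
<mapreduce, 1>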



3. After the map phase, the framework performs the shuffle. The shuffle groups the pairs by key, gathering all values that share a key into a single collection, and by default sorts the groups by key. The grouped, sorted key-values are then passed to the reduce method:
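
In the running example, reduce would receive:

<hadoop, (1, 1)>
<hdfs, (1)>
<mapreduce, (1)>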



4. Once the reducer has this grouped data, it only needs to sum up the values for each key and write out the totals.
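
The running example would produce the final output:

<hadoop, 2>
<hdfs, 1>
<mapreduce, 1>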



The full implementation:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    /*
     * step 1: map class
     * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     */
    public static class WordCountMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text mapOutputKey = new Text();
        private static final IntWritable mapOutputValue = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // String[] strs = line.split(" "); -- avoided: split() allocates a
            // new array per line, which wastes memory on large inputs
            StringTokenizer stringTokenizer = new StringTokenizer(line);
            while (stringTokenizer.hasMoreTokens()) {
                String wordValue = stringTokenizer.nextToken();
                // set the output key to the current word
                mapOutputKey.set(wordValue);
                // emit <word, 1>
                context.write(mapOutputKey, mapOutputValue);
            }
        }
    }

    // step 2: reduce class
    public static class WordCountReduce
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable outputValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // (key, values) = (key, list(1, 1, 1))
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            outputValue.set(sum);
            context.write(key, outputValue);
        }
    }

    // step 3: driver
    public int run(String[] args) throws Exception {
        // read the configuration
        Configuration conf = new Configuration();
        // create the Job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        // set the jar to run
        job.setJarByClass(this.getClass());

        // set input
        Path inpath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inpath);

        // set map
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // set reduce
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // set output
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);

        // submit the job; passing true prints progress logs while it runs
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int status = new WordCount().run(args);
        System.exit(status);
    }
}


Testing the Program

1. Package the program into a jar.

2. Run it:

bin/yarn jar jars/hadooptest.jar com.bxp.hadooptest.mapreduce.WordCount  /user/bxp/mapreduce/wordcount/input /user/bxp/mapreduce/wordcount/output3


3. Check the run's results:

bin/hdfs dfs -cat /user/bxp/mapreduce/wordcount/output3/part-r-00000
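
For the hypothetical two-line input used in the walkthrough above, this output file would contain one tab-separated <word, count> pair per line, sorted by key:

hadoop      2
hdfs        1
mapreduce   1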


The Standard Way to Write a MapReduce Program

Differences from the code above:

1. The driver class extends Configured and implements Tool, putting the job setup in Tool's run() method.

2. The Configuration object is created in main().

3. Inside run(), the Configuration instance created in main() is retrieved through Configured's getConf() method.

The code:

// The imports are the same as in WordCount above, plus:
//   import org.apache.hadoop.conf.Configured;
//   import org.apache.hadoop.util.Tool;
//   import org.apache.hadoop.util.ToolRunner;
public class WordCountMapReduce extends Configured implements Tool {

    /*
     * step 1: map class
     * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     */
    public static class WordCountMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text mapOutputKey = new Text();
        private static final IntWritable mapOutputValue = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // String[] strs = line.split(" "); -- avoided: split() allocates a
            // new array per line, which wastes memory on large inputs
            StringTokenizer stringTokenizer = new StringTokenizer(line);
            while (stringTokenizer.hasMoreTokens()) {
                String wordValue = stringTokenizer.nextToken();
                // set the output key to the current word
                mapOutputKey.set(wordValue);
                // emit <word, 1>; these pairs reach reduce after the shuffle
                context.write(mapOutputKey, mapOutputValue);
            }
        }
    }

    // step 2: reduce class
    public static class WordCountReduce
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable outputValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // (key, values) = (key, list(1, 1, 1))
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            outputValue.set(sum);
            context.write(key, outputValue);
        }
    }

    // step 3: driver
    @Override
    public int run(String[] args) throws Exception {
        // Configuration conf = new Configuration(); -- no longer created here;
        // fetch the instance injected by ToolRunner through Configured
        Configuration conf = getConf();
        // create the Job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        // set the jar to run
        job.setJarByClass(this.getClass());

        // set input
        Path inpath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inpath);

        // set map
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // set reduce
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // set output
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);

        // submit the job; passing true prints progress logs while it runs
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // int status = new WordCountMapReduce().run(args); -- replaced by
        // ToolRunner, which parses generic options and injects the Configuration
        int status = ToolRunner.run(configuration, new WordCountMapReduce(), args);
        System.exit(status);
    }
}


A MapReduce Program Template

Since only the business logic changes from job to job, the program can be distilled into the following template; to write a new MapReduce program, just fill in the //TODO markers:

// Imports are the same as in WordCountMapReduce above.
public class MapReduceModel extends Configured implements Tool {

    // step 1: map class
    // TODO: adjust the map output types
    public static class ModelMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // TODO: map business logic goes here
        }
    }

    // step 2: reduce class
    // TODO: adjust the reduce input/output types
    public static class ModelReduce
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable outputValue = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // TODO: reduce business logic goes here
        }
    }

    // step 3: driver
    @Override
    public int run(String[] args) throws Exception {
        // read the configuration injected by ToolRunner
        Configuration conf = getConf();
        // create the Job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        // set the jar to run
        job.setJarByClass(this.getClass());

        // set input
        Path inpath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inpath);

        // set map
        // TODO: replace ModelMapper with your mapper class
        job.setMapperClass(ModelMapper.class);
        // TODO: match the map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // set reduce
        // TODO: replace ModelReduce with your reducer class
        job.setReducerClass(ModelReduce.class);
        // TODO: match the reduce output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // set output
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);

        // submit the job; passing true prints progress logs while it runs
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int status = ToolRunner.run(configuration, new MapReduceModel(), args);
        System.exit(status);
    }
}


Template refinements:

In both the Mapper and the Reducer class, you can additionally override the parent class's setup() and cleanup() methods:

protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
        throws IOException, InterruptedException {
}

protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
        throws IOException, InterruptedException {
}


setup() runs once per task before the first map() or reduce() call, and cleanup() runs once after the last one; they are the natural place for work such as opening a database connection and releasing it afterwards.
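
As a minimal sketch of that pattern (the class name and the JDBC URL here are hypothetical, not from the original post), a mapper could manage a shared connection like this:

// extra imports needed: java.sql.Connection, java.sql.DriverManager, java.sql.SQLException
public static class DbConnectedMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Connection connection;

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        try {
            // hypothetical JDBC URL; runs once, before the first map() call
            connection = DriverManager.getConnection("jdbc:mysql://dbhost:3306/demo");
        } catch (SQLException e) {
            throw new IOException("could not open DB connection", e);
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // business logic can reuse the shared connection here
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        try {
            if (connection != null) {
                connection.close();  // runs once, after the last map() call
            }
        } catch (SQLException e) {
            throw new IOException("could not close DB connection", e);
        }
    }
}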