The MapReduce Programming Model and a WordCount Implementation
2017-11-05 16:20
Idea
Divide and conquer
map: process each portion of the data independently
reduce: merge the partial results
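To make the idea concrete, here is a small plain-Java sketch with no Hadoop involved. The class name DivideAndConquerDemo, the chunk size and the pool of 4 threads are illustrative assumptions. The input lines are split into chunks, each chunk's words are counted on its own thread (the "map" side), and the partial counts are merged (the "reduce" side).

```java
import java.util.*;
import java.util.concurrent.*;

// Divide and conquer in miniature: split the lines into chunks, count the words
// of each chunk on its own thread ("map"), then merge the partial counts ("reduce").
class DivideAndConquerDemo {

    // Per-chunk work: count the words in one slice of the input
    static Map<String, Integer> countChunk(List<String> chunk) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : chunk) {
            for (String word : line.split("\\s+")) {
                if (!word.isEmpty()) counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    static SortedMap<String, Integer> count(List<String> lines, int chunkSize) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Divide: submit one task per chunk
            List<Future<Map<String, Integer>>> partials = new ArrayList<>();
            for (int i = 0; i < lines.size(); i += chunkSize) {
                List<String> chunk = lines.subList(i, Math.min(i + chunkSize, lines.size()));
                partials.add(pool.submit(() -> countChunk(chunk)));
            }
            // Merge: combine the partial maps into one sorted result
            SortedMap<String, Integer> total = new TreeMap<>();
            for (Future<Map<String, Integer>> partial : partials) {
                partial.get().forEach((word, c) -> total.merge(word, c, Integer::sum));
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> lines = List.of("hello world", "hello hadoop", "world of hadoop");
        System.out.println(count(lines, 2)); // {hadoop=2, hello=2, of=1, world=2}
    }
}
```

In Hadoop the chunks correspond to input splits handled by map tasks on different machines; here a local thread pool stands in for the cluster.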
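Programming model
MapReduce is a distributed computing model for processing massive datasets. It abstracts the whole parallel computation into two functions:
map: applies a specified operation to each element of a list of independent elements; because the elements are independent, this step is highly parallelizable.
reduce: combines the elements of a list into a result.
A simple MapReduce program only needs to specify map(), reduce(), the input and the output; the framework handles everything else.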
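The two primitives can be illustrated outside Hadoop with Java streams. This is only an analogy (the class name MapReducePrimitives is made up): map transforms every element independently, and reduce folds the results together. Because 0 is the identity for addition and addition is associative, the parallel stream gives the same answer as a sequential one.

```java
import java.util.List;

// map and reduce as list operations, using java.util.stream as an analogy
class MapReducePrimitives {
    static int sumOfSquares(List<Integer> xs) {
        return xs.parallelStream()          // elements are independent, so they may be processed in parallel
                 .map(x -> x * x)           // map: apply the same operation to every element
                 .reduce(0, Integer::sum);  // reduce: combine all elements into one value
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(List.of(1, 2, 3, 4))); // 30
    }
}
```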
Data flow
input (key-value) -> map (key-value) -> reduce (key-value) -> output (key-value)
Throughout the whole process, data is passed between stages as key-value pairs.
Writing the WordCount program
In MapReduce, the map and reduce functions follow this general form:
map: (k1, v1) -> list(k2, v2)
reduce: (k2, list(v2)) -> list(k3, v3)
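The following sketch mirrors these two signatures with plain Java generics and connects them through an in-memory grouping step, to show how the types flow from (k1, v1) to (k3, v3). Pair, run() and wordCount() are hypothetical names used only for illustration; in Hadoop itself the entry points are the Mapper and Reducer base classes.

```java
import java.util.*;
import java.util.function.BiFunction;

// Local, single-process model of:
//   map:    (k1, v1)       -> list(k2, v2)
//   reduce: (k2, list(v2)) -> list(k3, v3)
class LocalMapReduce {
    // Minimal immutable key-value pair (hypothetical helper)
    static final class Pair<K, V> {
        final K key;
        final V value;
        Pair(K key, V value) { this.key = key; this.value = value; }
        @Override public String toString() { return "<" + key + ", " + value + ">"; }
    }

    static <K1, V1, K2 extends Comparable<K2>, V2, K3, V3> List<Pair<K3, V3>> run(
            List<Pair<K1, V1>> input,
            BiFunction<K1, V1, List<Pair<K2, V2>>> mapFn,
            BiFunction<K2, List<V2>, List<Pair<K3, V3>>> reduceFn) {
        // Group every v2 under its k2, keeping keys sorted (a stand-in for the shuffle)
        SortedMap<K2, List<V2>> groups = new TreeMap<>();
        for (Pair<K1, V1> in : input) {
            for (Pair<K2, V2> out : mapFn.apply(in.key, in.value)) {
                groups.computeIfAbsent(out.key, k -> new ArrayList<>()).add(out.value);
            }
        }
        // Call reduce once per key with the list of its values
        List<Pair<K3, V3>> output = new ArrayList<>();
        groups.forEach((k, vs) -> output.addAll(reduceFn.apply(k, vs)));
        return output;
    }

    // WordCount in these terms: k1 = line offset, v1 = line, k2 = k3 = word, v2 = 1, v3 = count
    static List<Pair<String, Integer>> wordCount(List<Pair<Long, String>> lines) {
        return run(lines,
            (offset, line) -> {
                List<Pair<String, Integer>> out = new ArrayList<>();
                for (String w : line.split("\\s+")) {
                    if (!w.isEmpty()) out.add(new Pair<>(w, 1));
                }
                return out;
            },
            (word, ones) -> {
                int sum = 0;
                for (int one : ones) sum += one;
                return List.of(new Pair<>(word, sum));
            });
    }

    public static void main(String[] args) {
        List<Pair<Long, String>> input = List.of(new Pair<>(0L, "hello world"), new Pair<>(12L, "hello hadoop"));
        System.out.println(wordCount(input)); // [<hadoop, 1>, <hello, 2>, <world, 1>]
    }
}
```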
Mapper base class:
// extend Mapper and override this method
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { }
Reducer base class:
// extend Reducer and override this method
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException { }
Walking through WordCount's execution
1. Because data in a MapReduce program is always passed as key-value pairs, the data read from the input file is first converted into key-value form:
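As shown in the figure above, each line of the file initially becomes a value, keyed by that line's byte offset within the file, i.e. <line offset, line text>. The map method therefore receives inKey = line offset and inValue = line text.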
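2. The map output is also in key-value form. After map receives a <line offset, line text> pair, it splits the line on whitespace and emits each resulting word directly: the key is the word and the value is its count, which is always 1 at this stage. The map output looks like this: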
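3. After the map phase, the framework runs the shuffle phase, which groups the data: all values that share the same key are collected into one list, and by default the keys are sorted. The grouped, sorted <key, values> pairs are then passed to the reduce method. The data passed to reduce looks like this: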
4. Once the reducer receives this data, it only needs to add up the values for each key and output the total.
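The four steps can be traced with a small in-memory simulation. It is a sketch under simplifying assumptions: plain Java collections stand in for LongWritable, Text and IntWritable, the input is UTF-8 text with '\n' line endings, there is a single reducer, and the class name WordCountStages is illustrative. The printed lines show the data at each stage for a two-line file.

```java
import java.nio.charset.StandardCharsets;
import java.util.*;

// In-memory walk-through of the four WordCount stages for one small file
class WordCountStages {

    // Step 1: file content -> <byte offset of line, line text>
    static List<Map.Entry<Long, String>> readRecords(String content) {
        List<Map.Entry<Long, String>> records = new ArrayList<>();
        long offset = 0;
        for (String line : content.split("\n")) {
            records.add(Map.entry(offset, line));
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for the '\n'
        }
        return records;
    }

    // Step 2: map every record to <word, 1> pairs; the offset key is ignored
    static List<Map.Entry<String, Integer>> map(List<Map.Entry<Long, String>> records) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (Map.Entry<Long, String> r : records) {
            StringTokenizer tokens = new StringTokenizer(r.getValue()); // splits on whitespace
            while (tokens.hasMoreTokens()) out.add(Map.entry(tokens.nextToken(), 1));
        }
        return out;
    }

    // Step 3: shuffle = group values by key, with keys in sorted order
    static SortedMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        SortedMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    // Step 4: reduce = sum the values collected for each key
    static SortedMap<String, Integer> reduce(SortedMap<String, List<Integer>> grouped) {
        SortedMap<String, Integer> counts = new TreeMap<>();
        grouped.forEach((word, ones) -> {
            int sum = 0;
            for (int one : ones) sum += one;
            counts.put(word, sum);
        });
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> records = readRecords("hello world\nhello hadoop\n");
        System.out.println(records);          // [0=hello world, 12=hello hadoop]
        List<Map.Entry<String, Integer>> pairs = map(records);
        System.out.println(pairs);            // [hello=1, world=1, hello=1, hadoop=1]
        SortedMap<String, List<Integer>> grouped = shuffle(pairs);
        System.out.println(grouped);          // {hadoop=[1], hello=[1, 1], world=[1]}
        System.out.println(reduce(grouped));  // {hadoop=1, hello=2, world=1}
    }
}
```

The real shuffle also partitions keys across reducers and sorts data spilled to disk; the TreeMap here only reproduces the grouping and the key order.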
Implementation:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // step1: map class, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final Text mapOutputKey = new Text();
        private static final IntWritable mapOutputValue = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // line.split(" ") is avoided: it allocates a String[] for every line,
            // which adds memory pressure on large inputs
            StringTokenizer stringTokenizer = new StringTokenizer(line);
            while (stringTokenizer.hasMoreTokens()) {
                String wordValue = stringTokenizer.nextToken();
                // set the output key (the Text object is reused across calls)
                mapOutputKey.set(wordValue);
                // emit <word, 1>
                context.write(mapOutputKey, mapOutputValue);
            }
        }
    }

    // step2: reduce class
    public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable outputValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // (key, values) = (key, list(1, 1, 1))
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            outputValue.set(sum);
            context.write(key, outputValue);
        }
    }

    // step3: driver
    public int run(String[] args) throws Exception {
        // load the configuration
        Configuration conf = new Configuration();
        // create the job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        // ship the jar that contains this class
        job.setJarByClass(this.getClass());
        // input
        Path inpath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inpath);
        // map
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // reduce
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // output (the directory must not exist yet)
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);
        // submit the job; true prints progress logs while it runs
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int status = new WordCount().run(args);
        System.exit(status);
    }
}
Testing the program
1. Package the program as a jar.
2. Run it (the output directory must not already exist, otherwise the job fails):
bin/yarn jar jars/hadooptest.jar com.bxp.hadooptest.mapreduce.WordCount /user/bxp/mapreduce/wordcount/input /user/bxp/mapreduce/wordcount/output3
3. View the result:
bin/hdfs dfs -cat /user/bxp/mapreduce/wordcount/output3/part-r-00000
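For a small input, one way to sanity-check part-r-00000 is to compute its expected contents locally. The sketch below assumes the defaults used here: a single reducer (so there is one output file) and TextOutputFormat's tab separator between key and value; for ASCII words the sort order of String matches that of Text. ExpectedOutput is a hypothetical helper, not part of Hadoop.

```java
import java.util.*;

// Hypothetical helper: what part-r-00000 should contain for a small input,
// one "word<TAB>count" line per word, in sorted key order.
class ExpectedOutput {
    static String expected(List<String> lines) {
        SortedMap<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer tokens = new StringTokenizer(line); // same tokenizing as WordCountMapper
            while (tokens.hasMoreTokens()) {
                counts.merge(tokens.nextToken(), 1, Integer::sum);
            }
        }
        StringBuilder out = new StringBuilder();
        counts.forEach((word, count) -> out.append(word).append('\t').append(count).append('\n'));
        return out.toString();
    }

    public static void main(String[] args) {
        // prints hadoop<TAB>1, hello<TAB>2, world<TAB>1 on separate lines
        System.out.print(expected(List.of("hello world", "hello hadoop")));
    }
}
```

The returned string can be compared with what the bin/hdfs dfs -cat command prints for the same input file.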
The standard way to write a MapReduce program
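Differences from the code above:
1. The class extends Configured and implements Tool, implementing Tool's run() method.
2. The Configuration object is created in main() and handed to ToolRunner.run().
3. Inside run(), getConf() (inherited from Configured) returns the Configuration instance created in main().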
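A practical benefit of going through ToolRunner is that generic Hadoop options such as -D key=value are parsed into the Configuration before run() sees the arguments, so args[0] and args[1] are still the input and output paths. The toy MiniToolRunner below imitates that behaviour; it is hypothetical, handles only the "-D key=value" form, and uses a Map in place of Configuration (the real ToolRunner delegates to GenericOptionsParser, which supports more options).

```java
import java.util.*;

// Toy imitation of ToolRunner.run(conf, tool, args): "-D key=value" pairs are
// moved into the configuration and only the remaining arguments reach run().
class MiniToolRunner {
    interface MiniTool {
        int run(Map<String, String> conf, String[] args);
    }

    static int run(Map<String, String> conf, MiniTool tool, String[] args) {
        List<String> remaining = new ArrayList<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].equals("-D") && i + 1 < args.length) {
                // "-D key=value": store the pair and skip both tokens
                String[] kv = args[++i].split("=", 2);
                conf.put(kv[0], kv.length > 1 ? kv[1] : "");
            } else {
                remaining.add(args[i]);
            }
        }
        return tool.run(conf, remaining.toArray(new String[0]));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        String[] cli = {"-D", "mapreduce.job.reduces=2", "/input", "/output"};
        int status = run(conf, (c, a) -> {
            System.out.println(c + " " + Arrays.toString(a)); // {mapreduce.job.reduces=2} [/input, /output]
            return 0;
        }, cli);
        System.out.println(status); // 0
    }
}
```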
The code:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountMapReduce extends Configured implements Tool {

    // step1: map class, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final Text mapOutputKey = new Text();
        private static final IntWritable mapOutputValue = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // line.split(" ") is avoided: it allocates a String[] for every line,
            // which adds memory pressure on large inputs
            StringTokenizer stringTokenizer = new StringTokenizer(line);
            while (stringTokenizer.hasMoreTokens()) {
                String wordValue = stringTokenizer.nextToken();
                // set the output key
                mapOutputKey.set(wordValue);
                // emit <word, 1>; after the shuffle these pairs become the reduce input
                context.write(mapOutputKey, mapOutputValue);
            }
        }
    }

    // step2: reduce class
    public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable outputValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // (key, values) = (key, list(1, 1, 1))
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            outputValue.set(sum);
            context.write(key, outputValue);
        }
    }

    // step3: driver
    @Override
    public int run(String[] args) throws Exception {
        // use the Configuration created in main() and injected by ToolRunner
        Configuration conf = getConf();
        // create the job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        // ship the jar that contains this class
        job.setJarByClass(this.getClass());
        // input
        Path inpath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inpath);
        // map
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // reduce
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // output
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);
        // submit the job; true prints progress logs while it runs
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // ToolRunner parses generic options into the configuration, then calls run()
        int status = ToolRunner.run(configuration, new WordCountMapReduce(), args);
        System.exit(status);
    }
}
MapReduce program template
The business logic is left to the user and everything else is extracted into the template below. To write a new MapReduce program, you only need to fill in the business logic at each //TODO:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceModel extends Configured implements Tool {

    // step1: map class
    // TODO: set the map output types
    public static class ModelMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // TODO: map business logic
        }
    }

    // step2: reduce class
    // TODO: set the reduce input/output types
    public static class ModelReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable outputValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // TODO: reduce business logic
        }
    }

    // step3: driver
    @Override
    public int run(String[] args) throws Exception {
        // use the Configuration injected by ToolRunner
        Configuration conf = getConf();
        // create the job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        // ship the jar that contains this class
        job.setJarByClass(this.getClass());
        // input
        Path inpath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inpath);
        // map
        // TODO: replace ModelMapper
        job.setMapperClass(ModelMapper.class);
        // TODO: change the map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // reduce
        // TODO: replace ModelReduce
        job.setReducerClass(ModelReduce.class);
        // TODO: change the reduce output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // output
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);
        // submit the job; true prints progress logs while it runs
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int status = ToolRunner.run(configuration, new MapReduceModel(), args);
        System.exit(status);
    }
}
Template refinements:
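In both the Mapper and the Reducer classes you can also override the parent class's setup() and cleanup() methods (the Mapper versions are shown):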
protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { }

protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { }
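setup() is called once per task before the first map() or reduce() call, and cleanup() is called once after the last one. For example, opening a database connection and releasing it can be done in these two methods.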
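The call order can be pictured with a plain-Java imitation of a task's driver loop: setup() once, map() for every record, cleanup() once at the end. LifecycleMapper and DbLoggingMapper are hypothetical classes, and the log strings stand in for real connection handling; the try/finally is a choice of this sketch so that cleanup() runs even if map() throws.

```java
import java.util.*;

// Plain-Java imitation of a task's call order: setup() once, map() per record,
// cleanup() once. A log list stands in for real resources.
abstract class LifecycleMapper {
    final List<String> log = new ArrayList<>();

    protected void setup() { }
    protected abstract void map(long key, String value);
    protected void cleanup() { }

    // Driver loop; try/finally makes cleanup() run even if map() throws
    final void run(Map<Long, String> records) {
        setup();
        try {
            for (Map.Entry<Long, String> record : records.entrySet()) {
                map(record.getKey(), record.getValue());
            }
        } finally {
            cleanup();
        }
    }
}

// Example subclass: acquire a "connection" in setup(), release it in cleanup()
class DbLoggingMapper extends LifecycleMapper {
    @Override protected void setup() { log.add("open connection"); }
    @Override protected void map(long key, String value) { log.add("map " + value); }
    @Override protected void cleanup() { log.add("close connection"); }

    public static void main(String[] args) {
        DbLoggingMapper mapper = new DbLoggingMapper();
        Map<Long, String> records = new LinkedHashMap<>();
        records.put(0L, "hello world");
        records.put(12L, "hello hadoop");
        mapper.run(records);
        System.out.println(mapper.log); // [open connection, map hello world, map hello hadoop, close connection]
    }
}
```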