MapReduce程序之数据排序
2018-03-08 12:03
148 查看
[toc]
MapReduce程序之数据排序
需求
下面有三个文件:
yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/sort$ cat file1.csv 2 32 654 32 15 756 65223 yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/sort$ cat file2.csv 5956 22 650 92 yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/sort$ cat file3.csv 26 54 6
使用MapReduce对其进行排序并输出。
分析思路
Map阶段分析: /** * 数据在Map之后会做sort(从内存缓冲区到磁盘的时候会做sort),所以Map操作只需要把数据直接写出即可,最后在本地做数据 * 合并的时候也是会有排序的,详细可以参考MapReduce的过程,但是需要注意的是,因为我们需要进行的是数字的排序, * 所以在Map输出时,key的类型应该是Int类型才能按照数字的方式进行排序,如果是Text文本的话,那么是按照字典顺序 * 来进行排序的(也就是先比较字符串中的第一个字符,如果相同再比较第二个字符,以此类推),而不是按照数字进行排序 */ Reduce阶段分析: /** * 需要注意的是,排序与其它MapReduce程序有所不同,最后在驱动程序设置ReduceTask时,必须要设置为1 * 这样才能把数据都汇总到一起,另外一点,数据在shuffle到达reducer的时候,从内存缓冲区写到磁盘时 * 也会进行排序操作,所以即便是从不同节点上的Map上shuffle来的数据,到输入到reducer时,数据也是有序的, * 所以Reducer需要做的是把数据直接写到context中就可以了 */
MapReduce程序
关于如何进行数据的排序,思路已经在代码注释中有说明,不过需要注意的是,这里使用了前面开发的Job工具类来开发驱动程序,程序代码如下:
package com.uplooking.bigdata.mr.sort; import com.uplooking.bigdata.common.utils.MapReduceJobUtil; import com.uplooking.bigdata.mr.duplication.DuplicationJob; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; public class SortJob { /** * 驱动程序,使用工具类来生成job * * @param args */ public static void main(String[] args) throws Exception { if (args == null || args.length < 2) { System.err.println("Parameter Errors! Usages:<inputpath> <outputpath>"); System.exit(-1); } Job job = MapReduceJobUtil.buildJob(new Configuration(), SortJob.class, args[0], TextInputFormat.class, SortMapper.class, IntWritable.class, NullWritable.class, new Path(args[1]), TextOutputFormat.class, SortReducer.class, IntWritable.class, NullWritable.class); // ReduceTask必须设置为1 job.setNumReduceTasks(1); job.waitForCompletion(true); } /** * 数据在Map之后会做sort(从内存缓冲区到磁盘的时候会做sort),所以Map操作只需要把数据直接写出即可,最后在本地做数据 * 合并的时候也是会有排序的,详细可以参考MapReduce的过程,但是需要注意的是,因为我们需要进行的是数字的排序, * 所以在Map输出时,key的类型应该是Int类型才能按照数字的方式进行排序,如果是Text文本的话,那么是按照字典顺序 * 来进行排序的(也就是先比较字符串中的第一个字符,如果相同再比较第二个字符,以此类推),而不是按照数字进行排序 */ public static class SortMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 先将value转换为数字 int num = Integer.valueOf(value.toString()); // 直接写出数据到context中 context.write(new IntWritable(num), NullWritable.get()); } } /** * 需要注意的是,排序与其它MapReduce程序有所不同,最后在驱动程序设置ReduceTask时,必须要设置为1 * 这样才能把数据都汇总到一起,另外一点,数据在shuffle到达reducer的时候,从内存缓冲区写到磁盘时 * 也会进行排序操作,所以即便是从不同节点上的Map上shuffle来的数据,到输入到reducer时,数据也是有序的, * 所以Reducer需要做的是把数据直接写到context中就可以了 */ public static class SortReducer extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable> { @Override protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { // 直接将数据写入到context中 context.write(key, NullWritable.get()); } } /** * 仍然需要说明的是,因为reduce端在shuffle数据写入到磁盘的时候已经完成了排序, * 而这个排序的操作不是在reducer的输出中完成的,这也就意味着,reducer的输出数据中的key数据类型, * 可以是IntWritable,显然也可以设置为Text的,说明这个问题主要是要理清map-shuffle-reduce的过程 */ }
测试
这里使用本地环境来运行MapReduce程序,输入的参数如下:
/Users/yeyonghao/data/input/sort /Users/yeyonghao/data/output/mr/sort
也可以将其打包成jar包,然后上传到Hadoop环境中运行。
运行程序后,查看输出结果如下:
yeyonghao@yeyonghaodeMacBook-Pro:~/data/output/mr/sort$ cat part-r-00000 2 6 15 22 26 32 54 92 650 654 756 5956 65223
可以看到,我们的MapReduce已经完成了数据排序的操作。
注意事项
因为在map输出后,相同的key会被shuffle到同一个reducer中,所以这个过程其实也完成了去重的操作,这也就意味着,按照上面的MapReduce程序的思路,重复的数据也会被删除,那么如何解决这个问题呢?大家可以思考一下。
思路也比较简单,可以这样做,map输出的时候,key还是原来的key,而value不再是NullWritalbe,而是跟key一样的,这样到了reducer的时候,如果有相同的数据,输入的数据就类似于<32, [32, 32, 32]>,那么在reducer输出数据的时候,就可以迭代[32, 32, 32]数据进行输出,这样就可以避免shuffle阶段key去重所带来去除了相同数字的问题。
阅读更多注意,前面一些文章,很多操作多次提到是在shuffle,其实有些是不准备的,包括这里的。在Map端,其实key的去重是在merge on disk的过程中完成了。
相关文章推荐
- MapReduce程序之数据排序
- 复杂mapreduce程序 分析http服务器数据
- Hadoop-MapReduce-TeraSort-大数据排序例子
- 选择排序法 当数据量较小的时候,使用基本排序方案并不会显著影响程序性能。 选择排序是十分常用的基本排序方案之一。
- 基于Hbase数据的Mapreduce程序环境开发
- MapReduce端的二次排序以及对移动计算而不是移动数据的理解
- hadoop实战学习之用MapReduce简单对整形数据进行全局排序
- Hadoop MapReduce数据去重程序
- 大数据——从零开始写MapReduce程序
- Hadoop链式MapReduce、多维排序、倒排索引、自连接算法、二次排序、Join性能优化、处理员工信息Join实战、URL流量分析、TopN及其排序、求平均值和最大最小值、数据清洗ETL、分析气
- Hadoop—MapReduce练习(数据去重、数据排序、平均成绩、倒排索引)
- 我的hadoop初学程序------简单数据排序-------Sort
- mapreduce(JAVA)实现(大数据)电话号码对应的流量排序(倒序)
- 实现一个简单的菜单程序,运行时显示"Menu:A(dd) D(elete) S(ort) Q(uit),Select one: "提示用户输入。输入A、D、S时分别提示"数据已经增加、删除、排序"
- MapReduce数据去重程序实验
- 学习笔记:从0开始学习大数据-8.直接在Eclipse配置运行MapReduce程序
- Hadoop 中文编码相关问题 -- mapreduce程序处理GBK编码数据并输出GBK编码数据
- hadoop平台使用python编写mapreduce排序小程序
- mapreduce实现排序并且找出销量最多的数据
- 编写mapreduce统计数据流量的小程序