您的位置:首页 > 运维架构

Hadoop实现Secondary Sort (三)

2013-10-15 10:34 239 查看
一、背景

排序对于MR来说是个核心内容,如何做好排序十分的重要,这几天写了一些,总结一下,以供以后读阅。

二、准备

1、hadoop版本是0.20.2

2、输入的数据格式(这个很重要,看清楚格式),名称是secondary.txt:

[java] view plaincopyabc     123  
acb     124  
cbd     523  
abc     234  
nbc     563  
fds     235  
khi     234  
cbd     675  
fds     971  
hka     862  
ubd     621  
khi     123  
fds     321  

仔细看下,数据文件第一列是字母,第二列是数字,我要做的就是结合这组数据进行一些排序的测试。

3、代码框架,因为接下来的测试改动都是针对部分代码的修改,框架的代码是不会改变的,所以先把主要代码贴在这里。

代码分为2部分:自定义的key和主框架代码(注意看下红色部分)。先贴上主框架代码:

MyGrouping.java

[java] view
plaincopy

import org.apache.hadoop.conf.Configuration;  

import org.apache.hadoop.fs.Path;  

import org.apache.hadoop.io.LongWritable;  

import org.apache.hadoop.io.Text;  

import org.apache.hadoop.io.WritableComparator;  

import org.apache.hadoop.mapreduce.Job;  

import org.apache.hadoop.mapreduce.Mapper;  

import org.apache.hadoop.mapreduce.Partitioner;  

import org.apache.hadoop.mapreduce.Reducer;  

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  

import org.apache.hadoop.util.GenericOptionsParser;  

  

import com.run.lenged.business.TextPair;  

  

public class MyGrouping {  

  

    /** 

     * Map 

     *  

     * @author Administrator 

     */  

    public static class MyGroupingMap extends Mapper<LongWritable, Text, TextPair, Text> {  

        protected void map(LongWritable key, Text value,  

                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, TextPair, Text>.Context context)  

                throws java.io.IOException, InterruptedException {  

            String arr[] = value.toString().split("/t");  

            if (arr.length != 2) {  

                return;  

            }  

            TextPair tp = new TextPair();  

            tp.set(new Text(arr[0]), new Text(arr[1]));  

            context.write(tp, new Text(arr[1]));  

        }  

    }  

  

    /** 

     * 按照Hashcode值来进行切分 

     *  

     * @author Administrator 

     */  

    public static class MyGroupingPartition extends Partitioner<TextPair, Text> {  

        @Override  

        public int getPartition(TextPair key, Text value, int numPartitions) {  

            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;  

        }  

    }  

  

    /** 

     * group进行排序 

     *  

     * @author Administrator 

     */  

    @SuppressWarnings("unchecked")  

    public static class MyGroupingGroup extends WritableComparator {  

        //代码变动部分  

    }  

  

    /** 

     * reduce 

     *  

     * @author Administrator 

     */  

    public static class MyGroupingReduce extends Reducer<TextPair, Text, Text, Text> {  

        protected void reduce(TextPair key, java.lang.Iterable<Text> value,  

                org.apache.hadoop.mapreduce.Reducer<TextPair, Text, Text, Text>.Context context)  

                throws java.io.IOException, InterruptedException {  

            StringBuffer sb = new StringBuffer();  

            while (value.iterator().hasNext()) {  

                sb.append(value.iterator().next().toString() + "_");  

            }  

            context.write(key.getFirst(), new Text(sb.toString().substring(0, sb.toString().length() - 1)));  

        }  

    }  

  

    public static void main(String args[]) throws Exception {  

        Configuration conf = new Configuration();  

        GenericOptionsParser parser = new GenericOptionsParser(conf, args);  

        String[] otherArgs = parser.getRemainingArgs();  

        if (args.length != 2) {  

            System.err.println("Usage: NewlyJoin <inpath> <output>");  

            System.exit(2);  

        }  

  

        Job job = new Job(conf, "MyGrouping");  

        // 设置运行的job  

        job.setJarByClass(MyGrouping.class);  

        // 设置Map相关内容  

        job.setMapperClass(MyGroupingMap.class);  

        job.setMapOutputKeyClass(TextPair.class);  

        job.setMapOutputValueClass(Text.class);  

        job.setPartitionerClass(MyGroupingPartition.class);  

          

        job.setGroupingComparatorClass(MyGroupingGroup.class);  

          

        // 设置reduce  

        job.setReducerClass(MyGroupingReduce.class);  

        job.setOutputKeyClass(Text.class);  

        job.setOutputValueClass(Text.class);  

        // 设置输入和输出的目录  

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  

        // 执行,直到结束就退出  

        System.exit(job.waitForCompletion(true) ? 0 : 1);  

    }  

}  

TextPair.java

[java] view
plaincopy

import java.io.DataInput;  

import java.io.DataOutput;  

import java.io.IOException;  

  

import org.apache.hadoop.io.Text;  

import org.apache.hadoop.io.WritableComparable;  

  

public class TextPair implements WritableComparable<TextPair> {  

  

    private Text first;  

    private Text second;  

  

    public TextPair() {  

        set(new Text(), new Text());  

    }  

  

    public void set(Text first, Text second) {  

        this.first = first;  

        this.second = second;  

    }  

  

    public Text getFirst() {  

        return first;  

    }  

  

    public Text getSecond() {  

        return second;  

    }  

  

    @Override  

    public void readFields(DataInput in) throws IOException {  

        first.readFields(in);  

        second.readFields(in);  

    }  

  

    @Override  

    public void write(DataOutput out) throws IOException {  

        first.write(out);  

        second.write(out);  

    }  

  

    @Override  

    public int compareTo(TextPair o) {  

        int cmp = first.compareTo(o.first);  

        if (cmp != 0) {  

            return cmp;  

        } else {  

            return second.compareTo(o.second);  

        }  

    }  

}  

三、测试前提

1、首先提一个需求,我们结合需求来测试,然后再扩散开。
需求内容是:如果第一列值相同,第二列值叠加,并对第二列值进行升序排序。最后输出的时候,按照第一列值的升序排序输出。
2、需求实现。
根据上面的需求,我们可以分析一下:
需要对第一个字段和第二个字段都进行排序,那么单纯的利用MR框架对key迭代输出,value累加是不行的。因为value是没有进行排序。
所以我们需要做一些改动,定义key为符合组建。TextPair.java类就是自定义的key。
一般来说如果要对key和value同时做排序,那么,自定义的组合key的格式第一个值是第一个字段,第二个值就是第二个字段。
3、那么我们就定义一个job.setGroupingComparatorClass(MyGroupingGroup.class);代码如下:

[javascript] view
plaincopy

public static class MyGroupingGroup extends WritableComparator {  

        public int compare(WritableComparable a, WritableComparable b) {  

            return mip1.getFirst().compareTo(mip2.getFirst());  

    }  

  

  

        protected MyGroupingGroup() {  

            super(TextPair.class, true);  

        }  

  

        @Override  

            TextPair mip1 = (TextPair) a;  

            TextPair mip2 = (TextPair) b;  

        }  

只对输出的复合组建第一项值进行排序。输出的结果如下:

[java] view
plaincopy

abc 123_234  

cbd 523_675  

khi 123_234  

ubd 621  

nbc 563  

acb 124  

fds 235_321_971  

hka 862  

4、查看结果,我们可以看出,基本满足了上面的需求。那么接下来,我们就将做个测试,来实现一下MR的排序功能。

四、Group按第二个字段值进行排序测试

1、修改一下group的排序方式,针对第二个值进行合并排序,代码如下:

[java] view
plaincopy

public static class MyGroupingGroup extends WritableComparator {  

        protected MyGroupingGroup() {  

            super(TextPair.class, true);  

        }  

  

        @Override  

        public int compare(WritableComparable a, WritableComparable b) {  

            TextPair mip1 = (TextPair) a;  

            TextPair mip2 = (TextPair) b;  

            return mip1.getSecond().compareTo(mip2.getSecond());  

            //return mip1.getFirst().compareTo(mip2.getFirst());  

        }  

    }  

2、reduce的输出稍微改下,将第2个字段也输出,方便查看,代码如下:

[java] view
plaincopy

context.write(key.getFirst(), new Text(sb.toString().substring(0, sb.toString().length() - 1)));  

reduce输出的结果:

[html] view
plaincopy

abc_123 123  

abc_234 234  

acb_124 124  

cbd_523 523  

cbd_675 675  

fds_235 235  

fds_321 321  

fds_971 971  

hka_862 862  

khi_123 123  

khi_234 234  

nbc_563 563  

ubd_621 621  

3、看到结果,第一反应就是没有按照我的要求,按第二个值进行排序操作。

其实不是,这个结果确实是进行了group的排序,只是说遇到没有符合合并结果数据。所以,看起来没有进行排序。

在这里有个概念,就是group到底是在什么时候做的排序,原文是这样写的:

Job.setGroupingComparatorClass(Class<? extends RawComparator> cls)  
Define the comparator that controls which keys are grouped together 

for a single call to Reducer.reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)

我尝试翻译了一下(英文水平实在是有限,不对的地方还望各位指出):

在一个reduce的调用过程中,定义一个comparator,对分组在一起的key进行排序。

通过上面这句话就可以理解,为什么khi_123 123和abc_123 123没有叠加在一起。

五、总结

1、这里只写了group的排序,没有写sort,后面将会写一个,说不定就是今天晚上吧!

2、过几天写个MR的执行流程,并画个图,贴出来大家看看。

3、对于这块的排序我也是接触不久,可能有写的不对的地方。还望朋友们跟贴指出来。

4、如果有疑问或是不好跟贴,可以发邮件交流:dajuezhao@gmail.com
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息