Secondary Sort and Multi-Level Sort in MapReduce Programs
2018-03-08 18:35
[toc]
Requirements
Given the following data (the fields are tab-separated, which is what the mapper below relies on):
```
cookieId  time      url
2         12:12:34  2_hao123
3         09:10:34  3_baidu
1         15:02:41  1_google
3         22:11:34  3_sougou
1         19:10:34  1_baidu
2         15:02:41  2_google
1         12:12:34  1_hao123
3         23:10:34  3_soso
2         05:02:41  2_google
```
Suppose the requirement is to sort first by cookieId and then by time, so that the log can be split into sessions. The sorted result should look like this:
```
---------------------------------
1  12:12:34  1_hao123
1  15:02:41  1_google
1  19:10:34  1_baidu
---------------------------------
2  05:02:41  2_google
2  12:12:34  2_hao123
2  15:02:41  2_google
---------------------------------
3  09:10:34  3_baidu
3  22:11:34  3_sougou
3  23:10:34  3_soso
```
The task is to implement this with a MapReduce program.
Program Design
Map function: parse each line into an AccessLogWritable, so that the map output can be sorted on the two fields of that object. This is what implements the secondary sort required above; in other words, sorting still relies on the sort of the map output, but by the rules we define ourselves in AccessLogWritable.

Reduce function: the data reaching the Reducer after the shuffle is already sorted, so it can simply be written out.
So, in order to compare on multiple fields, we need to define a custom key type to serve as the map output key.
The MapReduce Program
How the sorting is done is explained in the code comments. Note that the driver is built with the Job utility class developed in an earlier post; a sketch of that utility follows the listing.
SecondSortJob.java
```java
package com.uplooking.bigdata.mr.secondsort;

import com.uplooking.bigdata.common.utils.MapReduceJobUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/**
 * MapReduce sorting: secondary sort
 */
public class SecondSortJob {

    /**
     * Driver program, built with the Job utility class
     * @param args
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Parameter Errors! Usage: <inputpath> <outputpath>");
            System.exit(-1);
        }
        Job job = MapReduceJobUtil.buildJob(new Configuration(),
                SecondSortJob.class,
                args[0],
                TextInputFormat.class,
                SecondSortMapper.class,
                AccessLogWritable.class,
                NullWritable.class,
                new Path(args[1]),
                TextOutputFormat.class,
                SecondSortReducer.class,
                AccessLogWritable.class,
                NullWritable.class);
        // the number of reduce tasks must be set to 1; otherwise the output
        // is only sorted within each reducer's partition, not globally
        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
    }

    /**
     * Map function: parses each line into an AccessLogWritable, so that the
     * map output is sorted on the two fields of that object, which implements
     * the secondary sort required above. That is, sorting still relies on the
     * sort of the map output, but by the rules we define in AccessLogWritable.
     */
    public static class SecondSortMapper extends Mapper<LongWritable, Text, AccessLogWritable, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // parse each line
            String[] fields = value.toString().split("\t");
            if (fields == null || fields.length < 3) {
                return;
            }
            String cookieId = fields[0];
            String time = fields[1];
            String url = fields[2];
            // build the AccessLogWritable object
            AccessLogWritable logLine = new AccessLogWritable(cookieId, time, url);
            // write it out to the context
            context.write(logLine, NullWritable.get());
        }
    }

    /**
     * Reduce function: the data reaching the Reducer after the shuffle is
     * already sorted, so it can simply be written out.
     */
    public static class SecondSortReducer extends Reducer<AccessLogWritable, NullWritable, AccessLogWritable, NullWritable> {
        @Override
        protected void reduce(AccessLogWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}
```
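The driver depends on MapReduceJobUtil.buildJob, which was developed in an earlier post and is not reproduced here. For reference, below is a minimal sketch of what such a helper could look like, reconstructed purely from the call site above: the parameter order matches that call, but the implementation itself is an assumption, not the original utility.

```java
package com.uplooking.bigdata.common.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Hypothetical reconstruction of the Job utility class referenced above;
 * the parameter list is inferred from how SecondSortJob calls it.
 */
public class MapReduceJobUtil {

    public static Job buildJob(Configuration conf,
                               Class<?> jobClass,
                               String inputPath,
                               Class<? extends InputFormat> inputFormatClass,
                               Class<? extends Mapper> mapperClass,
                               Class<?> mapOutputKeyClass,
                               Class<?> mapOutputValueClass,
                               Path outputPath,
                               Class<? extends OutputFormat> outputFormatClass,
                               Class<? extends Reducer> reducerClass,
                               Class<?> outputKeyClass,
                               Class<?> outputValueClass) throws Exception {
        Job job = Job.getInstance(conf, jobClass.getSimpleName());
        job.setJarByClass(jobClass);

        // input
        job.setInputFormatClass(inputFormatClass);
        FileInputFormat.setInputPaths(job, inputPath);

        // map phase
        job.setMapperClass(mapperClass);
        job.setMapOutputKeyClass(mapOutputKeyClass);
        job.setMapOutputValueClass(mapOutputValueClass);

        // delete a pre-existing output directory so reruns do not fail
        FileSystem fs = outputPath.getFileSystem(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        // reduce phase and output
        job.setOutputFormatClass(outputFormatClass);
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setReducerClass(reducerClass);
        job.setOutputKeyClass(outputKeyClass);
        job.setOutputValueClass(outputValueClass);

        return job;
    }
}
```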
AccessLogWritable.java
```java
package com.uplooking.bigdata.mr.secondsort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Custom Hadoop data type. To be used as a key it must implement the
 * WritableComparable interface. The objects compared during the map-side
 * sort are AccessLogWritable instances, so the type parameter is
 * AccessLogWritable.
 */
public class AccessLogWritable implements WritableComparable<AccessLogWritable> {

    private String cookieId;
    private String time;
    private String url;

    /**
     * A no-arg constructor is required (Hadoop instantiates the class via
     * reflection); without it you get the following exception:
     *
     * Caused by: java.lang.NoSuchMethodException: com.uplooking.bigdata.mr.secondsort.AccessLogWritable.<init>()
     *     at java.lang.Class.getConstructor0(Class.java:3082)
     *     at java.lang.Class.getDeclaredConstructor(Class.java:2178)
     *     at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:125)
     *     ... 16 more
     */
    public AccessLogWritable() {
    }

    public AccessLogWritable(String cookieId, String time, String url) {
        this.cookieId = cookieId;
        this.time = time;
        this.url = url;
    }

    /**
     * Comparison method; the rule is:
     * sort by cookieId first, then by time
     * @param o
     * @return
     */
    public int compareTo(AccessLogWritable o) {
        int ret = this.cookieId.compareTo(o.cookieId);
        // if the cookieIds compare equal, compare by time
        if (ret == 0) {
            ret = this.time.compareTo(o.time);
        }
        return ret;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(cookieId);
        out.writeUTF(time);
        out.writeUTF(url);
    }

    public void readFields(DataInput in) throws IOException {
        this.cookieId = in.readUTF();
        this.time = in.readUTF();
        this.url = in.readUTF();
    }

    @Override
    public String toString() {
        return cookieId + "\t" + time + "\t" + url;
    }
}
```
Testing
Here the MapReduce program is run in the local environment, with the following input arguments:
```
/Users/yeyonghao/data/input/secondsort /Users/yeyonghao/data/output/mr/secondsort
```
Alternatively, you can package it as a jar and run it in a Hadoop environment.
After the program finishes, the output looks like this:
```
yeyonghao@yeyonghaodeMacBook-Pro:~/data/output/mr/secondsort$ cat part-r-00000
1  12:12:34  1_hao123
1  15:02:41  1_google
1  19:10:34  1_baidu
2  05:02:41  2_google
2  12:12:34  2_hao123
2  15:02:41  2_google
3  09:10:34  3_baidu
3  22:11:34  3_sougou
3  23:10:34  3_soso
```
As you can see, by using a custom key, our MapReduce program has achieved the secondary sort.
Extension: How to Implement Multi-Level Sorting
If the program above is clear, the approach to multi-level sorting follows naturally. The comparison rules are defined inside the key, and the map phase sorts by that key, so to sort on more fields we only need to extend the compareTo method of the custom key with additional rules, as shown in the sketch below. Interested readers can flesh this out into a complete program themselves.
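For illustration, here is a hypothetical drop-in replacement for the compareTo method of AccessLogWritable that adds url as a third sort field. The choice of url is only an example; any further field would be handled the same way.

```java
/**
 * Hypothetical three-level sort: by cookieId first, then time, then url.
 * This would replace the compareTo method of AccessLogWritable above.
 */
public int compareTo(AccessLogWritable o) {
    int ret = this.cookieId.compareTo(o.cookieId);
    if (ret == 0) {
        // first tie-breaker: time
        ret = this.time.compareTo(o.time);
    }
    if (ret == 0) {
        // second tie-breaker: url, the added third sort field
        ret = this.url.compareTo(o.url);
    }
    return ret;
}
```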