
Secondary Sort and Multi-Level Sort in MapReduce

2018-03-08 18:35


Requirements

Suppose we have the following data:

cookieId    time    url
2   12:12:34    2_hao123
3   09:10:34    3_baidu
1   15:02:41    1_google
3   22:11:34    3_sougou
1   19:10:34    1_baidu
2   15:02:41    2_google
1   12:12:34    1_hao123
3   23:10:34    3_soso
2   05:02:41    2_google

Suppose the requirement is to sort first by cookieId and then by time, so that the log can later be split into sessions. The sorted result should look like this:

---------------------------------
1      12:12:34        1_hao123
1      15:02:41        1_google
1      19:10:34        1_baidu
---------------------------------
2      05:02:41        2_google
2      12:12:34        2_hao123
2      15:02:41        2_google
---------------------------------
3      09:10:34        3_baidu
3      22:11:34        3_sougou
3      23:10:34        3_soso

The task is to implement this with a MapReduce program.

Design Analysis

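Map function:
/**
* The map function parses each line into an AccessLogWritable and emits it as the output key,
* so the map output is sorted on the two fields compared in AccessLogWritable, which gives the secondary sort required above.
* In other words, the ordering still comes from the sort applied to map output, but the rule is the one we define in AccessLogWritable.
*/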

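Reduce function:
/**
* After the shuffle, the data reaching the Reducer is already sorted, so it can be written out directly.
*/

So, in order to compare on more than one field, we need a custom key type to use as the map output key.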
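The idea that the key itself carries the ordering rule can be tried out without a cluster. The following is only a minimal, Hadoop-free sketch: `LogKey` and `CompositeKeySketch` are hypothetical names that are not part of the original program, and `Collections.sort` merely stands in for the sort the framework applies to map output keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the custom key: ordered by cookieId, then by time.
class LogKey implements Comparable<LogKey> {
    final String cookieId;
    final String time;
    final String url;

    LogKey(String cookieId, String time, String url) {
        this.cookieId = cookieId;
        this.time = time;
        this.url = url;
    }

    @Override
    public int compareTo(LogKey o) {
        int ret = cookieId.compareTo(o.cookieId);
        if (ret == 0) {
            // Fixed-width HH:mm:ss strings sort chronologically as plain strings.
            ret = time.compareTo(o.time);
        }
        return ret;
    }

    @Override
    public String toString() {
        return cookieId + "\t" + time + "\t" + url;
    }
}

class CompositeKeySketch {
    // Parses tab-separated lines and returns them ordered by LogKey.compareTo.
    static List<String> sortLines(List<String> lines) {
        List<LogKey> keys = new ArrayList<>();
        for (String line : lines) {
            String[] f = line.split("\t");
            if (f.length < 3) {
                continue; // skip malformed lines
            }
            keys.add(new LogKey(f[0], f[1], f[2]));
        }
        Collections.sort(keys); // stands in for the sort on map output keys
        List<String> out = new ArrayList<>();
        for (LogKey k : keys) {
            out.add(k.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "2\t12:12:34\t2_hao123",
                "3\t09:10:34\t3_baidu",
                "1\t15:02:41\t1_google",
                "1\t12:12:34\t1_hao123");
        for (String line : sortLines(input)) {
            System.out.println(line);
        }
    }
}
```

Because the rule lives entirely in `compareTo`, the sorting code itself never needs to know which fields are involved.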

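The MapReduce Program

The sorting approach is explained in the code comments. Note that the driver is built with the Job utility class (MapReduceJobUtil) developed earlier.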

SecondSortJob.java

package com.uplooking.bigdata.mr.secondsort;

import com.uplooking.bigdata.common.utils.MapReduceJobUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/**
 * MapReduce sorting: secondary sort
 */
public class SecondSortJob {

    /**
     * Driver; the Job is built with the MapReduceJobUtil utility class.
     * @param args args[0] is the input path, args[1] is the output path
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Parameter Errors! Usages:<inputpath> <outputpath>");
            System.exit(-1);
        }

        Job job = MapReduceJobUtil.buildJob(new Configuration(),
                SecondSortJob.class,
                args[0],
                TextInputFormat.class,
                SecondSortMapper.class,
                AccessLogWritable.class,
                NullWritable.class,
                new Path(args[1]),
                TextOutputFormat.class,
                SecondSortReducer.class,
                AccessLogWritable.class,
                NullWritable.class);

        // Use exactly one reduce task so that all records end up in a single, globally sorted output file
        job.setNumReduceTasks(1);
        // Exit with a non-zero status if the job fails
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * The map function parses each line into an AccessLogWritable and emits it as the output key,
     * so the map output is sorted on the two fields compared in AccessLogWritable, which gives the secondary sort required above.
     * In other words, the ordering still comes from the sort applied to map output, but the rule is the one we define in AccessLogWritable.
     */
    public static class SecondSortMapper extends Mapper<LongWritable, Text, AccessLogWritable, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Parse the line; skip records that do not have all three fields
            String[] fields = value.toString().split("\t");
            if (fields.length < 3) {
                return;
            }
            String cookieId = fields[0];
            String time = fields[1];
            String url = fields[2];
            // Build the AccessLogWritable object
            AccessLogWritable logLine = new AccessLogWritable(cookieId, time, url);
            // Emit it as the key; no value is needed
            context.write(logLine, NullWritable.get());
        }
    }

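    /**
     * After the shuffle, the data reaching the Reducer is already sorted, so it can be written out directly.
     */
    public static class SecondSortReducer extends Reducer<AccessLogWritable, NullWritable, AccessLogWritable, NullWritable> {
        @Override
        protected void reduce(AccessLogWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keys that compare as equal (same cookieId and time) arrive in one group.
            // Write once per value so that such records are not collapsed into one;
            // the framework updates the key object as the values are iterated.
            for (NullWritable value : values) {
                context.write(key, value);
            }
        }
    }
}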

AccessLogWritable.java

package com.uplooking.bigdata.mr.secondsort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Custom Hadoop data type used as a key, so it must implement the WritableComparable interface.
 * The objects compared when sorting map output are AccessLogWritable instances, hence the type parameter AccessLogWritable.
 */
public class AccessLogWritable implements WritableComparable<AccessLogWritable> {

    private String cookieId;
    private String time;
    private String url;

    /**
     * The no-argument constructor is required; without it the following exception is thrown:
     * Caused by: java.lang.NoSuchMethodException: com.uplooking.bigdata.mr.secondsort.AccessLogWritable.<init>()
     * at java.lang.Class.getConstructor0(Class.java:3082)
     * at java.lang.Class.getDeclaredConstructor(Class.java:2178)
     * at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:125)
     * ... 16 more
     */
    public AccessLogWritable() {

    }

    public AccessLogWritable(String cookieId, String time, String url) {
        this.cookieId = cookieId;
        this.time = time;
        this.url = url;
    }

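    /**
     * Comparison method. The rule is:
     * sort by cookieId first, then by time.
     * Both fields are compared as strings, so a multi-digit cookieId sorts lexicographically ("10" before "2").
     * @param o the key to compare with
     * @return a negative number, zero, or a positive number if this key sorts before, equal to, or after o
     */
    @Override
    public int compareTo(AccessLogWritable o) {
        int ret = this.cookieId.compareTo(o.cookieId);
        // If the cookieIds are equal, compare time
        if (ret == 0) {
            ret = this.time.compareTo(o.time);
        }
        return ret;
    }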

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(cookieId);
        out.writeUTF(time);
        out.writeUTF(url);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.cookieId = in.readUTF();
        this.time = in.readUTF();
        this.url = in.readUTF();
    }

    @Override
    public String toString() {
        return cookieId + "\t" + time + "\t" + url;
    }
}
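
The `write`/`readFields` pair and the no-argument constructor work together: an empty instance is created first and then filled from the byte stream, with fields read in the same order they were written. The sketch below imitates that round trip using only `java.io`. `LogRecord` and `RoundTripSketch` are hypothetical stand-ins chosen so the example runs without Hadoop on the classpath; the framework's actual serialization path is more involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in that mirrors the write/readFields methods of AccessLogWritable.
class LogRecord {
    private String cookieId;
    private String time;
    private String url;

    // An empty instance must be constructible before readFields can fill it.
    LogRecord() {
    }

    LogRecord(String cookieId, String time, String url) {
        this.cookieId = cookieId;
        this.time = time;
        this.url = url;
    }

    void write(DataOutput out) throws IOException {
        out.writeUTF(cookieId);
        out.writeUTF(time);
        out.writeUTF(url);
    }

    // Fields must be read in exactly the order they were written.
    void readFields(DataInput in) throws IOException {
        this.cookieId = in.readUTF();
        this.time = in.readUTF();
        this.url = in.readUTF();
    }

    @Override
    public String toString() {
        return cookieId + "\t" + time + "\t" + url;
    }
}

class RoundTripSketch {
    // Serializes a record to bytes and rebuilds a fresh copy from those bytes.
    static LogRecord roundTrip(LogRecord original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            original.write(out);
            out.flush();

            LogRecord copy = new LogRecord(); // empty instance first
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new LogRecord("1", "12:12:34", "1_hao123")));
    }
}
```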

Testing

The MapReduce program is run here in a local environment, with the following arguments:

/Users/yeyonghao/data/input/secondsort /Users/yeyonghao/data/output/mr/secondsort

Alternatively, package it as a jar and upload it to a Hadoop environment to run.

After the program finishes, the output looks like this:

yeyonghao@yeyonghaodeMacBook-Pro:~/data/output/mr/secondsort$ cat part-r-00000
1   12:12:34    1_hao123
1   15:02:41    1_google
1   19:10:34    1_baidu
2   05:02:41    2_google
2   12:12:34    2_hao123
2   15:02:41    2_google
3   09:10:34    3_baidu
3   22:11:34    3_sougou
3   23:10:34    3_soso

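As the output shows, by using a custom key our MapReduce program performs the secondary sort.

Extension: How to Implement Multi-Level Sorting

Once the program above is understood, the approach to sorting on more than two fields follows naturally. The comparison rule is defined in the key, and the framework sorts map output by key, so a multi-level sort only requires implementing the additional comparison rules in the compareTo method of the custom key. Interested readers can try writing such a program themselves.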
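
As a starting point, here is one possible shape for such a `compareTo`, written as a Hadoop-free sketch. The three-level rule (cookieId, then time, then url), the numeric comparison of cookieId, and the names `MultiKey` and `MultiSortSketch` are all assumptions made for illustration and do not come from the original program.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical key with a three-level ordering rule.
class MultiKey implements Comparable<MultiKey> {
    final int cookieId;   // assumed numeric here so that 2 sorts before 10
    final String time;
    final String url;

    MultiKey(int cookieId, String time, String url) {
        this.cookieId = cookieId;
        this.time = time;
        this.url = url;
    }

    @Override
    public int compareTo(MultiKey o) {
        // Level 1: cookieId, compared as a number
        int ret = Integer.compare(cookieId, o.cookieId);
        // Level 2: time, only consulted when cookieIds are equal
        if (ret == 0) {
            ret = time.compareTo(o.time);
        }
        // Level 3: url, only consulted when cookieId and time are both equal
        if (ret == 0) {
            ret = url.compareTo(o.url);
        }
        return ret;
    }

    @Override
    public String toString() {
        return cookieId + "\t" + time + "\t" + url;
    }
}

class MultiSortSketch {
    // Parses tab-separated lines and returns them in MultiKey order.
    static List<String> sortLines(List<String> lines) {
        List<MultiKey> keys = new ArrayList<>();
        for (String line : lines) {
            String[] f = line.split("\t");
            if (f.length < 3) {
                continue; // skip malformed lines
            }
            keys.add(new MultiKey(Integer.parseInt(f[0]), f[1], f[2]));
        }
        Collections.sort(keys);
        List<String> out = new ArrayList<>();
        for (MultiKey k : keys) {
            out.add(k.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "10\t09:00:00\tb",
                "2\t09:00:00\tb",
                "2\t09:00:00\ta",
                "2\t08:00:00\tz");
        for (String line : sortLines(input)) {
            System.out.println(line);
        }
    }
}
```

Each additional level is one more `if (ret == 0)` step, so the same pattern extends to as many fields as the key holds.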
