
Sorting with a MapReduce program

2014-04-26 22:12
The input file contains the following:

5

45

8

876

6

45

The required output format is:

1 5

2 6

3 8

4 45

5 45

6 876

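First, this problem requires sorting the contents of the file. MapReduce sorts the map output by key before it reaches the reducers, and we can take advantage of that: parse each input line as an int and emit that int as the mapper's output key. The output value does not matter, so we simply emit 1. In the reduce phase we write the mapper's key as the reducer's output value, and for the output key we use a static counter that is incremented after every record written. This numbering assumes the job runs a single reduce task, because a static field is shared only within one JVM.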
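Before the Hadoop code, the idea can be tried out in plain Java without a cluster. The following is only a minimal sketch: the class name `SortSketch` and the method `run` are made up for illustration, and a `TreeMap` stands in for the framework's sort of the map output keys. It is not how Hadoop implements the shuffle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone simulation of the map -> sort -> reduce idea.
public class SortSketch {

    static List<String> run(List<String> lines) {
        // "Map" phase: parse each line and record it under its integer key.
        // The TreeMap keeps keys in ascending order, standing in for the
        // framework's sort; the stored value counts how often a key occurs.
        TreeMap<Integer, Integer> sorted = new TreeMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // skip blank lines
            }
            sorted.merge(Integer.parseInt(trimmed), 1, Integer::sum);
        }

        // "Reduce" phase: keys arrive in ascending order; emit one
        // "lineNumber value" record per occurrence so duplicates are kept.
        List<String> out = new ArrayList<>();
        int lineNum = 1;
        for (Map.Entry<Integer, Integer> e : sorted.entrySet()) {
            for (int i = 0; i < e.getValue(); i++) {
                out.add(lineNum + " " + e.getKey());
                lineNum++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = List.of("5", "45", "8", "876", "6", "45");
        run(input).forEach(System.out::println);
    }
}
```

Run on the sample input, this prints the six numbered lines from `1 5` to `6 876`, with 45 appearing twice because one record is emitted per value rather than per key.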

package cn.lmj.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class Sort
{
    public static class SortMapper extends MapReduceBase implements
            Mapper<Object, Text, IntWritable, IntWritable>
    {
        @Override
        public void map(Object key, Text value,
                OutputCollector<IntWritable, IntWritable> output,
                Reporter reporter) throws IOException
        {
            // Each input line holds one integer; skip blank lines
            String line = value.toString().trim();
            if (line.isEmpty())
            {
                return;
            }
            int i = Integer.parseInt(line);
            // Emit the number as the key so the framework sorts it; the value is a placeholder
            output.collect(new IntWritable(i), new IntWritable(1));
        }
    }

    public static class SortReducer extends MapReduceBase implements
            Reducer<IntWritable, IntWritable, IntWritable, IntWritable>
    {
        // Running line number. A static field is shared only within one JVM,
        // so the numbering is correct only when the job has a single reduce task.
        private static IntWritable linenum = new IntWritable(1);

        @Override
        public void reduce(IntWritable key, Iterator<IntWritable> values,
                OutputCollector<IntWritable, IntWritable> output,
                Reporter reporter) throws IOException
        {
            // One output record per value, so duplicate numbers are kept
            while (values.hasNext())
            {
                values.next();
                output.collect(linenum, key);
                // Increment linenum after every output record
                linenum = new IntWritable(linenum.get() + 1);
            }
        }
    }

    public static void main(String[] args) throws Exception
    {
        JobConf conf = new JobConf(Sort.class);
        conf.setJobName("cccccc");
        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(SortMapper.class);
        // Note: do not set a Combiner for this job. Reusing SortReducer as a
        // Combiner would emit line numbers as keys before the shuffle and break the sort.
        conf.setReducerClass(SortReducer.class);
        // A single reduce task keeps the output globally sorted and the line numbers consecutive
        conf.setNumReduceTasks(1);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path("/zuoye/file1/"));
        FileOutputFormat.setOutputPath(conf, new Path("/zuoye/file1/output"));
        JobClient.runJob(conf);
    }
}
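
The line counter in SortReducer is a static field, and a static field is shared only within one JVM. To see what that means with more than one reduce task, here is a minimal plain-Java sketch. It assumes each reduce task runs in its own JVM with its own counter, and that keys are assigned to tasks by hash modulo the task count, with the int value itself as the hash (which is my understanding of Hadoop's default HashPartitioner applied to IntWritable keys). The class name `PartitionSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical simulation of two independent reduce tasks.
public class PartitionSketch {

    // Assumed partitioning rule: hash of the key modulo the number of
    // reduce tasks, with the int value itself used as the hash.
    static int partition(int key, int numReduceTasks) {
        return (key & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Returns one output list per simulated reduce task.
    static List<List<String>> run(List<Integer> keys, int numReduceTasks) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int p = 0; p < numReduceTasks; p++) {
            parts.add(new ArrayList<>());
        }
        for (int key : keys) {
            parts.get(partition(key, numReduceTasks)).add(key);
        }

        List<List<String>> outputs = new ArrayList<>();
        for (List<Integer> part : parts) {
            Collections.sort(part); // each task sees only its own keys, in order
            int lineNum = 1;        // each task starts its own counter at 1
            List<String> out = new ArrayList<>();
            for (int key : part) {
                out.add(lineNum + " " + key);
                lineNum++;
            }
            outputs.add(out);
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<List<String>> outputs = run(List.of(5, 45, 8, 876, 6, 45), 2);
        for (int p = 0; p < outputs.size(); p++) {
            System.out.println("reduce task " + p + ": " + outputs.get(p));
        }
    }
}
```

With two simulated tasks, the even numbers (6, 8, 876) and the odd numbers (5, 45, 45) end up in separate outputs, each numbered from 1 to 3. Each output is sorted on its own, but the numbering is no longer global, which is why the job should run with a single reduce task.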