mapreduce程序实现排序
2014-04-26 22:12
369 查看
文件的内容如下所示:
5
45
8
876
6
45
要求最后的输出格式:
1 5
2 6
3 8
4 45
5 45
5 876
首先,这个题目是需要对文件的内容进行排序操作。我们都知道在mapper阶段是会对key进行排序的,我们就利用这个出发,把输入一行的数据转换成int,再把该int做mapper的key输出,而value的输出随便,我们这里输出1;然后在reduce阶段我们把mapper的key做为reduce的value输出,而key只需定义一个全局的静态变量,每次输出自增即可。
package cn.lmj.mapreduce;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class Sort
{
public static class SortMapper extends MapReduceBase implements
Mapper<Object, Text, IntWritable, IntWritable>
{
@Override
public void map(Object key, Text value,
OutputCollector<IntWritable, IntWritable> output,
Reporter reporter) throws IOException
{
String line = value.toString();
int i = Integer.parseInt(line.toString());
output.collect(new IntWritable(i), new IntWritable(1));
}
}
public static class SortReducer extends MapReduceBase implements
Reducer<IntWritable, IntWritable, IntWritable, IntWritable>
{
//必须是全局的静态变量,因为reduce的实例在开发中可能会有很多个,必须让多个对象共享同一个变量
private static IntWritable linenum = new IntWritable(1);
@Override
public void reduce(IntWritable key, Iterator<IntWritable> values,
OutputCollector<IntWritable, IntWritable> output,
Reporter reporter) throws IOException
{
while (values.hasNext())
{
values.next();
output.collect(linenum, key);
//每次输出让linenum加1
linenum = new IntWritable(linenum.get() + 1);
}
}
}
public static void main(String[] args) throws Exception
{
JobConf conf = new JobConf(Sort.class);
conf.setJobName("cccccc");
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(SortMapper.class);
//注意,这个题目不可以设置Combiner对mapper之后的数据进行预先合拼
conf.setReducerClass(SortReducer.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path("/zuoye/file1/"));
FileOutputFormat.setOutputPath(conf, new Path("/zuoye/file1/output"));
JobClient.runJob(conf);
}
}
5
45
8
876
6
45
要求最后的输出格式:
1 5
2 6
3 8
4 45
5 45
5 876
首先,这个题目是需要对文件的内容进行排序操作。我们都知道在mapper阶段是会对key进行排序的,我们就利用这个出发,把输入一行的数据转换成int,再把该int做mapper的key输出,而value的输出随便,我们这里输出1;然后在reduce阶段我们把mapper的key做为reduce的value输出,而key只需定义一个全局的静态变量,每次输出自增即可。
package cn.lmj.mapreduce;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class Sort
{
public static class SortMapper extends MapReduceBase implements
Mapper<Object, Text, IntWritable, IntWritable>
{
@Override
public void map(Object key, Text value,
OutputCollector<IntWritable, IntWritable> output,
Reporter reporter) throws IOException
{
String line = value.toString();
int i = Integer.parseInt(line.toString());
output.collect(new IntWritable(i), new IntWritable(1));
}
}
public static class SortReducer extends MapReduceBase implements
Reducer<IntWritable, IntWritable, IntWritable, IntWritable>
{
//必须是全局的静态变量,因为reduce的实例在开发中可能会有很多个,必须让多个对象共享同一个变量
private static IntWritable linenum = new IntWritable(1);
@Override
public void reduce(IntWritable key, Iterator<IntWritable> values,
OutputCollector<IntWritable, IntWritable> output,
Reporter reporter) throws IOException
{
while (values.hasNext())
{
values.next();
output.collect(linenum, key);
//每次输出让linenum加1
linenum = new IntWritable(linenum.get() + 1);
}
}
}
public static void main(String[] args) throws Exception
{
JobConf conf = new JobConf(Sort.class);
conf.setJobName("cccccc");
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(SortMapper.class);
//注意,这个题目不可以设置Combiner对mapper之后的数据进行预先合拼
conf.setReducerClass(SortReducer.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path("/zuoye/file1/"));
FileOutputFormat.setOutputPath(conf, new Path("/zuoye/file1/output"));
JobClient.runJob(conf);
}
}
相关文章推荐
- mapreduce实现流量汇总排序程序
- C语言单向动态链表程序,实现链表的建立,合并,重新排序,链表元素的插入与删除,以及根据元素成员的值进行元素删除。
- mapreduce,自定义排序,分区,分组实现按照年份升序排序,温度降序排序
- 关于java实现的mapreduce程序打包后通过脚本运行出现classnotfound异常
- 利用采样器实现mapreduce任务输出全排序
- ArrayList的练习 编写程序实现对Employee类的排序
- MapReduce实现排序功能
- hadoop平台使用python编写mapreduce排序小程序
- 2018-07-29期 MapReduce实现对字符串进行排序
- MapReduce功能实现二---排序
- 实现一个选择排序程序,排序整型数组
- 利用采样器实现mapreduce任务输出全排序
- [python]使用python实现Hadoop MapReduce程序:计算一组数据的均值和方差
- mapreduce实现对key的排序
- Merge Sort [Basic] C程序 实现排序功能
- 实现一个选择排序程序,排序整型数组
- Mapreduce---RandomSampler采样实现全排序
- 使用Python实现Hadoop MapReduce程序
- MapReduce程序之二次排序与多次排序
- MapReduce实现自定义二次排序