Hadoop Study Notes (10): Sorting with MapReduce and a Global Variable

2014-03-18 16:58
Some open questions:
1. For a total sort, the job has to call sortJob.setNumReduceTasks(1); with more than one reduce task, each reducer's output file is sorted internally, but the overall output is not globally ordered (see the partitioner sketch below).
2. If multiple reduce tasks all modified one static IntWritable, wouldn't the output get scrambled? (In practice, reduce tasks run in separate JVMs, so a static field is not shared between them at all; each task would count independently.)
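On question 1, here is a minimal sketch of how total order could be kept with more than one reducer: a hand-rolled range partitioner (my own illustration, not from the original notes; the split point 100 is an arbitrary assumption). Lower keys go to lower-numbered partitions, so concatenating part-r-00000, part-r-00001, ... in order gives a globally sorted result. For the general case, Hadoop ships TotalOrderPartitioner together with InputSampler to pick split points automatically.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical range partitioner for the IntWritable keys used in this post.
public class RangePartitioner extends Partitioner<IntWritable, NullWritable> {

    @Override
    public int getPartition(IntWritable key, NullWritable value, int numPartitions) {
        // Arbitrary illustrative split: keys below 100 go to the first
        // partition, everything else to the last one.
        return key.get() < 100 ? 0 : numPartitions - 1;
    }
}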
Input data:
file1
2
32
654
32
15
756
65223
file2
5956
22
650
92
file3
26
54
6

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MySort {

    // Emit each number as the map output key; the shuffle phase sorts by key,
    // so the reducer receives the numbers in ascending order.
    public static class IntSortMapper extends Mapper<Object, Text, IntWritable, NullWritable> {

        private IntWritable val = new IntWritable();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString().trim();
            val.set(Integer.parseInt(line));
            context.write(val, NullWritable.get());
        }
    }

    // Write the constant 1 as the output key and the sorted number as the value.
    public static class IntSortReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> {

        private IntWritable k = new IntWritable();

        @Override
        public void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            k.set(1);
            // Iterate once per occurrence so duplicate numbers are preserved.
            for (NullWritable value : values) {
                context.write(k, key);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String dir_in = "hdfs://localhost:9000/in_sort";
        String dir_out = "hdfs://localhost:9000/out_sort";

        Path in = new Path(dir_in);
        Path out = new Path(dir_out);

        Configuration conf = new Configuration();
        Job sortJob = new Job(conf, "my_sort");

        sortJob.setJarByClass(MySort.class);

        sortJob.setInputFormatClass(TextInputFormat.class);
        sortJob.setMapperClass(IntSortMapper.class);
        //sortJob.setCombinerClass(SortReducer.class);
        //countJob.setPartitionerClass(HashPartitioner.class);
        sortJob.setMapOutputKeyClass(IntWritable.class);
        sortJob.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(sortJob, in);

        sortJob.setReducerClass(IntSortReducer.class);
        // A single reduce task is what makes the sort total.
        sortJob.setNumReduceTasks(1);
        sortJob.setOutputKeyClass(IntWritable.class);
        sortJob.setOutputValueClass(IntWritable.class);
        //countJob.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileOutputFormat.setOutputPath(sortJob, out);

        sortJob.waitForCompletion(true);
    }
}
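Assuming the class is packaged into a jar named MySort.jar (the jar name is my assumption), the job can be run and its output inspected like this, using the paths hard-coded above:

hadoop jar MySort.jar MySort
hadoop fs -cat /out_sort/part-r-00000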
Results (TextOutputFormat writes key, a tab, then value; the key here is always 1 because the reducer sets k to 1 for every record):
1	2
1	6
1	15
1	22
1	26
1	32
1	32
1	54
1	92
1	650
1	654
1	756
1	5956
1	65223
Modified reduce function (taking a single value instead of an Iterable):
public static class IntSortReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> {

    private IntWritable k = new IntWritable();

    // Note the signature: a single NullWritable value instead of
    // Iterable<NullWritable>, so this no longer overrides Reducer#reduce.
    public void reduce(IntWritable key, NullWritable value, Context context) throws IOException, InterruptedException {
        k.set(1);
        //for (NullWritable value : values) {
        context.write(k, key);
        //}
    }
}
Results: (I don't quite understand this. Why does dropping the Iterable leave only the values in the output? Where did the key go? See the note after this output.)
2
6
15
22
26
32
32
54
92
650
654
756
5956
65223
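The likely answer to the question above: with a single NullWritable parameter instead of Iterable<NullWritable>, the method signature no longer matches, so it does not override Reducer#reduce and is never called. The framework falls back to the base Reducer's default implementation, which is an identity pass, essentially the following (from the Hadoop Reducer source):

protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
    // Default identity reduce: forward every value under its key.
    for (VALUEIN value : values) {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }
}

The job therefore emits (number, NullWritable) pairs; TextOutputFormat prints a NullWritable as nothing, which is why only the sorted numbers appear and the rank key 1 is gone. An @Override annotation on the modified method would have turned this silent mismatch into a compile error. The version below keeps the Iterable signature and instead uses a static counter to number the output.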
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MySort {

    public static class IntSortMapper extends Mapper<Object, Text, IntWritable, NullWritable> {

        private IntWritable val = new IntWritable();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString().trim();
            val.set(Integer.parseInt(line));
            context.write(val, NullWritable.get());
        }
    }

    public static class IntSortReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> {

        // "Global" rank counter. This is safe only because the job runs a
        // single reduce task: separate reduce tasks run in separate JVMs,
        // each with its own copy of this static field.
        private static IntWritable num = new IntWritable(1);

        @Override
        public void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            for (NullWritable value : values) {
                context.write(num, key);
                num = new IntWritable(num.get() + 1);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String dir_in = "hdfs://localhost:9000/in_sort";
        String dir_out = "hdfs://localhost:9000/out_sort";

        Path in = new Path(dir_in);
        Path out = new Path(dir_out);

        Configuration conf = new Configuration();
        Job sortJob = new Job(conf, "my_sort");

        sortJob.setJarByClass(MySort.class);

        sortJob.setInputFormatClass(TextInputFormat.class);
        sortJob.setMapperClass(IntSortMapper.class);
        //sortJob.setCombinerClass(SortReducer.class);
        //countJob.setPartitionerClass(HashPartitioner.class);
        sortJob.setMapOutputKeyClass(IntWritable.class);
        sortJob.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(sortJob, in);

        sortJob.setReducerClass(IntSortReducer.class);
        sortJob.setNumReduceTasks(1);
        sortJob.setOutputKeyClass(IntWritable.class);
        sortJob.setOutputValueClass(IntWritable.class);
        //countJob.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileOutputFormat.setOutputPath(sortJob, out);

        sortJob.waitForCompletion(true);
    }
}
Results:
1	2
2	6
3	15
4	22
5	26
6	32
7	32
8	54
9	92
10	650
11	654
12	756
13	5956
14	65223
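Since the job pins setNumReduceTasks(1), there is exactly one reducer instance running in one JVM, so the static keyword adds nothing here. A plain instance field works just as well as the "global" counter and reuses one IntWritable instead of allocating a new one per record; a minimal sketch of my own, as a drop-in replacement for IntSortReducer above:

public static class IntRankReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> {

    private final IntWritable rank = new IntWritable();
    private int counter = 0; // running rank across all keys in this single task

    @Override
    public void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        for (NullWritable value : values) {
            rank.set(++counter);
            context.write(rank, key);
        }
    }
}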

                                            