您的位置:首页 > 其它

mapreduce程序的压缩实现

2015-04-22 18:20 183 查看
案例是按照权威指南第二版的MaxTemperatureWithCompression进行的。

输入:

当压缩文件做为mapreduce的输入时,mapreduce将自动通过扩展名找到相应的codec对其解压。

输出:

当mapreduce的输出文件需要压缩时,

1.可以更改mapred.output.compress为true,mapped.output.compression.codec为想要使用的codec的类名就可以了,

2.可以在代码中指定,通过调用FileOutputFormat的静态方法去设置这两个属性,我们来看代码:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MaxTemperatureWithCompression {
public static class MaxTemperatureMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;

public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
// System.out.println(year);
int airTemperature;
if (line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}

String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}

public static class MaxTemperatureReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}

public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length < 2) {
System.out.println("Usage: MaxTemperatureWithCompression <input path> <output path>");
System.exit(-1);
}
Job job = new Job(conf);
job.setJarByClass(MaxTemperatureWithCompression.class);
job.setJobName("MaxTemperatureWithCompression");
job.setMapperClass(MaxTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(
otherArgs[otherArgs.length - 1]));
// 设置输出的压缩格式
FileOutputFormat.setCompressOutput(job, true);

FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}
</pre><div>或者按照第一种方式进行设置:</div><div><pre name="code" class="java">Configuration conf = new Configuration();
conf.setBoolean("mapred.out.compress", true);
conf.setClass("mapred.output.compression.codec",GzipCodec.class, CompressionCodec.class);
Job job=new Job(conf);
同时删除对FileOutputFormat的压缩设置。

如果你要将序列文件做为输出,你需要设置mapred.output.compression.type属性来指定压缩类型,默认是RECORD类型,它会按单个的record压缩,如果指定为BLOCK类型,它将一组record压缩,压缩效果自然是BLOCK好。
当然代码里也可以设置,你只需调用SequenceFileOutputFormat的setOutputCompressionType方法进行设置。
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
对map任务输出进行压缩:

即使你的mapreduce的输入输出都是未压缩的文件,你仍可以对map任务的中间输出作压缩,因为它要写在硬盘并且通过网络传输到reduce节点,对其压
缩可以提高很多性能,这些工作也是只要设置两个属性即可,我们看下代码里怎么设置:
Configuration conf = new Configuration();
conf.setBoolean("mapred.compress.map.output", true);
conf.setClass("mapred.map.output.compression.codec",GzipCodec.class, CompressionCodec.class);
Job job=new Job(conf);
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: