mapreduce程序的压缩实现
2015-04-22 18:20
183 查看
案例是按照权威指南第二版的MaxTemperatureWithCompression进行的。
输入:
当压缩文件做为mapreduce的输入时,mapreduce将自动通过扩展名找到相应的codec对其解压。
输出:
当mapreduce的输出文件需要压缩时,
1.可以更改mapred.output.compress为true,mapped.output.compression.codec为想要使用的codec的类名就可以了,
2.可以在代码中指定,通过调用FileOutputFormat的静态方法去设置这两个属性,我们来看代码:
如果你要将序列文件做为输出,你需要设置mapred.output.compression.type属性来指定压缩类型,默认是RECORD类型,它会按单个的record压缩,如果指定为BLOCK类型,它将一组record压缩,压缩效果自然是BLOCK好。
当然代码里也可以设置,你只需调用SequenceFileOutputFormat的setOutputCompressionType方法进行设置。
即使你的mapreduce的输入输出都是未压缩的文件,你仍可以对map任务的中间输出作压缩,因为它要写在硬盘并且通过网络传输到reduce节点,对其压
缩可以提高很多性能,这些工作也是只要设置两个属性即可,我们看下代码里怎么设置:
输入:
当压缩文件做为mapreduce的输入时,mapreduce将自动通过扩展名找到相应的codec对其解压。
输出:
当mapreduce的输出文件需要压缩时,
1.可以更改mapred.output.compress为true,mapped.output.compression.codec为想要使用的codec的类名就可以了,
2.可以在代码中指定,通过调用FileOutputFormat的静态方法去设置这两个属性,我们来看代码:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class MaxTemperatureWithCompression { public static class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private static final int MISSING = 9999; public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String year = line.substring(15, 19); // System.out.println(year); int airTemperature; if (line.charAt(87) == '+') { airTemperature = Integer.parseInt(line.substring(88, 92)); } else { airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93); if (airTemperature != MISSING && quality.matches("[01459]")) { context.write(new Text(year), new IntWritable(airTemperature)); } } } public static class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(key, new IntWritable(maxValue)); } } public static void main(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length < 2) { System.out.println("Usage: MaxTemperatureWithCompression <input path> <output path>"); System.exit(-1); } Job job = new Job(conf); job.setJarByClass(MaxTemperatureWithCompression.class); job.setJobName("MaxTemperatureWithCompression"); job.setMapperClass(MaxTemperatureMapper.class); job.setCombinerClass(MaxTemperatureReducer.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path( otherArgs[otherArgs.length - 1])); // 设置输出的压缩格式 FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
</pre><div>或者按照第一种方式进行设置:</div><div><pre name="code" class="java">Configuration conf = new Configuration(); conf.setBoolean("mapred.out.compress", true); conf.setClass("mapred.output.compression.codec",GzipCodec.class, CompressionCodec.class); Job job=new Job(conf);同时删除对FileOutputFormat的压缩设置。
如果你要将序列文件做为输出,你需要设置mapred.output.compression.type属性来指定压缩类型,默认是RECORD类型,它会按单个的record压缩,如果指定为BLOCK类型,它将一组record压缩,压缩效果自然是BLOCK好。
当然代码里也可以设置,你只需调用SequenceFileOutputFormat的setOutputCompressionType方法进行设置。
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);对map任务输出进行压缩:
即使你的mapreduce的输入输出都是未压缩的文件,你仍可以对map任务的中间输出作压缩,因为它要写在硬盘并且通过网络传输到reduce节点,对其压
缩可以提高很多性能,这些工作也是只要设置两个属性即可,我们看下代码里怎么设置:
Configuration conf = new Configuration(); conf.setBoolean("mapred.compress.map.output", true); conf.setClass("mapred.map.output.compression.codec",GzipCodec.class, CompressionCodec.class); Job job=new Job(conf);
相关文章推荐
- 使用Python实现Hadoop MapReduce程序
- java 程序实现对图片的压缩生成缩略图并可设定长宽、尺寸压缩率、图片质量
- 用Java实现ZIP压缩文件和目录程序代码
- 嵌入式目标板程序的压缩(3)--实现目标板解压升级程序功能
- 一脸懵逼学习Hadoop中的MapReduce程序中自定义分组的实现
- 使用Python实现Hadoop MapReduce程序
- 使用Python实现Hadoop MapReduce程序
- C#调用外部程序实现压缩和解压缩[原]
- 使用Python实现Hadoop MapReduce程序
- mapreduce程序实现排序
- 用程序实现压缩access(*.mdb)数据库的方法.
- 使用Python实现Hadoop MapReduce程序
- Asp.net程序优化js、css实现合并与压缩的方法
- java 程序实现对图片的压缩生成缩略图并可设定长宽、尺寸压缩率、图片质量
- Python实现mapreduce程序
- 华为(C++实现字符串压缩程序)
- C++实现的Huffman压缩解压缩程序及相应程序框架的设计
- 矩阵乘法的mapreduce程序实现
- 压缩或解压程序实现