Hadoop之sequencefile与text类型转化
2015-10-15 17:50
483 查看
sequencefile格式转text格式
这里仅针对输入格式为<Text, IntWritable>的键值对sequencefile文件,可根据实际需要修改,最终输出文本格式。

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Converts a SequenceFile of {@code <Text, IntWritable>} records to plain text.
 *
 * <p>This is a map-only job ({@code setNumReduceTasks(0)}): every record read by
 * {@link SequenceFileInputFormat} is passed through {@link ReaderMapper} and written
 * directly by the default TextOutputFormat as {@code key<TAB>value} lines.
 *
 * <p>Usage: {@code SequencefileToText <in> <out>} where {@code <in>} is an existing
 * SequenceFile path on HDFS and {@code <out>} must not yet exist.
 */
public class SequencefileToText {

    /**
     * Re-emits each (Text, IntWritable) record as (Text, Text) so the numeric
     * value is rendered as readable text by the output format.
     */
    public static class ReaderMapper extends Mapper<Text, IntWritable, Text, Text> {

        @Override
        protected void map(Text key, IntWritable value, Context context)
                throws IOException, InterruptedException {
            // value.toString() renders the int; the output format writes "key\tvalue".
            context.write(key, new Text(value.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        // section1: parse generic Hadoop options (-D, -fs, ...) before our own args.
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            // FIX: the original usage string had lost its argument placeholders.
            System.err.println("Usage : SequencefileToText <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "SequencefileToText");
        job.setJarByClass(SequencefileToText.class);
        // Input is a binary SequenceFile; output format stays the default TextOutputFormat.
        job.setInputFormatClass(SequenceFileInputFormat.class);

        // section2: output key/value types of the mapper (final output, since map-only).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Map-only job: mapper output is written straight to <out>, no shuffle/reduce.
        job.setNumReduceTasks(0);

        // section3
        job.setMapperClass(ReaderMapper.class);
        // NOTE(review): the original carried a commented-out WriteReducer whose
        // reduce(Text, Iterator<Text>, Context) signature would never override
        // Reducer.reduce (which takes Iterable); it was dead code and is removed.

        // section4
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // section5
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
text文件转sequencefile
这里输入文本格式,转化输出格式为<Text, IntWritable>的sequencefile文件,但本质上都可以使用hadoop fs -text命令查看,只是遇到mahout中的格式要求时可替换。

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCountnew.IntSumReducer;
import org.apache.hadoop.examples.WordCountnew.TokenizerMapper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
// NOTE(review): importing Reducer.Context is hazardous — inside a Mapper subclass the
// inherited Mapper.Context shadows it (JLS 6.4.1), but it invites confusion; unused here.
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Word-count job that reads plain text and writes its {@code <Text, IntWritable>}
 * results as a SequenceFile (e.g. for consumption by Mahout).
 *
 * <p>Usage: {@code TextToSequencefile <in> <out>} where {@code <in>} is a text path
 * on HDFS and {@code <out>} must not yet exist. Output can still be inspected with
 * {@code hadoop fs -text}.
 */
public class TextToSequencefile {

    /** Tokenizes each input line on whitespace and emits (word, 1) per token. */
    public static class ReaderMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        // Reused across calls to avoid allocating a Text per token.
        private Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    /**
     * Sums the counts for each word.
     *
     * <p>BUG FIX: the original declared {@code reduce(Text, Iterator<IntWritable>,
     * Context)}, which does NOT override
     * {@code Reducer.reduce(KEYIN, Iterable<VALUEIN>, Context)} — the framework
     * silently ran the default identity reduce, so counts were never summed and
     * the job emitted one {@code (word, 1)} record per occurrence. The parameter
     * must be {@link Iterable}; {@code @Override} now enforces this at compile time.
     */
    public static class WriterReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private final IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // section 1: parse generic Hadoop options (-D, -fs, ...) before our own args.
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            // FIX: the original usage string had lost its argument placeholders.
            System.err.println("Usage : TextToSequencefile <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "TextToSequencefile");
        job.setJarByClass(TextToSequencefile.class);
        job.setInputFormatClass(TextInputFormat.class);
        // Results are written as a binary SequenceFile instead of plain text.
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        //SequenceFileOutputFormat.setOutputCompressionType(job,CompressionType.NONE);

        // section2: map output and final output are both (Text, IntWritable).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // section3: WriterReducer doubles as combiner (summing is associative/commutative).
        job.setMapperClass(ReaderMapper.class);
        job.setCombinerClass(WriterReducer.class);
        job.setReducerClass(WriterReducer.class);

        // section4
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // section5
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
相关文章推荐
- 详解HDFS Short Circuit Local Reads
- Hadoop_2.1.0 MapReduce序列图
- 使用Hadoop搭建现代电信企业架构
- 单机版搭建Hadoop环境图文教程详解
- hadoop常见错误以及处理方法详解
- hadoop 单机安装配置教程
- hadoop的hdfs文件操作实现上传文件到hdfs
- hadoop实现grep示例分享
- Apache Hadoop版本详解
- linux下搭建hadoop环境步骤分享
- hadoop client与datanode的通信协议分析
- hadoop中一些常用的命令介绍
- Hadoop单机版和全分布式(集群)安装
- 用PHP和Shell写Hadoop的MapReduce程序
- hadoop map-reduce中的文件并发操作
- Hadoop1.2中配置伪分布式的实例
- java结合HADOOP集群文件上传下载
- 用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试
- Hadoop安装感悟
- hadoop安装lzo