
Hadoop: Converting Between SequenceFile and Text

2015-10-15 17:50

SequenceFile to Text

This example handles SequenceFile input whose records are <Text, IntWritable> key/value pairs; adjust the types to match your own data. The job writes the records back out as plain text.

package org.apache.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class SequencefileToText {

    // Map-only job: each <Text, IntWritable> record from the SequenceFile is
    // re-emitted as a <Text, Text> pair, which the default TextOutputFormat
    // writes as tab-separated plain text.
    public static class ReaderMapper extends Mapper<Text, IntWritable, Text, Text> {

        @Override
        protected void map(Text key, IntWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text(value.toString()));
        }
    }

    /*
    // An identity reducer is not needed because the job runs map-only
    // (setNumReduceTasks(0)); kept for reference.
    public static class WriteReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
    */

    public static void main(String[] args) throws Exception {
        // section1: parse generic options and check the <in> <out> arguments
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: SequencefileToText <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "SequencefileToText");
        job.setJarByClass(SequencefileToText.class);

        // read SequenceFile records; the default TextOutputFormat writes plain text
        job.setInputFormatClass(SequenceFileInputFormat.class);

        // section2: output types; no reduce phase, mappers write the output directly
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);

        // section3
        job.setMapperClass(ReaderMapper.class);
        //job.setCombinerClass(WriteReducer.class);
        //job.setReducerClass(WriteReducer.class);

        // section4: input and output paths
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // section5: submit and wait for completion
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
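
For a quick check of the SequenceFile contents without launching a MapReduce job, the file can also be read directly with the SequenceFile.Reader API. The sketch below is a minimal, self-contained example under the same <Text, IntWritable> assumption; the class name SequencefileDump and the path argument are placeholders, not part of the original job.

package org.apache.hadoop.examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequencefileDump {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // placeholder: pass the path of one part file written by the job above
        Path path = new Path(args[0]);

        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
            Text key = new Text();
            IntWritable value = new IntWritable();
            // next() fills key/value and returns false at end of file
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
        } finally {
            reader.close();
        }
    }
}

This is essentially what hadoop fs -text does for SequenceFiles whose keys and values are Writable types.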


Text to SequenceFile

Here the input is plain text and the output is a SequenceFile with <Text, IntWritable> records. Both formats can be inspected with hadoop fs -text; the conversion mainly matters when a downstream tool such as Mahout requires SequenceFile input.

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class TextToSequencefile {

    // Tokenize each input line and emit <word, 1>, as in the classic word count.
    public static class ReaderMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    // Sum the counts per word; the <Text, IntWritable> results are serialized
    // by SequenceFileOutputFormat.
    public static class WriterReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        // section1: parse generic options and check the <in> <out> arguments
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: TextToSequencefile <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "TextToSequencefile");
        job.setJarByClass(TextToSequencefile.class);

        // read plain text, write a SequenceFile
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        //SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.NONE);

        // section2: key/value types for map output and job output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // section3: mapper, combiner and reducer
        job.setMapperClass(ReaderMapper.class);
        job.setCombinerClass(WriterReducer.class);
        job.setReducerClass(WriterReducer.class);

        // section4: input and output paths
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // section5: submit and wait for completion
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
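
If the goal is only to produce a small SequenceFile with <Text, IntWritable> records (for example, to satisfy a Mahout input requirement) rather than to run the word count above, the file can also be written directly with SequenceFile.createWriter, without MapReduce. A minimal sketch; the class name TextLinesToSequencefile, the output path argument, and the sample records are placeholders:

package org.apache.hadoop.examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class TextLinesToSequencefile {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // placeholder output path
        Path path = new Path(args[0]);

        // createWriter records the key/value classes in the file header,
        // so readers (and hadoop fs -text) know how to deserialize the records
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, Text.class, IntWritable.class);
        try {
            writer.append(new Text("hadoop"), new IntWritable(1));
            writer.append(new Text("sequencefile"), new IntWritable(2));
        } finally {
            writer.close();
        }
    }
}

The resulting file can then be checked with hadoop fs -text, as noted above.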
Tags: hadoop