您的位置:首页 > 其它

MapReduce 中简单实现自定义的输入输出格式

2014-09-08 20:47 239 查看
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Example MapReduce tool demonstrating a custom combining input format
 * ({@link CombineSequenceFileInputFormat}) and a custom output format
 * ({@link MyOutputFormat}) that writes the values of each reduce key into a
 * file named after that key.
 *
 * <p>Usage: {@code TestCombine [inputPaths [outputPath]]}. {@code inputPaths}
 * may be a comma-separated list. Omitted arguments fall back to the default
 * paths below.
 */
public class TestCombine extends Configured implements Tool {

    /** Input path(s) used when no input argument is given. */
    private static final String DEFAULT_INPUT_PATH = "/home/hadoop/tmp/combine";

    /** Output directory used when no output argument is given. */
    private static final String DEFAULT_OUTPUT_PATH = "/home/hadoop/tmp/combineout";

    /**
     * Emits each input record's value as both the output key and the output
     * value, so identical values are grouped together in the reducer.
     */
    private static class ProvinceMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, value);
        }
    }

    /**
     * Writes one record per value in each group, so duplicates of a key are
     * preserved in the output.
     */
    private static class ProvinceReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

    /**
     * Input format that packs multiple files into each split.
     *
     * <p>Despite its name, every file in a split is read line by line through
     * {@link CombineLineRecordReader}, which delegates to {@link LineRecordReader}.
     * It does not read SequenceFiles.
     *
     * <p>NOTE(review): no maximum split size is configured here. Without one,
     * {@link CombineFileInputFormat} may group a lot of data into a single
     * split. Consider {@code setMaxSplitSize} if that matters.
     */
    static class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
        public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException {
            // CombineFileRecordReader creates one CombineLineRecordReader per file of the split
            // via reflection. The raw Class literal is why the suppression above is needed.
            return new CombineFileRecordReader((CombineFileSplit) split, context,
                    CombineLineRecordReader.class);
        }
    }

    /**
     * Reads the file at position {@code index} of a {@link CombineFileSplit}.
     * It delegates to a {@link LineRecordReader} over a {@link FileSplit} that
     * covers just that file.
     *
     * <p>The {@code (CombineFileSplit, TaskAttemptContext, Integer)} constructor
     * signature is required: {@link CombineFileRecordReader} instantiates this
     * class reflectively with exactly those arguments.
     */
    static class CombineLineRecordReader<K, V> extends RecordReader<K, V> {
        private final int index;
        private CombineFileSplit split;
        private TaskAttemptContext context;
        private RecordReader<K, V> rr;

        public CombineLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
                throws IOException, InterruptedException {
            this.index = index;
            this.split = split;
            this.context = context;
            this.rr = newDelegate(context.getConfiguration());
        }

        /** Creates the per-file delegate reader; always a {@link LineRecordReader}. */
        @SuppressWarnings("unchecked")
        private RecordReader<K, V> newDelegate(Configuration conf) {
            // LineRecordReader produces (LongWritable, Text) pairs; K/V must be compatible.
            return (RecordReader<K, V>) ReflectionUtils.newInstance(LineRecordReader.class, conf);
        }

        @Override
        public void initialize(InputSplit curSplit, TaskAttemptContext curContext)
                throws IOException, InterruptedException {
            this.split = (CombineFileSplit) curSplit;
            this.context = curContext;

            // rr is only null after close(). Recreate the same reader type the constructor
            // uses, so every file is read consistently as text lines.
            if (rr == null) {
                rr = newDelegate(context.getConfiguration());
            }

            FileSplit fileSplit = new FileSplit(split.getPath(index), split.getOffset(index),
                    split.getLength(index), split.getLocations());
            rr.initialize(fileSplit, context);
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return rr.getProgress();
        }

        @Override
        public void close() throws IOException {
            if (rr != null) {
                rr.close();
                rr = null;
            }
        }

        @Override
        public K getCurrentKey() throws IOException, InterruptedException {
            return rr.getCurrentKey();
        }

        @Override
        public V getCurrentValue() throws IOException, InterruptedException {
            return rr.getCurrentValue();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return rr.nextKeyValue();
        }
    }

    /**
     * Output format that writes each key's values into a file named after the
     * key, via {@link MyRecordWriter}.
     */
    static class MyOutputFormat extends FileOutputFormat<Text, Text> {
        @Override
        public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
                throws IOException, InterruptedException {
            // Create the per-key files next to this attempt's default output file, i.e. in the
            // task attempt's work directory. The job's output committer then moves them into the
            // output directory only when the attempt commits, so retried or speculative attempts
            // cannot clobber each other.
            Path workDir = getDefaultWorkFile(job, "").getParent();
            return new MyRecordWriter(job, workDir);
        }
    }

    /**
     * Writes each value as a newline-terminated line into a file named after
     * its key. One stream is kept open per distinct key, similar in spirit to
     * {@code MultipleOutputs}. Records with an empty key are skipped.
     */
    public static class MyRecordWriter extends RecordWriter<Text, Text> {
        private static final byte[] NEWLINE = { '\n' };

        private final Map<String, FSDataOutputStream> outputMap =
                new HashMap<String, FSDataOutputStream>();
        private final FileSystem fs;
        private final Path baseDir;

        /**
         * Creates a writer that places files directly in the job's output directory.
         *
         * <p>NOTE: writing straight into the final output directory bypasses the
         * output committer. Retried or speculative attempts may therefore collide.
         * Prefer {@link #MyRecordWriter(JobContext, Path)} with a task work directory.
         *
         * @param job supplies the configuration and the output directory
         * @throws IOException if the output directory is unset or its file system is unavailable
         */
        public MyRecordWriter(JobContext job) throws IOException {
            this(job, FileOutputFormat.getOutputPath(job));
        }

        /**
         * Creates a writer that places one file per key under {@code baseDir}.
         *
         * @param job     supplies the Hadoop configuration
         * @param baseDir directory in which the per-key files are created
         * @throws IOException if {@code baseDir} is null or its file system is unavailable
         */
        public MyRecordWriter(JobContext job, Path baseDir) throws IOException {
            if (baseDir == null) {
                throw new IOException("Output directory not set");
            }
            this.baseDir = baseDir;
            // Resolve the file system from the path itself. FileSystem.get(conf) returns the
            // default FS, which fails with "Wrong FS" when the output lives on another one.
            this.fs = baseDir.getFileSystem(job.getConfiguration());
        }

        /**
         * Appends {@code value} followed by a newline to the file named {@code key}.
         *
         * @throws IOException if the key's file already exists (rather than silently
         *                     discarding the record) or cannot be written
         */
        @Override
        public void write(Text key, Text value) throws IOException {
            String k = key.toString();
            if (k.isEmpty()) {
                return;
            }
            FSDataOutputStream out = outputMap.get(k);
            if (out == null) {
                // NOTE(review): the key is used verbatim as a relative file name. Keys containing
                // '/', ':' or ".." may not map to a file directly under baseDir; validate them if
                // the input data is not trusted.
                Path outputPath = new Path(baseDir, k);
                // overwrite=false: fail loudly on an existing file instead of dropping its records.
                out = fs.create(outputPath, false);
                outputMap.put(k, out);
            }
            // Text.getBytes() returns the backing buffer, which can be longer than the current
            // contents (Text objects are reused). Only the first getLength() bytes are valid.
            out.write(value.getBytes(), 0, value.getLength());
            out.write(NEWLINE);
        }

        /** Closes every per-key stream, attempting all of them even if one fails. */
        @Override
        public void close(TaskAttemptContext context) throws IOException,
                InterruptedException {
            IOException firstFailure = null;
            for (FSDataOutputStream out : outputMap.values()) {
                try {
                    out.close();
                } catch (IOException e) {
                    // Keep closing the remaining streams; the first failure is rethrown below.
                    if (firstFailure == null) {
                        firstFailure = e;
                    }
                }
            }
            outputMap.clear();
            if (firstFailure != null) {
                throw firstFailure;
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args optional {@code [inputPaths [outputPath]]}; missing entries use the defaults
     * @return 0 if the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration prepared by ToolRunner, which includes generic options such as
        // -D and -fs. Fall back to a fresh one only when run() is invoked directly.
        Configuration conf = getConf() != null ? getConf() : new Configuration();

        String inpath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;
        String outpath = (args != null && args.length > 1) ? args[1] : DEFAULT_OUTPUT_PATH;

        Job job = new Job(conf);
        job.setJobName("TestCombine");
        job.setJarByClass(TestCombine.class);

        job.setMapperClass(ProvinceMapper.class);
        job.setReducerClass(ProvinceReducer.class);

        // To pack many small input files into fewer splits, enable the combining input format:
        // job.setInputFormatClass(CombineSequenceFileInputFormat.class);
        job.setOutputFormatClass(MyOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path p = new Path(outpath);
        // FileOutputFormat rejects an existing output directory, so remove previous output.
        // Use the path's own file system and delete recursively, since it holds files.
        FileSystem fs = p.getFileSystem(conf);
        if (fs.exists(p)) {
            fs.delete(p, true);
        }
        FileInputFormat.addInputPaths(job, inpath);
        FileOutputFormat.setOutputPath(job, p);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new TestCombine(), args);
        System.exit(ret);
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: