
Custom File Output Formats in Hadoop

2013-04-22 17:03
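Customizing the output format is somewhat simpler than customizing the input format: inside your own output format you only need to subclass RecordWriter. The key piece is the synchronized write method, which lets you control very flexibly how each key and value is written. Here is an example: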

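import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

// DistanceVector is the author's own value class; its definition is not shown here.
public class ImageOutputFormat extends TextOutputFormat<Text, DistanceVector> {
    protected static class Writer extends RecordWriter<Text, DistanceVector> {
        // Delegate that writes "key<separator>value<newline>" lines.
        private final LineRecordWriter<Text, Text> w;

        // The second argument of LineRecordWriter is the key/value separator.
        public Writer(DataOutputStream dos, String keyValueSeparator) {
            w = new LineRecordWriter<Text, Text>(dos, keyValueSeparator);
        }

        public Writer(DataOutputStream dos) {
            w = new LineRecordWriter<Text, Text>(dos);
        }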
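        // Control your own output style here.
        @Override
        public synchronized void write(Text key, DistanceVector value)
                throws IOException {
            StringBuilder sb = new StringBuilder();

            // Debug output; this ends up in the task's stdout log.
            System.out.println(value.getV2());

            // Join the elements of V1 with commas, without a trailing comma.
            java.util.Iterator<Double> iter = value.getV1().iterator();
            while (iter.hasNext()) {
                sb.append(iter.next());
                if (iter.hasNext()) {
                    sb.append(",");
                }
            }
            sb.append(" distance:").append(value.getV2());

            // LineRecordWriter already ends each record with a newline,
            // so no extra "\n" is appended here.
            w.write(new Text(key), new Text(sb.toString()));
        }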

        @Override
        public void close(TaskAttemptContext context) throws IOException,
                InterruptedException {
            w.close(context);
        }
    }

    @Override
    public RecordWriter<Text, DistanceVector> getRecordWriter(
            TaskAttemptContext job) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        // The key/value separator is configurable. Newer Hadoop releases name
        // this property "mapreduce.output.textoutputformat.separator".
        String keyValueSeparator = conf.get(
                "mapred.textoutputformat.separator", "\t");
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            // Fall back to gzip when no codec is configured.
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(
                    job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass,
                    conf);
            extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        // overwrite = false: fail if the task's work file already exists.
        FSDataOutputStream fileOut = fs.create(file, false);
        if (!isCompressed) {
            return new Writer(fileOut, keyValueSeparator);
        }
        // Wrap the raw stream so records are compressed as they are written.
        return new Writer(new DataOutputStream(codec.createOutputStream(fileOut)),
                keyValueSeparator);
    }

}
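
To try out the line layout without a Hadoop cluster, the string-building step of write can be pulled into a plain Java method. The following is only a sketch under assumptions: the post does not show DistanceVector, so the nested class here is a hypothetical stand-in (a list of doubles for V1 and a double for V2), and the names RecordLineSketch and formatValue are made up for illustration.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RecordLineSketch {

    /** Hypothetical stand-in for DistanceVector; the real class is not shown in the post. */
    static class DistanceVector {
        private final List<Double> v1;
        private final double v2;

        DistanceVector(List<Double> v1, double v2) {
            this.v1 = v1;
            this.v2 = v2;
        }

        List<Double> getV1() { return v1; }

        double getV2() { return v2; }
    }

    /** Builds the value part of one output line: comma-joined V1, then the distance. */
    static String formatValue(DistanceVector value) {
        StringBuilder sb = new StringBuilder();
        Iterator<Double> iter = value.getV1().iterator();
        while (iter.hasNext()) {
            sb.append(iter.next());
            // Only add a comma between elements, never after the last one.
            if (iter.hasNext()) {
                sb.append(",");
            }
        }
        sb.append(" distance:").append(value.getV2());
        return sb.toString();
    }

    public static void main(String[] args) {
        DistanceVector dv = new DistanceVector(Arrays.asList(1.0, 2.5, 4.0), 0.75);
        // Key and value joined by a tab, the default separator used above.
        System.out.println("img_001" + "\t" + formatValue(dv));
    }
}
```

Keeping the formatting in a method like this makes it easy to unit-test the output style separately from the RecordWriter plumbing.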