您的位置:首页 > 大数据 > Hadoop

大数据-Hadoop生态(20)-MapReduce框架原理-OutputFormat

2018-12-12 19:45 1011 查看

1.outputFormat接口实现类

2.自定义outputFormat

步骤:

1). 定义一个类继承FileOutputFormat

2). 定义一个类继承RecordWrite,重写write方法

 

3. 案例

有一个log文件,将包含nty的输出到nty.log文件,其他的输出到other.log

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.nty.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com

自定义类继承FileOutputFormat

package com.nty.outputFormat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* author nty
* date time 2018-12-12 19:28
*/
public class FilterOutputFormat extends FileOutputFormat<LongWritable, Text> {

@Override
public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
FilterRecordWrite frw = new FilterRecordWrite();
frw.init(job);
return frw;
}
}

自定义RecordWriter,重写write

package com.nty.outputFormat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* author nty
* date time 2018-12-12 19:29
*/
public class FilterRecordWrite extends RecordWriter<LongWritable, Text> {

private FSDataOutputStream nty;

private FSDataOutputStream other;

//将job通过参数传递过来
public void init(TaskAttemptContext job) throws IOException {

String outDir = job.getConfiguration().get(FileOutputFormat.OUTDIR);

FileSystem fileSystem = FileSystem.get(job.getConfiguration());

nty = fileSystem.create(new Path(outDir + "/nty.log"));
other = fileSystem.create(new Path(outDir + "/other.log"));

}

@Override
public void write(LongWritable key, Text value) throws IOException, InterruptedException {
String address = value.toString() + "\r\n";

if(address.contains("nty")) {
nty.write(address.getBytes());
} else {
other.write(address.getBytes());
}

}

@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
//关流
IOUtils.closeStream(nty);
IOUtils.closeStream(other);
}
}

Driver类设置

package com.nty.outputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* author nty
* date time 2018-12-12 19:29
*/
public class FilterDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);

job.setJarByClass(FilterDriver.class);

job.setOutputFormatClass(FilterOutputFormat.class);

FileInputFormat.setInputPaths(job, new Path("d:\\Hadoop_test"));
FileOutputFormat.setOutputPath(job, new Path("d:\\Hadoop_test_out"));

boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}

输出结果

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: