Big Data - Hadoop Ecosystem (20) - MapReduce Framework Principles - OutputFormat
2018-12-12 19:45
1. OutputFormat interface implementations
Common built-in implementations include TextOutputFormat (the default, which writes key-value pairs as lines of text) and SequenceFileOutputFormat.
2. Custom OutputFormat
Steps:
1) Define a class that extends FileOutputFormat
2) Define a class that extends RecordWriter and override the write method
3. Example
Given a log file, write the lines containing nty to nty.log and all other lines to other.log:

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.nty.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
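The routing rule can be sketched in plain Java, independent of Hadoop; the class name FilterSketch is illustrative, but the per-line contains check is exactly what the custom RecordWriter below applies:

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free sketch of the routing rule: lines containing "nty"
// go to nty.log, everything else goes to other.log.
public class FilterSketch {

    static List<String> ntyLines = new ArrayList<>();
    static List<String> otherLines = new ArrayList<>();

    // Same predicate the RecordWriter's write method uses per input line
    static void route(String line) {
        if (line.contains("nty")) {
            ntyLines.add(line);
        } else {
            otherLines.add(line);
        }
    }

    public static void main(String[] args) {
        String[] input = {
            "http://www.baidu.com", "http://www.google.com", "http://cn.bing.com",
            "http://www.nty.com", "http://www.sohu.com", "http://www.sina.com",
            "http://www.sin2a.com", "http://www.sin2desa.com", "http://www.sindsafa.com"
        };
        for (String line : input) {
            route(line);
        }
        System.out.println("nty.log: " + ntyLines);          // [http://www.nty.com]
        System.out.println("other.log: " + otherLines.size() + " lines"); // 8 lines
    }
}
```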
Custom class extending FileOutputFormat:
```java
package com.nty.outputFormat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 19:28
 */
public class FilterOutputFormat extends FileOutputFormat<LongWritable, Text> {

    @Override
    public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        FilterRecordWrite frw = new FilterRecordWrite();
        frw.init(job);
        return frw;
    }
}
```
Custom RecordWriter, overriding write:
```java
package com.nty.outputFormat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 19:29
 */
public class FilterRecordWrite extends RecordWriter<LongWritable, Text> {

    private FSDataOutputStream nty;
    private FSDataOutputStream other;

    // Receive the job passed in from getRecordWriter
    public void init(TaskAttemptContext job) throws IOException {
        // Read the job's configured output directory
        String outDir = job.getConfiguration().get(FileOutputFormat.OUTDIR);

        FileSystem fileSystem = FileSystem.get(job.getConfiguration());

        nty = fileSystem.create(new Path(outDir + "/nty.log"));
        other = fileSystem.create(new Path(outDir + "/other.log"));
    }

    @Override
    public void write(LongWritable key, Text value) throws IOException, InterruptedException {
        String address = value.toString() + "\r\n";
        if (address.contains("nty")) {
            nty.write(address.getBytes());
        } else {
            other.write(address.getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        // Close the streams
        IOUtils.closeStream(nty);
        IOUtils.closeStream(other);
    }
}
```
Driver class setup:
```java
package com.nty.outputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 19:29
 */
public class FilterDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(FilterDriver.class);

        // Plug in the custom OutputFormat
        job.setOutputFormatClass(FilterOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("d:\\Hadoop_test"));
        FileOutputFormat.setOutputPath(job, new Path("d:\\Hadoop_test_out"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
```
Output
nty.log receives the one line containing nty (http://www.nty.com), and other.log receives the remaining eight lines.