Hadoop RecordReader
Custom RecordReader
Steps:
1) Extend the abstract class RecordReader to implement a custom RecordReader.
2) Implement a custom InputFormat class, overriding its createRecordReader() method so that it returns an instance of the custom RecordReader.
3) Configure the job with job.setInputFormatClass(), passing the custom InputFormat (a minimal sketch of this wiring follows).
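A minimal sketch of the driver wiring for step 3 (the class names match the full example further below; the paths are placeholders, and the complete driver is in TestRecordReader.java):

Job job = Job.getInstance(new Configuration(), "custom recordreader");
job.setJarByClass(TestRecordReader.class);
job.setInputFormatClass(MyFileInputFormat.class);          // step 3: replaces the default TextInputFormat
FileInputFormat.addInputPath(job, new Path("/tmp/in"));    // placeholder input path
FileOutputFormat.setOutputPath(job, new Path("/tmp/out")); // placeholder output path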
RecordReader example
Scenario:
Input data:
10
20
30
40
50
60
70
Task: compute the sum of the odd-numbered lines and the sum of the even-numbered lines separately.
Sum of odd lines: 10+30+50+70=160
Sum of even lines: 20+40+60=120
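To sanity-check those expected results, here is a small standalone sketch (the OddEvenSum class is purely illustrative; the seven input values are hard-coded from above):

public class OddEvenSum {
    public static void main(String[] args) {
        int[] lines = {10, 20, 30, 40, 50, 60, 70};
        int oddSum = 0, evenSum = 0;
        for (int i = 0; i < lines.length; i++) {
            if ((i + 1) % 2 == 1) {      // line numbers are 1-based
                oddSum += lines[i];      // lines 1, 3, 5, 7
            } else {
                evenSum += lines[i];     // lines 2, 4, 6
            }
        }
        System.out.println("odd:  " + oddSum);   // 160
        System.out.println("even: " + evenSum);  // 120
    }
}

The MapReduce implementation below produces the same two sums, using the line number assigned by the RecordReader as the map output key. It consists of the following classes.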
MyRecordReader.java
package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class MyRecordReader extends RecordReader<LongWritable, Text> {
    private long start;              // byte offset where this split begins
    private long end;                // byte offset where this split ends
    private long pos;                // current line number (1-based)
    private FSDataInputStream fin = null;
    private LongWritable key = null; // key: the line number
    private Text value = null;       // value: the line content
    private LineReader reader = null;

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) inputSplit;
        start = fileSplit.getStart();
        end = start + fileSplit.getLength();
        Configuration conf = context.getConfiguration();
        Path path = fileSplit.getPath();
        FileSystem fs = path.getFileSystem(conf);
        fin = fs.open(path);
        fin.seek(start);
        reader = new LineReader(fin);
        pos = 1;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
            value = new Text();
        }
        if (reader.readLine(value) == 0) {
            return false; // end of split: no more lines
        }
        pos++;
        return true;
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // report how far we have read into the split
        if (start == end) {
            return 0.0f;
        }
        return Math.min(1.0f, (fin.getPos() - start) / (float) (end - start));
    }

    @Override
    public void close() throws IOException {
        fin.close();
    }
}
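Note that pos is initialized to 1, so line numbering is 1-based and the first line counts as odd; each call to nextKeyValue() then emits one (line number, line text) pair. The numbering restarts for every split, which is why the InputFormat below must prevent the file from being split.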
MyFileInputFormat.java
package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class MyFileInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new MyRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false; // keep the whole file in one split so line numbers stay global
    }
}
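Returning false from isSplitable() is what makes the line numbers trustworthy: if the framework were allowed to split the file, each split's RecordReader would restart pos at 1 somewhere in the middle of the file (and could seek into the middle of a line), breaking the odd/even classification. The price is that each input file is processed by a single map task.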
MyMapper.java
package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // identity map: forward the (line number, line content) pair to the shuffle
        context.write(key, value);
    }
}
MyPartitioner.java
package com.recordreader;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<LongWritable, Text> {

    @Override
    public int getPartition(LongWritable key, Text value, int numPartitions) {
        // route even line numbers to reducer 1 and odd line numbers to reducer 0;
        // the key is also rewritten (0 = odd, 1 = even) so each reducer sees one group
        if (key.get() % 2 == 0) {
            key.set(1);
            return 1;
        } else {
            key.set(0);
            return 0;
        }
    }
}
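Mutating the key inside getPartition() is a deliberate trick: in the standard map output path the partitioner runs before the record is serialized, so the rewritten key (0 for odd lines, 1 for even lines) is what actually gets shuffled. Collapsing all odd line numbers to 0 and all even ones to 1 means each reducer receives a single key group holding all of its values, so one reduce() call computes the whole sum.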
MyReducer.java
package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (Text val : values) {
            sum += Integer.valueOf(val.toString());
        }
        Text writeKey = new Text();
        IntWritable writeValue = new IntWritable();
        // the partitioner rewrote the key: 0 marks odd lines, 1 marks even lines
        if (key.get() == 0) {
            writeKey.set("sum of odd lines:");
        } else {
            writeKey.set("sum of even lines:");
        }
        writeValue.set(sum);
        context.write(writeKey, writeValue);
    }
}
TestRecordReader.java
package com.recordreader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class TestRecordReader {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: TestRecordReader <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "odd-even line sum");
        job.setJarByClass(TestRecordReader.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(2); // one reducer for odd lines, one for even lines
        job.setInputFormatClass(MyFileInputFormat.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
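Assuming the seven-line input file sits under /input and the job is packaged as recordreader.jar (both names are placeholders), a run looks like:

hadoop jar recordreader.jar com.recordreader.TestRecordReader /input /output

With two reduce tasks, the output directory contains two files: part-r-00000 from reducer 0, holding "sum of odd lines:" and 160 (tab-separated), and part-r-00001 from reducer 1, holding "sum of even lines:" and 120.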