您的位置:首页 > 运维架构

Hadoop RecordReader

2016-05-12 21:27 281 查看
自定义RecordReader

步骤:

1)继承抽象类RecordReader,实现一个自定义的RecordReader子类

2)实现自定义InputFormat类,重写InputFormat中的createRecordReader()方法,返回值是自定义的RecordReader实例

3)在作业中调用job.setInputFormatClass(),将自定义的InputFormat类设置为输入格式

RecordReader例子

应用场景:

数据:

10

20

30

40

50

60

70

要求:分别计算奇数行与偶数行数据之和

奇数行总和:10+30+50+70=160

偶数行总和:20+40+60=120

MyRecordReader.java

package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

/**
 * Record reader that emits one record per text line, keyed by the line's
 * 1-based line number, with the line content as the value.
 *
 * <p>Line numbering always starts at 1 from wherever the split begins, so the
 * numbers are only true file line numbers when the split starts at offset 0.
 * This reader is therefore meant to be used with an input format that does not
 * split files.
 */
public class MyRecordReader extends RecordReader<LongWritable, Text> {
    /** Byte offset at which the split starts. */
    private long start;
    /** Byte offset just past the end of the split. */
    private long end;
    /** 1-based line number of the next line to be read. */
    private long pos;
    /** Total number of bytes consumed from the split so far (including newlines). */
    private long bytesConsumed;
    private FSDataInputStream fin = null;
    private LongWritable key = null;
    private Text value = null;
    private LineReader reader = null;

    /**
     * Closes the underlying input stream, if one was opened.
     *
     * @throws IOException if closing the stream fails
     */
    @Override
    public void close() throws IOException {
        // initialize() may have failed (or never been called) before the stream
        // was opened; closing must not throw a NullPointerException then.
        if (fin != null) {
            fin.close();
            fin = null;
        }
    }

    /** @return the line number of the current record, or {@code null} before the first read */
    @Override
    public LongWritable getCurrentKey() throws IOException,
            InterruptedException {
        return key;
    }

    /** @return the content of the current line, or {@code null} before the first read */
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * Reports the fraction of the split consumed so far, based on the byte counts
     * returned by the line reader.
     *
     * @return a value between 0.0 and 1.0; 0.0 for an empty split
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        long length = end - start;
        if (length <= 0) {
            return 0.0f;
        }
        return Math.min(1.0f, bytesConsumed / (float) length);
    }

    /**
     * Opens the file behind the split, positions the stream at the split start and
     * resets the line counter to 1.
     *
     * @param inputSplit the split to read; must be a {@link FileSplit}
     * @param context    task context supplying the job configuration
     * @throws IOException if the file cannot be opened or positioned
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) inputSplit;
        start = fileSplit.getStart();
        end = start + fileSplit.getLength();

        Configuration conf = context.getConfiguration();
        Path path = fileSplit.getPath();
        FileSystem fs = path.getFileSystem(conf);
        fin = fs.open(path);
        fin.seek(start);
        reader = new LineReader(fin);
        pos = 1;
        bytesConsumed = 0;
    }

    /**
     * Reads the next line into the current value and sets the current key to its
     * line number.
     *
     * @return {@code true} if a line was read, {@code false} at end of input
     * @throws IOException if reading from the stream fails
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new LongWritable();
        }
        if (value == null) {
            value = new Text();
        }

        // readLine returns the number of bytes consumed (newline included);
        // zero means the end of the stream has been reached.
        int consumed = reader.readLine(value);
        if (consumed == 0) {
            return false;
        }

        // Only advance the key once a line has actually been read.
        key.set(pos);
        pos++;
        bytesConsumed += consumed;
        return true;
    }

}


MyFileInputFormat.java

package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * File input format that hands every split to a {@link MyRecordReader} and
 * refuses to split input files.
 */
public class MyFileInputFormat extends FileInputFormat<LongWritable, Text> {

    /**
     * Reports every file as non-splittable.
     *
     * @param context  the job context (unused)
     * @param filename the file being considered (unused)
     * @return always {@code false}
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    /**
     * Creates the record reader for a split.
     *
     * @param split   the split to be read (unused here; the framework passes it
     *                to the reader's {@code initialize})
     * @param context the task attempt context (unused here)
     * @return a new, uninitialized {@link MyRecordReader}
     */
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        RecordReader<LongWritable, Text> lineNumberReader = new MyRecordReader();
        return lineNumberReader;
    }

}


MyMapper.java

package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Identity mapper: forwards every input record to the output unchanged.
 */
public class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    /**
     * Writes the incoming key/value pair straight through to the context.
     *
     * @param key     the input key, emitted as-is
     * @param value   the input value, emitted as-is
     * @param context the mapper context receiving the output
     * @throws IOException          if the write fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void map(final LongWritable key, final Text value, final Context context)
            throws IOException, InterruptedException {
        // No transformation is applied at the map stage.
        context.write(key, value);
    }

}


MyPartitioner.java

package com.recordreader;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitions records by the parity of their key: odd keys go to bucket 0,
 * even keys to bucket 1.
 *
 * <p>As a deliberate side effect the key itself is overwritten with the bucket
 * number (0 for odd, 1 for even), so that all records of the same parity share
 * one key downstream.
 * NOTE(review): this relies on the framework serializing the key after the
 * partitioner has been called, and on the partitioner being invoked at all;
 * confirm both for the Hadoop version and reducer count in use.
 */
public class MyPartitioner extends Partitioner<LongWritable, Text> {

    /**
     * Rewrites the key to its parity bucket and returns the partition for it.
     *
     * @param key           the record key; overwritten with 0 (odd) or 1 (even)
     * @param value         the record value (unused)
     * @param numPartitions the number of partitions available
     * @return the parity bucket, folded into the range {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(LongWritable key, Text value, int numPartitions) {
        int bucket = (key.get() % 2 == 0) ? 1 : 0;
        key.set(bucket);
        // A partition index must be smaller than numPartitions; without the
        // modulo, even keys would be sent to partition 1 even when only
        // partition 0 exists.
        return bucket % numPartitions;
    }

}


MyReducer.java

package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Sums the integer values grouped under one key and writes the total with a
 * label chosen from the key: key 0 is labelled as the odd-line sum, any other
 * key as the even-line sum.
 */
public class MyReducer extends Reducer<LongWritable, Text, Text, IntWritable> {

    /**
     * Adds up all values of the group and emits a single labelled total.
     *
     * @param key     the group key; 0 selects the odd-line label, anything else
     *                the even-line label
     * @param value   the lines of the group, each expected to hold one integer
     * @param context the reducer context receiving the labelled sum
     * @throws IOException           if the write fails
     * @throws InterruptedException  if the write is interrupted
     * @throws NumberFormatException if a non-blank line is not a valid integer
     */
    @Override
    protected void reduce(LongWritable key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (Text val : value) {
            // Tolerate surrounding whitespace and blank lines, which would
            // otherwise abort the whole task with a NumberFormatException.
            String line = val.toString().trim();
            if (line.isEmpty()) {
                continue;
            }
            sum += Integer.parseInt(line);
        }

        Text writeKey = new Text();
        if (key.get() == 0) {
            writeKey.set("奇数行之和:");
        } else {
            writeKey.set("偶数行之和:");
        }

        IntWritable writeValue = new IntWritable();
        writeValue.set(sum);
        context.write(writeKey, writeValue);
    }

}


TestRecordReader.java

package com.recordreader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.GenericOptionsParser;

public class TestRecordReader {
public static void main(String args[]) throws Exception{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(TestRecordReader.class);
job.setMapperClass(MyMapper.class);

job.setReducerClass(MyReducer.class);
job.setPartitionerClass(MyPartitioner.class) ;
job.setNumReduceTasks(2) ;
job.setInputFormatClass(MyFileInputFormat.class) ;

//      job.setOutputKeyClass(Text.class);
//      job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: