自定义hadoop的InputFormat
2014-05-06 19:48
302 查看
菜鸟是hadoop新手,对hadoop不太了解。这两天处理从文件中读取数据块的需求:由于数据块固定为1024字节,需要按这个大小对文件进行分片,输入给mapreduce进行数据的处理。发现现有的InputFormat不能满足需求,最后只能自定义InputFormat来实现数据的分片。创建自定义InputFormat有两种方式:一种是实现InputFormat抽象接口和RecordReader抽象接口;另一种是继承InputFormat和RecordReader类。继承和实现这两种方式在细节上有所差异,但大致上还是一致的。这两种方式都需要实现getSplits()这个函数,另外还需要分别实现createRecordReader()或getRecordReader()这个接口。
首先定义一个IBlockSplit实现数据分片的功能,这个实现的是InputFormat这个抽象接口。故此需要完成两个函数getsplit()和getRecordRead()
在这里getsplit()并没有实现将文件分块只是将文件名,所分的数据块的起始地址和结束地址,以及文件所在主机加入一个Filesplit的链表中。
然后map类将会调用getRecordRead这个接口对生成的自定义RecordRead这个类进行处理:获取对应的key和value值,下面简单描述一下在读处理key-value的流程(可能有误,这是本人通过实验测,理解的):
首先map调用getRecordReader()这个接口获取RecordReader类型的数据。获取这个类型的数据后,会调用createKey()和createValue()这两个函数创建与map对应的key和value值,然后调用next()把数据传递给map:如果返回true,表示这个RecordReader还有数据;如果返回false,表示数据已经传输完成。
最后下面是运行代码:
对于用继承InputFormat类来实现自定义InputFormat的方式,除了配置的地方不同外,其它有差异的地方已在上面用大号字体标注。
运行结束结果:
首先定义一个IBlockSplit实现数据分片的功能,这个实现的是InputFormat这个抽象接口。故此需要完成两个函数getsplit()和getRecordRead()
import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; public class IBlockSplit implements InputFormat<LongWritable,Text>{ private static int mySplitSize=1024; private final float SPLIT_SLOP =(float) 1.1; private static final PathFilter hiddenFilter = new PathFilter(){ @Override public boolean accept(Path p) { // TODO Auto-generated method stub String name =p.getName(); return !name.startsWith("_")&&!name.startsWith("."); } }; private static class MultiPathFilter implements PathFilter{ private List<PathFilter> filters; public MultiPathFilter(List<PathFilter>filters){ this.filters=filters; } @Override public boolean accept(Path path) { // TODO Auto-generated method stub for(PathFilter filter:filters){ if(!filter.accept(path)){ return false; } } return true; } } @Override public RecordReader<LongWritable,Text> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { // TODO Auto-generated method stub return new BlockReadRecord(split,job); } @Override public InputSplit[] getSplits(JobConf job, int numsplit) throws IOException { System.out.println("come into getSplits"); // TODO Auto-generated method stub FileStatus[] files = listStatus(job); for(FileStatus 
file:files) { if(file.isDir()){ System.out.println("is Dir()"); } } int i = 0; ArrayList<FileSplit> splits = new ArrayList<FileSplit>(); NetworkTopology cluseterMap = new NetworkTopology(); for(FileStatus file:files){ Path path=file.getPath(); FileSystem fs = path.getFileSystem(job); long length = file.getLen(); BlockLocation[] blkLocation=fs.getFileBlockLocations(file, 0, length); if(length!=0){ long blockSize=file.getBlockSize(); System.out.println("blockSize="+blockSize); long splitSize = mySplitSize; long bytesRemaining = length; while(((double)bytesRemaining)/mySplitSize>SPLIT_SLOP){ int blkIndex = getBlockIndex(blkLocation, length-bytesRemaining); System.out.println("blkIndex="+blkIndex); splits.add(new FileSplit(path,length-bytesRemaining,splitSize, blkLocation[blkIndex].getHosts())); bytesRemaining-=mySplitSize; System.out.println("bytesRemaining="+bytesRemaining); //i++; } if(bytesRemaining !=0){ splits.add(new FileSplit(path,length-bytesRemaining, bytesRemaining,blkLocation[blkLocation.length-1].getHosts())); //i++; } }else if(length !=0){ //i++; splits.add(new FileSplit(path,0,length,blkLocation[0].getHosts())); }else{ //i++; splits.add(new FileSplit(path,0,length,new String[0])); } System.out.print("length="+length+"block="+length/1024); } //System.out.println(i); return splits.toArray(new FileSplit[splits.size()]); } private int getBlockIndex(BlockLocation[] blkLocation, long offset) { // TODO Auto-generated method stub for(int i = 0;i < blkLocation.length;i++){ if((blkLocation[i].getOffset()<=offset)&& (offset<blkLocation[i].getOffset()+ blkLocation[i].getLength())){ return i; } } BlockLocation last = blkLocation[blkLocation.length-1]; long fileLength = last.getOffset()+last.getLength()-1; throw new IllegalArgumentException("Offset" + offset+"is outside file (0..."+ fileLength+")"); } private FileStatus[] listStatus(JobConf job) throws IOException { // TODO Auto-generated method stub Path[] dirs=getInputPath(job); if(dirs.length==0){ throw new 
IOException("No Input FIle"); } TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job); List<FileStatus> result = new ArrayList<FileStatus>(); List<IOException> errors = new ArrayList<IOException>(); List<PathFilter> filters = new ArrayList<PathFilter>(); filters.add(hiddenFilter); PathFilter jobFilter = getInputPathFilter(job); if(jobFilter!=null){ filters.add(jobFilter); } PathFilter inputFilter= new MultiPathFilter(filters); for(Path p:dirs){ FileSystem fs =p.getFileSystem(job); FileStatus[] matches = fs.globStatus(p,inputFilter); if(matches == null){ errors.add(new IOException("matches=null error")); }else if(matches.length==0){ errors.add(new IOException("matches.length==0 error")); }else{ for(FileStatus globStat:matches){ if(globStat.isDir()){ for(FileStatus stat:fs.listStatus(globStat.getPath(), inputFilter)){ result.add(stat); } }else{ result.add(globStat); } } } } return result.toArray(new FileStatus[result.size()]); } private PathFilter getInputPathFilter(JobConf job) { // TODO Auto-generated method stub Configuration conf =job; Class<?> filterClass =conf.getClass("mapred.input" + ".pathFilter.class",null,PathFilter.class); return (filterClass!=null)? (PathFilter)ReflectionUtils.newInstance(filterClass,conf):null; } private Path[] getInputPath(JobConf job) { // TODO Auto-generated method stub String dirs = job.get("mapred.input.dir",""); String[] list = StringUtils.split(dirs); Path[] result = new Path[list.length]; for(int i = 0;i<list.length;i++){ result[i]=new Path(StringUtils.unEscapeString(list[i])); } return result; } }
在这里getsplit()并没有实现将文件分块只是将文件名,所分的数据块的起始地址和结束地址,以及文件所在主机加入一个Filesplit的链表中。
然后map类将会调用getRecordRead这个接口对生成的自定义RecordRead这个类进行处理:获取对应的key和value值,下面简单描述一下在读处理key-value的流程(可能有误,这是本人通过实验测,理解的):
首先map调用getRecordReader()这个接口获取RecordReader类型的数据。获取这个类型的数据后,会调用createKey()和createValue()这两个函数创建与map对应的key和value值,然后调用next()把数据传递给map:如果返回true,表示这个RecordReader还有数据;如果返回false,表示数据已经传输完成。
import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; public class BlockReadRecord implements RecordReader<LongWritable,Text> { private static final Log LOG = LogFactory.getLog(MyRecordReader.class.getName()); private CompressionCodecFactory compressionCodecs= null; private long start; private long pos; private long end; private byte[] buffer; private String keyName; private FSDataInputStream fileIn; private LongWritable key =null; private Text value = null; long i; BlockReadRecord(InputSplit Insplit, JobConf job) throws IOException{ FileSplit split = (FileSplit)Insplit; start=split.getStart(); end = split.getLength()+start; final Path path = split.getPath(); System.out.println(path.toString()); keyName = path.toString(); final FileSystem fs = path.getFileSystem(job); fileIn=fs.open(path); fileIn.seek(start); System.out.println("len="+split.getLength()+ "start="+start+"end="+end); buffer = new byte[(int)(end-start)]; this.pos=start; } @Override public void close() throws IOException { // TODO Auto-generated method stub if(fileIn!=null){ fileIn.close(); } } @Override public LongWritable createKey() { // TODO Auto-generated method stub System.out.println("create key"+key); if(key==null){ key = new LongWritable(); key .set(start/1024); //return key; } return key; //return new LongWritable().set(key);; } @Override public Text createValue() { // TODO Auto-generated method stub if(value==null){ value=new Text(); try { fileIn.seek(pos); fileIn.readFully(pos,buffer); value.set(buffer); } 
catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return value; //return value; } @Override public long getPos() throws IOException { // TODO Auto-generated method stub return pos; } @Override public float getProgress() throws IOException { // TODO Auto-generated method stub if(start==end){ return 0.0f; }else{ return Math.min(1.0f, (pos-start)/(float)(end-start)); } } @Override public boolean next(LongWritable arg0, Text arg1) throws IOException { System.out.println("next key"+key); if(key==null) { key = new LongWritable(); } if(value==null) { value = new Text(); } while(pos<end){ System.out.print("key name="); System.out.println(keyName); //key.set((this.start)/1024+1); //i=key.get(); //System.out.println("key int ="+i); //value.clear(); //fileIn.seek(pos); //fileIn.readFully(pos,buffer); //fileIn.readFully(pos, buffer, 0, (int)(end-pos)); //value.set(buffer); pos+=buffer.length; LOG.info("end is:"+end+"pos is"+pos); return true; } return false; } }
最后下面是运行代码:
import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.mapreduce.Job; public class WordCount { public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { System.out.println("in map key="+key.get()); IntWritable i = new IntWritable(); i.set(1); word.set(key.toString()); output.collect(word, i); } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { JobConf conf = new JobConf(WordCount.class); // Job job = new Job(conf,"MyCount"); conf.setJobName("wordCount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(Map.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); 
conf.setInputFormat(IBlockSplit.class); // conf.setInputFormat(TextInputFormat.class); //job.setInputFormatClass(BlockInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); // conf.setOutputKeyClass(LongWritable.class); // conf.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); // job.wait(); JobClient.runJob(conf); } }
对于用继承InputFormat类来实现自定义InputFormat的方式,除了配置的地方不同外,其它有差异的地方已在上面用大号字体标注。
运行结束结果:
相关文章推荐
- hadoop编程小技巧(5)---自定义输入文件格式类InputFormat
- Hadoop自定义InputFormat
- [转]自定义hadoop map/reduce输入文件切割InputFormat
- hadoop mapreduce 自定义InputFormat
- Hadoop:自定义输入文件格式类InputFormat
- hadoop自定义InputFormat
- Hadoop自定义InputFormat
- 利用hadoop命令rcc生成Record 一种简单的方式实现自定义的writable对象
- Hadoop系列-MapReduce自定义数据类型(序列化、反序列化机制)(十二)
- hadoop 2.6 mapreduce 自定义log 的查看方法
- Hadoop + MapReduce 端口自定义配置
- Hadoop -实现自定义的Key类型
- Hadoop学习笔记—5.自定义类型处理手机上网日志
- Hadoop自定义实现Writable/WritableComparable接口的类方法及应用
- hadoop如何自定义InputFormats和OutputFormats
- Hadoop编程模型组件--InputFormat
- Hadoop 自定义序列化编程
- 【Hadoop】Hadoop MR 自定义排序
- Hadoop之Hive自定义函数的陷阱
- hadoop 自定义分组排序,求相同key中value最小值