hadoop的InputFormat简单demo
2016-10-09 21:08
501 查看
1.序列化对象
2.InputFormat类
3.mapreduce程序
package com.lijie.inutformat; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class ScorePair implements WritableComparable<ScorePair>{ private float a; private float b; private float c; private float d; private float e; public float getA() { return a; } public void setA(float a) { this.a = a; } public float getB() { return b; } public void setB(float b) { this.b = b; } public float getC() { return c; } public void setC(float c) { this.c = c; } public float getD() { return d; } public void setD(float d) { this.d = d; } public float getE() { return e; } public void setE(float e) { this.e = e; } public ScorePair() { super(); // TODO Auto-generated constructor stub } public ScorePair(float a, float b, float c, float d, float e) { super(); this.a = a; this.b = b; this.c = c; this.d = d; this.e = e; } public void set(float a, float b, float c, float d, float e) { this.a = a; this.b = b; this.c = c; this.d = d; this.e = e; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + Float.floatToIntBits(a); result = prime * result + Float.floatToIntBits(b); result = prime * result + Float.floatToIntBits(c); result = prime * result + Float.floatToIntBits(d); result = prime * result + Float.floatToIntBits(e); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; ScorePair other = (ScorePair) obj; if (Float.floatToIntBits(a) != Float.floatToIntBits(other.a)) return false; if (Float.floatToIntBits(b) != Float.floatToIntBits(other.b)) return false; if (Float.floatToIntBits(c) != Float.floatToIntBits(other.c)) return false; if (Float.floatToIntBits(d) != Float.floatToIntBits(other.d)) return false; if (Float.floatToIntBits(e) != Float.floatToIntBits(other.e)) return false; return true; } @Override public void readFields(DataInput in) 
throws IOException { // TODO Auto-generated method stub a=in.readFloat(); b=in.readFloat(); c=in.readFloat(); d=in.readFloat(); e=in.readFloat(); } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeFloat(a); out.writeFloat(b); out.writeFloat(c); out.writeFloat(d); out.writeFloat(e); } @Override public int compareTo(ScorePair o) { // TODO Auto-generated method stub return 0; } }
2.InputFormat类
package com.lijie.inutformat; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.util.LineReader; public class ScoreInputFormat extends FileInputFormat<Text, ScorePair>{ @Override protected boolean isSplitable(JobContext context, Path filename) { // TODO Auto-generated method stub return false; } @Override public RecordReader<Text, ScorePair> createRecordReader( InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { // TODO Auto-generated method stub return new ScoreRecordReader(); } } class ScoreRecordReader extends RecordReader<Text, ScorePair>{ private LineReader in; private Text lineKey; private ScorePair lineValue; private Text line; @Override public void close() throws IOException { // TODO Auto-generated method stub if(in !=null){ in.close(); } } @Override public Text getCurrentKey() throws IOException, InterruptedException { // TODO Auto-generated method stub return lineKey; } @Override public ScorePair getCurrentValue() throws IOException, InterruptedException { // TODO Auto-generated method stub return lineValue; } @Override public float getProgress() throws IOException, InterruptedException { // TODO Auto-generated method stub return 0; } @Override public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { FileSplit split = (FileSplit)arg0; Configuration conf = arg1.getConfiguration(); Path path = split.getPath(); FileSystem fs = path.getFileSystem(conf); 
FSDataInputStream fileIn = fs.open(path); in = new LineReader(fileIn,conf); line = new Text(); lineKey = new Text(); lineValue = new ScorePair(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { int lineSize = in.readLine(line); if(lineSize == 0) return false; String[] split = line.toString().split("\\s+"); if(split.length != 7){ throw new IOException("数据错误!"); } float a,b,c,d,e; a = Float.parseFloat(split[2].trim()); b = Float.parseFloat(split[3].trim()); c = Float.parseFloat(split[4].trim()); d = Float.parseFloat(split[5].trim()); e = Float.parseFloat(split[6].trim()); lineKey.set(split[0]+"\t"+split[1]); lineValue.set(a, b, c, d, e); return true; } }
3.mapreduce程序
package com.lijie.inutformat; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class ScoreMapReduce extends Configured implements Tool { public static void main(String[] args) throws Exception { String[] path = {"hdfs://lijie:9000/score/*","hdfs://lijie:9000/score/out"}; int run = ToolRunner.run(new Configuration(), new ScoreMapReduce(), path); System.exit(run); } public static class ScoreMap extends Mapper<Text, ScorePair, Text, ScorePair> { @Override protected void map(Text key, ScorePair value, Context context) throws IOException, InterruptedException { context.write(key, value); } } public static class ScoreReduce extends Reducer<Text, ScorePair, Text, Text> { @Override protected void reduce( Text key, Iterable<ScorePair> values, Context context) throws IOException, InterruptedException { ScorePair value = values.iterator().next(); //sum float sum = value.getA()+value.getB()+value.getC()+value.getD()+value.getE(); //avg float avg = sum/5; context.write(key, new Text("sum:"+sum+"\t"+"avg:"+avg)); } } @Override public int run(String[] arg) throws Exception { Configuration conf = new Configuration(); Path path = new Path(arg[1]); FileSystem fs = path.getFileSystem(conf); if(fs.isDirectory(path)){ fs.delete(path, true); } Job job = new Job(conf, "score"); job.setJarByClass(ScoreMapReduce.class); job.setMapperClass(ScoreMap.class); job.setReducerClass(ScoreReduce.class); job.setInputFormatClass(ScoreInputFormat.class); FileInputFormat.addInputPath(job, new 
Path(arg[0])); FileOutputFormat.setOutputPath(job, path); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(ScorePair.class); job.waitForCompletion(true); return 0; } }
相关文章推荐
- Hadoop开发常用的InputFormat和OutputFormat
- Hadoop Map/Reduce InputFormat基础
- 自定义hadoop map/reduce输入文件切割InputFormat
- [Hadoop源码详解]之一MapReduce篇之InputFormat
- Hadoop-采样器-多输入路径-只采一个文件-(MultipleInputs+getsample(conf.getInputFormat))
- Hadoop开发常用的InputFormat和OutputFormat
- Hadoop之InputFormat接口的设计与实现
- 自定义hadoop map/reduce输入文件切割InputFormat
- hadoop CombineFileInputFormat
- KeyValueTextInputFormat 分割key value For hadoop 1.0
- Hadoop Map/Reduce 新API中自己的FileInputFormat写法
- Hadoop开发常用的InputFormat和OutputFormat
- hadoop自定义SdfTextInputFormat用在streaming中
- 自定义hadoop map/reduce输入文件切割InputFormat
- Hadoop 自定义InputFormat实现自定义Split
- 自定义hadoop map/reduce输入文件切割InputFormat 更改输入value的分隔符
- 在Hadoop的streaming中使用自定义的inputformat和outputformat
- Hadoop-采样器-多输入路径-只采一个文件-(MultipleInputs+getsample(conf.getInputFormat))
- hadoop inputformat
- Hadoop使用CombineFileInputFormat处理大量小文件接口实现(Hadoop-1.0.4)