
A simple Hadoop InputFormat demo

This demo reads a plain-text score file through a custom FileInputFormat that turns each line into a Text key and a ScorePair value, then uses a small MapReduce job to compute the sum and average of the five scores for each key.

1. The serializable value type

package com.lijie.inutformat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class ScorePair implements WritableComparable<ScorePair> {

    private float a;
    private float b;
    private float c;
    private float d;
    private float e;

    public float getA() {
        return a;
    }

    public void setA(float a) {
        this.a = a;
    }

    public float getB() {
        return b;
    }

    public void setB(float b) {
        this.b = b;
    }

    public float getC() {
        return c;
    }

    public void setC(float c) {
        this.c = c;
    }

    public float getD() {
        return d;
    }

    public void setD(float d) {
        this.d = d;
    }

    public float getE() {
        return e;
    }

    public void setE(float e) {
        this.e = e;
    }

    // Hadoop instantiates Writables reflectively, so a no-arg constructor is required.
    public ScorePair() {
    }

    public ScorePair(float a, float b, float c, float d, float e) {
        set(a, b, c, d, e);
    }

    public void set(float a, float b, float c, float d, float e) {
        this.a = a;
        this.b = b;
        this.c = c;
        this.d = d;
        this.e = e;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + Float.floatToIntBits(a);
        result = prime * result + Float.floatToIntBits(b);
        result = prime * result + Float.floatToIntBits(c);
        result = prime * result + Float.floatToIntBits(d);
        result = prime * result + Float.floatToIntBits(e);
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        ScorePair other = (ScorePair) obj;
        if (Float.floatToIntBits(a) != Float.floatToIntBits(other.a))
            return false;
        if (Float.floatToIntBits(b) != Float.floatToIntBits(other.b))
            return false;
        if (Float.floatToIntBits(c) != Float.floatToIntBits(other.c))
            return false;
        if (Float.floatToIntBits(d) != Float.floatToIntBits(other.d))
            return false;
        if (Float.floatToIntBits(e) != Float.floatToIntBits(other.e))
            return false;
        return true;
    }

    // Deserialize the fields in exactly the order write() emits them.
    @Override
    public void readFields(DataInput in) throws IOException {
        a = in.readFloat();
        b = in.readFloat();
        c = in.readFloat();
        d = in.readFloat();
        e = in.readFloat();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeFloat(a);
        out.writeFloat(b);
        out.writeFloat(c);
        out.writeFloat(d);
        out.writeFloat(e);
    }

    // Define a total ordering over the five scores so the type also works
    // as a key; in this demo it is only used as a map output value.
    @Override
    public int compareTo(ScorePair o) {
        int cmp = Float.compare(a, o.a);
        if (cmp != 0) return cmp;
        cmp = Float.compare(b, o.b);
        if (cmp != 0) return cmp;
        cmp = Float.compare(c, o.c);
        if (cmp != 0) return cmp;
        cmp = Float.compare(d, o.d);
        if (cmp != 0) return cmp;
        return Float.compare(e, o.e);
    }

}
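
Before wiring ScorePair into a job, its serialization can be sanity-checked off-cluster by round-tripping an instance through write and readFields with plain Java streams. This is a minimal sketch, not part of the original demo, and the sample scores are arbitrary:

package com.lijie.inutformat;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ScorePairRoundTrip {

    public static void main(String[] args) throws IOException {
        ScorePair original = new ScorePair(90f, 85f, 77f, 66f, 95f);

        // Serialize with write(), as Hadoop does when shuffling values.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance with readFields().
        ScorePair copy = new ScorePair();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // equals() compares all five fields, so this should print true.
        System.out.println(original.equals(copy));
    }
}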


2. The InputFormat class

package com.lijie.inutformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class ScoreInputFormat extends FileInputFormat<Text, ScorePair> {

    // Process each file as a single split so the RecordReader never
    // starts reading in the middle of a line.
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, ScorePair> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new ScoreRecordReader();
    }

}

class ScoreRecordReader extends RecordReader<Text, ScorePair> {

    private LineReader in;
    private Text lineKey;
    private ScorePair lineValue;
    private Text line;

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration conf = context.getConfiguration();
        Path path = split.getPath();
        FileSystem fs = path.getFileSystem(conf);

        // Open the file behind this split and wrap it in a line reader.
        FSDataInputStream fileIn = fs.open(path);
        in = new LineReader(fileIn, conf);
        line = new Text();
        lineKey = new Text();
        lineValue = new ScorePair();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        int lineSize = in.readLine(line);
        if (lineSize == 0) {
            return false; // end of file
        }
        // Each line must carry seven whitespace-separated fields:
        // two key columns followed by five scores.
        String[] fields = line.toString().split("\\s+");
        if (fields.length != 7) {
            throw new IOException("Malformed line, expected 7 fields: " + line);
        }
        float a = Float.parseFloat(fields[2].trim());
        float b = Float.parseFloat(fields[3].trim());
        float c = Float.parseFloat(fields[4].trim());
        float d = Float.parseFloat(fields[5].trim());
        float e = Float.parseFloat(fields[6].trim());
        lineKey.set(fields[0] + "\t" + fields[1]);
        lineValue.set(a, b, c, d, e);
        return true;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return lineKey;
    }

    @Override
    public ScorePair getCurrentValue() throws IOException, InterruptedException {
        return lineValue;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Progress reporting is not implemented in this demo.
        return 0;
    }

    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }

}
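
The reader above expects every input line to carry exactly seven whitespace-separated fields: two key columns (for example a name and an id) followed by five scores. The lines below are made up purely for illustration; any file under the input path with this shape would work:

lijie 001 90 85 77 66 95
hadoop 002 60 70 80 90 100

The first two columns are joined with a tab to form the Text key, and the remaining five are parsed into the ScorePair value.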


3. The MapReduce program

package com.lijie.inutformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ScoreMapReduce extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        String[] path = {"hdfs://lijie:9000/score/*", "hdfs://lijie:9000/score/out"};
        int exitCode = ToolRunner.run(new Configuration(), new ScoreMapReduce(), path);
        System.exit(exitCode);
    }

    // The custom InputFormat already delivers one (key, ScorePair) record
    // per line, so the mapper is an identity pass-through.
    public static class ScoreMap extends Mapper<Text, ScorePair, Text, ScorePair> {
        @Override
        protected void map(Text key, ScorePair value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static class ScoreReduce extends Reducer<Text, ScorePair, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<ScorePair> values, Context context)
                throws IOException, InterruptedException {
            // Each key (the two leading columns) appears on exactly one
            // input line, so there is a single value per group.
            ScorePair value = values.iterator().next();
            float sum = value.getA() + value.getB() + value.getC() + value.getD() + value.getE();
            float avg = sum / 5;
            context.write(key, new Text("sum:" + sum + "\t" + "avg:" + avg));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        // Remove a leftover output directory so the job can be rerun.
        Path outPath = new Path(args[1]);
        FileSystem fs = outPath.getFileSystem(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        Job job = Job.getInstance(conf, "score");
        job.setJarByClass(ScoreMapReduce.class);

        job.setMapperClass(ScoreMap.class);
        job.setReducerClass(ScoreReduce.class);

        // Plug in the custom InputFormat.
        job.setInputFormatClass(ScoreInputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outPath);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ScorePair.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

}
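
Packaged into a jar together with the other two classes, the job runs with the standard hadoop jar launcher. The jar name below is hypothetical, and no extra arguments are needed because main() hard-codes the HDFS input and output paths:

hadoop jar score-demo.jar com.lijie.inutformat.ScoreMapReduce

For the made-up sample lines shown earlier, each record in hdfs://lijie:9000/score/out would look roughly like:

lijie   001     sum:413.0       avg:82.6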