您的位置:首页 > 运维架构

hadoop datajoin例子

2016-02-23 00:00 811 查看
摘要: 《Hadoop in Action》一书中的 Hadoop DataJoin 例子

java代码:

/**
*
*/
package com.jason.hadoop.example;

/**
* @author jason
*
*/
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoin extends Configured implements Tool {

/**
 * Map side of the reduce-side join: every input line is wrapped in a
 * {@link TaggedWritable}, tagged with its data source, and grouped by the
 * join key (the first comma-separated field of the line).
 */
public static class MapClass extends DataJoinMapperBase {

    /**
     * Extracts the join key from a record.
     *
     * @param aRecord tagged record whose data is a comma-separated line
     * @return the text before the first comma, or the whole line when it
     *         contains no comma
     */
    @Override
    protected Text generateGroupKey(TaggedMapOutput aRecord) {
        String line = ((Text) aRecord.getData()).toString();
        // A limit of 2 keeps trailing empty fields, so the result always has at
        // least one element; plain split(",") returns an empty array for a line
        // made only of commas and index 0 would then throw.
        String groupKey = line.split(",", 2)[0];

        return new Text(groupKey);
    }

    /**
     * Derives the data-source tag from the name of the input file.
     *
     * @param inputFile name or path of the file this mapper is reading
     * @return the part of the file name before the first '-', or the whole
     *         file name when it contains no '-'
     */
    @Override
    protected Text generateInputTag(String inputFile) {
        // NOTE(review): inputFile is presumably a full path (the base class is
        // expected to pass the map.input.file value) -- confirm. Only the last
        // path component is used, so a '-' in a parent directory such as
        // ".../hadoop-1.0.1/in/user.txt" cannot collapse every source to the
        // same tag.
        String fileName = new Path(inputFile).getName();
        String datasource = fileName.split("-", 2)[0];

        return new Text(datasource);
    }

    /**
     * Wraps a raw input value in a {@link TaggedWritable} carrying this
     * mapper's input tag.
     *
     * @param value the input value; must be a {@link Text}
     * @return the tagged record
     */
    @Override
    protected TaggedMapOutput generateTaggedMapOutput(Object value) {
        TaggedWritable retv = new TaggedWritable((Text) value);
        retv.setTag(this.inputTag);
        return retv;
    }

}

/**
 * Reduce side of the join: combines one record from each data source that
 * shares a join key into a single comma-separated output line.
 */
public static class Reduce extends DataJoinReducerBase {

    /**
     * Combines one record per tag into a joined record (inner join).
     *
     * @param tags   the data-source tags present in this combination
     * @param values the records, one {@link TaggedWritable} per tag
     * @return the joined record tagged with {@code tags[0]}, or {@code null}
     *         when fewer than two sources are present
     */
    @Override
    protected TaggedMapOutput combine(Object[] tags, Object[] values) {
        // Inner join: a key that appears in only one source yields no output.
        // NOTE(review): null is presumably skipped by DataJoinReducerBase -- confirm.
        if (tags.length < 2) {
            return null;
        }

        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                joined.append(',');
            }
            TaggedWritable tw = (TaggedWritable) values[i];
            // Full line, e.g. "3,name,137xxxxxxxx"
            String line = ((Text) tw.getData()).toString();
            // Split off the join key, e.g. {"3", "name,137xxxxxxxx"}
            String[] tokens = line.split(",", 2);
            // A record holding only the key (no comma) contributes an empty
            // field instead of failing with ArrayIndexOutOfBoundsException.
            if (tokens.length > 1) {
                joined.append(tokens[1]);
            }
        }
        TaggedWritable ret = new TaggedWritable(new Text(joined.toString()));
        ret.setTag((Text) tags[0]);

        return ret;
    }
}

/**
 * A {@link TaggedMapOutput} whose payload is a single {@link Text} value.
 * Serialized as the tag followed by the data.
 */
public static class TaggedWritable extends TaggedMapOutput {
    /** The wrapped record. */
    private Text data;

    /**
     * Creates an instance holding an empty {@link Text}. This no-argument
     * constructor is required; without it deserialization fails.
     */
    public TaggedWritable() {
        this(new Text(""));
    }

    /**
     * Creates an instance wrapping the given record.
     *
     * @param data the record to carry; stored by reference, not copied
     */
    public TaggedWritable(Text data) {
        this.data = data;
    }

    /** @return the wrapped record */
    @Override
    public Writable getData() {
        return data;
    }

    /**
     * Writes the tag, then the data.
     *
     * @param out the stream to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        tag.write(out);
        data.write(out);
    }

    /**
     * Reads the tag, then the data, in the order {@link #write} emits them.
     *
     * @param in the stream to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        tag.readFields(in);
        data.readFields(in);
    }
}

/**
 * Configures and runs the join job.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output
 *             path; any further arguments are ignored
 * @return 0 once the job has run, or 2 when the required arguments are missing
 * @throws Exception if job submission or execution fails
 */
@Override
public int run(String[] args) throws Exception {
    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException
    // when the input/output paths are not supplied.
    if (args == null || args.length < 2) {
        System.err.println("Usage: DataJoin <input path> <output path>");
        ToolRunner.printGenericCommandUsage(System.err);
        return 2;
    }

    Configuration conf = getConf();
    JobConf job = new JobConf(conf, DataJoin.class);
    Path in = new Path(args[0]);
    Path out = new Path(args[1]);
    FileInputFormat.setInputPaths(job, in);
    FileOutputFormat.setOutputPath(job, out);
    job.setJobName("DataJoin");
    job.setMapperClass(MapClass.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormat(TextInputFormat.class);
    job.setOutputFormat(TextOutputFormat.class);

    job.setOutputKeyClass(Text.class);
    job.setMapOutputKeyClass(Text.class);

    job.setMapOutputValueClass(TaggedWritable.class);

    job.setOutputValueClass(Text.class);

    // Use a comma as the text output separator.
    job.set("mapred.textoutputformat.separator", ",");
    JobClient.runJob(job);
    return 0;
}

/**
 * Command-line entry point: runs the job through {@link ToolRunner} and exits
 * with the status it returns.
 *
 * @param args command-line arguments, passed to {@link ToolRunner}
 * @throws Exception if the tool fails
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Tool tool = new DataJoin();
    System.exit(ToolRunner.run(conf, tool, args));
}

}

数据准备:

user.txt

1,张三,135xxxxxxxx
2,李四,136xxxxxxxx
3,王五,137xxxxxxxx
4,赵六,138xxxxxxxx

order.txt

3,A,13,2013-02-12
1,B,23,2013-02-14
2,C,16,2013-02-17
3,D,25,2013-03-12

上传数据:

hadoop fs -put *.txt /example/datajoin/in

最后使用命令行运行:

HADOOP_CLASSPATH=/home/jason/hadoop-1.0.1/contrib/datajoin/hadoop-datajoin-1.0.1.jar hadoop jar hadoopExample-1.0-SNAPSHOT.jar com.jason.hadoop.example.DataJoin -libjars /home/jason/hadoop-1.0.1/contrib/datajoin/hadoop-datajoin-1.0.1.jar /example/datajoin/in /example/datajoin/out1
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop datajoin