
MapReduce File Reading and Cleaning

2015-12-24 00:00
package com.demo.admin;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Test extends Configured implements Tool {

    // Mapper: parses each input line and emits (name, TestWritable(phone, shiqu)).
    public static class TestMap extends Mapper<LongWritable, Text, Text, TestWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split the record on "$"; split() takes a regex, so the $ must be escaped.
            final String[] splited = value.toString().split("\\$");
            // Field 0 is the name, which becomes the output key.
            final String name = splited[0];
            final Text k2 = new Text(name);
            // Field 4 is the phone number and field 8 is the city (shiqu).
            final TestWritable v2 = new TestWritable(splited[4], splited[8]);
            // Emit name as the key, with phone and shiqu wrapped in v2.
            context.write(k2, v2);
        }
    }
    // Reducer: writes each (phone, shiqu) pair back out under its name key.
    public static class TestReduce extends Reducer<Text, TestWritable, Text, TestWritable> {
        @Override
        public void reduce(Text k2, Iterable<TestWritable> v2s, Context context) throws IOException, InterruptedException {
            String phone;
            String shiqu;
            // Iterate over every value grouped under this key.
            for (TestWritable testWritable : v2s) {
                phone = testWritable.phone;
                shiqu = testWritable.shiqu;
                TestWritable v3 = new TestWritable(phone, shiqu);
                context.write(k2, v3);
            }
        }
    }
    // Entry point: delegate to ToolRunner so generic Hadoop options are handled,
    // and propagate the job's exit code.
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Test(), args));
    }
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] argArray = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (argArray.length != 2) {
            System.out.println("Usage: Test <input path> <output path>");
            System.exit(1);
        }
        Job job = Job.getInstance(conf, "Test");
        // Delete the output path if it already exists; otherwise the job refuses to start.
        FileSystem fs = FileSystem.get(new URI(argArray[1]), conf);
        fs.delete(new Path(argArray[1]), true);
        job.setJarByClass(Test.class);
        job.setMapperClass(TestMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TestWritable.class);
        job.setReducerClass(TestReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TestWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(argArray[0]));
        FileOutputFormat.setOutputPath(job, new Path(argArray[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
    // Custom value type holding phone and shiqu; Hadoop serializes it via the Writable contract.
    public static class TestWritable implements Writable {
        String phone;
        String shiqu;

        public TestWritable(String phone, String shiqu) {
            this.phone = phone;
            this.shiqu = shiqu;
        }

        // A no-arg constructor is required: Hadoop instantiates custom Writables
        // by reflection, which calls the no-arg constructor.
        public TestWritable() {}

        public void readFields(DataInput in) throws IOException {
            this.phone = in.readUTF();
            this.shiqu = in.readUTF();
        }

        public void write(DataOutput out) throws IOException {
            out.writeUTF(phone);
            out.writeUTF(shiqu);
        }

        @Override
        public String toString() {
            return phone + "\t" + shiqu + "\t";
        }
    }
}
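
The Writable contract above can be verified without a cluster by serializing an instance to a byte stream and reading it back. Here is a minimal round-trip sketch; the class name RoundTrip is my own, and it assumes it sits in the same package as Test:

package com.demo.admin;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RoundTrip {
    public static void main(String[] args) throws IOException {
        Test.TestWritable original = new Test.TestWritable("15997444444", "广水");
        // Serialize through write(DataOutput), as the shuffle stage would.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));
        // Deserialize through readFields(DataInput) into a fresh instance,
        // created via the no-arg constructor just as Hadoop's reflection does.
        Test.TestWritable copy = new Test.TestWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(copy); // prints: 15997444444	广水
    }
}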

Sample input record: 张三$25$男$未婚$15997444444$409930360$中国$湖北$广水
Cleaned output record: 张三	15997444444	广水
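
A quick way to sanity-check the field indices is to run the same split locally, outside of Hadoop. This standalone sketch (class name SplitCheck is my own) applies the mapper's split to the sample record above:

public class SplitCheck {
    public static void main(String[] args) {
        String record = "张三$25$男$未婚$15997444444$409930360$中国$湖北$广水";
        // split() takes a regex, so the literal $ must be escaped as \\$.
        String[] fields = record.split("\\$");
        System.out.println("name  = " + fields[0]); // 张三
        System.out.println("phone = " + fields[4]); // 15997444444
        System.out.println("shiqu = " + fields[8]); // 广水
    }
}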

Shell commands:
/usr/local/hadoop/bin/hadoop fs -put /home/XX/test.txt /test_log/
/usr/local/hadoop/bin/hadoop jar /home/XX/test.jar /test_log/test.txt /test_cleaned/ 1>/dev/null
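
After the job finishes, the cleaned output can be inspected straight from HDFS; the part file name below assumes the default single reducer:
/usr/local/hadoop/bin/hadoop fs -cat /test_cleaned/part-r-00000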
Tags: hadoop