您的位置:首页 > 数据库

Hadoop 操作数据库:DBInputFormat 与 DBOutputFormat

2012-03-18 13:16 253 查看
Hadoop 操作数据库:DBInputFormat 与 DBOutputFormat

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;

public class DBAccess {
public static class DBAccessMap extends MapReduceBase implements
Mapper<LongWritable, ItemRecord, LongWritable, Text> {
public void map(LongWritable key, ItemRecord value,
OutputCollector<LongWritable, Text> collector, Reporter reporter)
throws IOException {
StringBuilder s = new StringBuilder();
for (int i = 0; i < value.item.length; i++) {
if (value.item[i].equals("T"))
if(s.toString().equals(""))
s = s.append(String.valueOf(i + 1));
else
s=s.append(":"+String.valueOf(i+1));
}
collector.collect(key, new Text(s.toString()));
}
}

public static class DBAccessReduce extends MapReduceBase implements
Reducer<LongWritable, Text, OrderRecord, Text> {
public void reduce(LongWritable key, Iterator<Text> value,
OutputCollector<OrderRecord, Text> collector, Reporter reporter)
throws IOException {
OrderRecord or = new OrderRecord();
or.id = (int)key.get()+1;
or.item = value.next().toString();
collector.collect(or, new Text(or.item));
}

}

public static void main(String[] args) throws Exception {
JobClient jobclient = new JobClient();
JobConf conf = new JobConf(DBAccess.class);
conf.setMapOutputKeyClass(LongWritable.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputKeyClass(OrderRecord.class);
conf.setOutputValueClass(Text.class);
conf.setInputFormat(DBInputFormat.class);
conf.setOutputFormat(DBOutputFormat.class);
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
"jdbc:mysql://localhost/test", "root", "a");
String[] fields1 = { "id", "item1", "item2", "item3", "item4", "item5",
"item6", "item7", "item8", "item9", "item10", "item11" };
DBInputFormat.setInput(conf, ItemRecord.class, "item", null, "id",
fields1);
// String[] fields2={"id","item"};
DBOutputFormat.setOutput(conf, "newitem", "id", "item");
conf.setMapperClass(DBAccessMap.class);
conf.setReducerClass(DBAccessReduce.class);
jobclient.setConf(conf);
JobClient.runJob(conf);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息