Hadoop 操作数据库：DBInputFormat 与 DBOutputFormat
2012-03-18 13:16
253 查看
Hadoop 操作数据库：DBInputFormat 与 DBOutputFormat
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
/**
 * Example MapReduce job (legacy org.apache.hadoop.mapred API) that reads rows
 * from a MySQL table "item" via DBInputFormat and writes (id, item) pairs to
 * the table "newitem" via DBOutputFormat.
 *
 * NOTE(review): ItemRecord and OrderRecord are project-declared DBWritable
 * beans defined elsewhere. ItemRecord.item appears to be a String[] of flag
 * columns where "T" marks a selected item, and OrderRecord has public
 * int id / String item fields — confirm against their definitions.
 */
public class DBAccess {

    /**
     * Maps each input row to (row key, "i1:i2:..."): the colon-separated,
     * 1-based indices of the columns whose value is "T".
     */
    public static class DBAccessMap extends MapReduceBase implements
            Mapper<LongWritable, ItemRecord, LongWritable, Text> {
        public void map(LongWritable key, ItemRecord value,
                OutputCollector<LongWritable, Text> collector, Reporter reporter)
                throws IOException {
            StringBuilder s = new StringBuilder();
            for (int i = 0; i < value.item.length; i++) {
                // "T".equals(...) is null-safe; the original
                // value.item[i].equals("T") would NPE on a NULL column.
                if ("T".equals(value.item[i])) {
                    if (s.length() > 0) {
                        s.append(':');
                    }
                    s.append(i + 1);
                }
            }
            collector.collect(key, new Text(s.toString()));
        }
    }

    /**
     * Reduces each key to a single OrderRecord built from the first value.
     * With unique row keys from DBInputFormat each group holds one value,
     * but we still guard against an empty iterator instead of calling
     * next() blindly.
     */
    public static class DBAccessReduce extends MapReduceBase implements
            Reducer<LongWritable, Text, OrderRecord, Text> {
        public void reduce(LongWritable key, Iterator<Text> value,
                OutputCollector<OrderRecord, Text> collector, Reporter reporter)
                throws IOException {
            if (!value.hasNext()) {
                return; // nothing to emit for an empty group
            }
            OrderRecord or = new OrderRecord();
            // Row keys are 0-based; the output table ids start at 1.
            or.id = (int) key.get() + 1;
            or.item = value.next().toString();
            collector.collect(or, new Text(or.item));
        }
    }

    /**
     * Configures and runs the job: MySQL connection, input query over the
     * "item" table, output into "newitem", and the map/reduce classes.
     */
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(DBAccess.class);
        conf.setMapOutputKeyClass(LongWritable.class);
        conf.setMapOutputValueClass(Text.class);
        conf.setOutputKeyClass(OrderRecord.class);
        conf.setOutputValueClass(Text.class);
        conf.setInputFormat(DBInputFormat.class);
        conf.setOutputFormat(DBOutputFormat.class);
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost/test", "root", "a");
        String[] fields1 = { "id", "item1", "item2", "item3", "item4", "item5",
                "item6", "item7", "item8", "item9", "item10", "item11" };
        DBInputFormat.setInput(conf, ItemRecord.class, "item", null, "id",
                fields1);
        DBOutputFormat.setOutput(conf, "newitem", "id", "item");
        conf.setMapperClass(DBAccessMap.class);
        conf.setReducerClass(DBAccessReduce.class);
        // JobClient.runJob is static and takes the JobConf directly; the
        // original's "new JobClient(); jobclient.setConf(conf)" was dead code.
        JobClient.runJob(conf);
    }
}
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
/**
 * MapReduce job (legacy mapred API): pulls rows out of the MySQL "item"
 * table with DBInputFormat and pushes (id, item) pairs into "newitem"
 * with DBOutputFormat.
 */
public class DBAccess {

    /** Emits (row key, colon-joined 1-based indices of columns equal to "T"). */
    public static class DBAccessMap extends MapReduceBase implements
            Mapper<LongWritable, ItemRecord, LongWritable, Text> {
        public void map(LongWritable rowKey, ItemRecord record,
                OutputCollector<LongWritable, Text> output, Reporter reporter)
                throws IOException {
            StringBuilder joined = new StringBuilder();
            for (int col = 0; col < record.item.length; col++) {
                if (!record.item[col].equals("T")) {
                    continue; // only "T" columns contribute an index
                }
                if (joined.length() > 0) {
                    joined.append(":");
                }
                joined.append(String.valueOf(col + 1));
            }
            output.collect(rowKey, new Text(joined.toString()));
        }
    }

    /** Turns each group's first value into an OrderRecord keyed by row id + 1. */
    public static class DBAccessReduce extends MapReduceBase implements
            Reducer<LongWritable, Text, OrderRecord, Text> {
        public void reduce(LongWritable groupKey, Iterator<Text> values,
                OutputCollector<OrderRecord, Text> output, Reporter reporter)
                throws IOException {
            OrderRecord record = new OrderRecord();
            record.id = 1 + (int) groupKey.get();
            record.item = values.next().toString();
            output.collect(record, new Text(record.item));
        }
    }

    /** Wires up the MySQL connection, input/output tables, and map/reduce classes. */
    public static void main(String[] args) throws Exception {
        JobClient jobclient = new JobClient();
        JobConf conf = new JobConf(DBAccess.class);
        conf.setMapperClass(DBAccessMap.class);
        conf.setReducerClass(DBAccessReduce.class);
        conf.setInputFormat(DBInputFormat.class);
        conf.setOutputFormat(DBOutputFormat.class);
        conf.setMapOutputKeyClass(LongWritable.class);
        conf.setMapOutputValueClass(Text.class);
        conf.setOutputKeyClass(OrderRecord.class);
        conf.setOutputValueClass(Text.class);
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost/test", "root", "a");
        String[] fields1 = { "id", "item1", "item2", "item3", "item4", "item5",
                "item6", "item7", "item8", "item9", "item10", "item11" };
        DBInputFormat.setInput(conf, ItemRecord.class, "item", null, "id",
                fields1);
        DBOutputFormat.setOutput(conf, "newitem", "id", "item");
        jobclient.setConf(conf);
        JobClient.runJob(conf);
    }
}
相关文章推荐
- Hadoop和数据库之间剪不清理还乱的关系(DBOutputFormat)
- 数据库操作语句的简单写法,用format方法
- Hadoop&MapReduce操作关系数据库(MySQL)
- hadoop连接mysql数据库执行数据读写数据库操作
- Hadoop中DBInputFormat和DBOutputFormat使用
- Hadoop中DBInputFormat和DBOutputFormat使用
- hadoop MapReduce 的DBOutputFormat 使用体会
- hadoop连接mysql数据库执行数据读写数据库操作
- 通过eclipse方法来操作Hadoop集群上cassandra数据库(包括创建Keyspace对象以及往数据库写入数据)
- Hadoop中的DBInputFormat
- 一步一步跟我学习hadoop(7)----hadoop连接mysql数据库执行数据读写数据库操作
- 一步一步跟我学习hadoop(7)----hadoop连接mysql数据库运行数据读写数据库操作
- Hadoop下hive数据库基础操作命令代码合集
- Hadoop DBOutputFormat的使用
- hadoop超时解决办法context.progress()的作用 假设在map()方法中,你有一个从数据库读取大量数据的操作,是用一个循环来完成的,并且,在读完全部的数据之前,你不会有任何的数据输出
- DBInputFormat+DBOutputFormat
- hadoop数据库操作解析及注意事项
- Hadoop Hbase数据库操作
- Hadoop DBOutputFormat的使用
- Hadoop DBOutputFormat的使用