
HBase (9): HBaseAndMapReduce (3)

HBaseMapReduce 3:
Writing the contents of an HDFS file into HBase.
For large files that need to end up in HBase, the idea is to first upload the file to HDFS, read it as <key, value> pairs in the map phase, and then write those pairs into HBase in the reduce phase.

In the code below the input is hard-coded to a specific file in HDFS, and the reducer writes the key-value pairs into HBase.
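
The upload step itself is not shown in this article; as a minimal sketch, assuming placeholder local and HDFS paths, copying a file onto the cluster with the FileSystem API could look like this:

[code=language-java]import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadToHdfs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // NameNode address taken from the job below; both file paths
        // are placeholders for illustration.
        conf.set("fs.defaultFS", "hdfs://192.168.226.129:9000");
        try (FileSystem fs = FileSystem.get(conf)) {
            fs.copyFromLocalFile(new Path("/tmp/friends.txt"),
                    new Path("/hbasemapreduce1/friends.txt"));
        }
    }
}

The main program then follows: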

[code=language-java]import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class HBaseAndMapReduce3 {

    public static void main(String[] args) throws Exception {
        System.exit(run());
    }

    public static int run() throws Exception {
        Configuration conf = new Configuration();
        conf = HBaseConfiguration.create(conf);
        conf.set("hbase.zookeeper.quorum", "192.168.226.129");

        Job job = Job.getInstance(conf, "findFriend");
        job.setJarByClass(HBaseAndMapReduce3.class);

        // KeyValueTextInputFormat splits each input line at the first tab:
        // the part before the tab becomes the key, the rest the value.
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // No mapper class is set, so the identity mapper passes the
        // <Text, Text> pairs straight through to the reducer.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(
                "hdfs://192.168.226.129:9000/hbasemapreduce1/2016051818564427/part-r-00000"));

        // Write the reducer output into the HBase table "friend".
        TableMapReduceUtil.initTableReducerJob("friend", FindFriendReducer.class, job);
        checkTable(conf);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    // Create the target table with a "person" column family if it is missing.
    private static void checkTable(Configuration conf) throws Exception {
        try (Connection con = ConnectionFactory.createConnection(conf);
             Admin admin = con.getAdmin()) {
            TableName tn = TableName.valueOf("friend");
            if (!admin.tableExists(tn)) {
                HTableDescriptor htd = new HTableDescriptor(tn);
                HColumnDescriptor hcd = new HColumnDescriptor("person");
                htd.addFamily(hcd);
                admin.createTable(htd);
                System.out.println("Table did not exist; created it.");
            }
        }
    }

    public static class FindFriendReducer extends
            TableReducer<Text, Text, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            // Use copyBytes()/Bytes.toBytes(...) rather than Text.getBytes():
            // getBytes() returns the reused backing array, which may contain
            // stale bytes beyond the current length.
            byte[] rowKey = key.copyBytes();
            Put put = new Put(rowKey);
            put.addColumn(Bytes.toBytes("person"), Bytes.toBytes("nickname"),
                    Bytes.toBytes(values.iterator().next().toString()));
            context.write(new ImmutableBytesWritable(rowKey), put);
        }
    }
}
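
To launch the job, the HBase client jars have to be visible to the hadoop command. Assuming the class above has been packaged into a jar named hbase-mr.jar (the jar name is hypothetical), a common way to run it is:

[code=plain]export HADOOP_CLASSPATH=$(hbase classpath)
hadoop jar hbase-mr.jar HBaseAndMapReduce3

TableMapReduceUtil.initTableReducerJob also ships the HBase dependency jars with the job by default, so only the launching JVM needs the extra classpath.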

Contents of the source data file (key and value separated by a tab):

[code=plain]hadoop	Berg-OSChina,BergBerg
hbase	OSChina,BergBerg
zookeeper	OSChina,BergBerg

After the job has stored the HDFS file contents in HBase, a full table scan from the client shows the result below. Note the stray trailing characters in this original run (row key hbasep, values ending in BergBerggBerg): they are the classic symptom of calling Text.getBytes(), which returns the reused backing byte array including stale bytes past the current length. The reducer above avoids this by using copyBytes() and Bytes.toBytes(...).

[code=plain]hbase(main):003:0> scan 'friend'
ROW                             COLUMN+CELL
hadoop                         column=person:nickname, timestamp=1463748372584, value=Berg-OSChina,BergBerg
hbasep                         column=person:nickname, timestamp=1463748372584, value=OSChina,BergBerggBerg
zookeeper                      column=person:nickname, timestamp=1463748372584, value=OSChina,BergBerggBerg
3 row(s) in 0.2850 seconds
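
Besides the shell scan, a single row can be read back programmatically with the Java client's Get API. A minimal sketch, reusing the same ZooKeeper quorum setting (the class name is illustrative):

[code=language-java]import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckFriendRow {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.226.129");
        try (Connection con = ConnectionFactory.createConnection(conf);
             Table table = con.getTable(TableName.valueOf("friend"))) {
            // Fetch the "hadoop" row and print its person:nickname cell.
            Result r = table.get(new Get(Bytes.toBytes("hadoop")));
            byte[] v = r.getValue(Bytes.toBytes("person"), Bytes.toBytes("nickname"));
            System.out.println(Bytes.toString(v)); // expected: Berg-OSChina,BergBerg
        }
    }
}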