您的位置:首页 > 其它

各种方式将数据导入到HBase体验

2013-03-20 10:44 302 查看
目前随着HBase的上线,需要将各种零散的数据源陆续导入到HBase中。

根据自己的使用下来的感受,总结导入方式有以下几种:

第一:命令行导入

第二:编写MR程序导入

第三:采用javaAPI的方式导入

第四:使用Sqoop方式导入

第一种很简单。不适合从已存在的数据中导入,就是兼容性不好。

第二种细分为三类:总体来说,数据都是放在HDFS上面。

第一类使用TableOutputFormat的方式,网上有很多例子,这里不多介绍,这里主要是说下,使用这种方式每次都是写一个记录到HBase方式里,我们使用下来发现非常慢。

后来为了能够进行批量写入。所以就有了下面的想法,就是修改代码,改成批量的方式。

第二类修改TableOutputFormat为Batch(需要自己修改代码)

第一:将TableOutputFormat的内部类TableRecordWriter里的方法进行小小的修改。如下:

public void write(KEY key, Writable value) throws IOException {
if (value instanceof List)
this.table.put((List<Put>) value);
else if (value instanceof Put)
this.table.put(new Put((Put) value));
else if (value instanceof Delete)
this.table.delete(new Delete((Delete) value));
else
throw new IOException("Pass a Delete or a Put");
}


在第二、三行,增加了一个List<Put>的处理,利用到批量处理的方式。

这时需要再增加一个List。如下:

static class ArrayListW<E> extends ArrayList<E> implements Writable {

@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub

}

@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub

}

}

其中两个方法write和readFields可以不实现,因为不会用到。

下面给出部分map代码如下:

public void map(LongWritable key, Text line, Context context)
throws IOException {

String data = line.toString().trim();
if (StringUtils.isBlank(data)) {
return;
}
String[] datas = data.split("\001");
if (columns.length == datas.length
&& (datas[1].equals("TRADE_FINISHED")
|| datas[1].equals("TRADE_CLOSED") || datas[1]
.equals("TRADE_CLOSED_BY_TAOBAO"))) {
byte[] row = Bytes.toBytes(datas[0]);
String modified = datas[3];
Put put = new Put(row);
put.setWriteToWAL(false);
long timestamp;
try {
timestamp = format.parse(modified).getTime();
for (int i = 0; i < datas.length; i++) {
if (StringUtils.isNotBlank(datas[i])
&& !"NULL".equals(datas[i])
&& !"null".equals(datas[i])) {
put.add(family, Bytes.toBytes(columns[i]),
timestamp, Bytes.toBytes(datas[i]));
}
}
results.add(put);
if (results.size() >= 2500) {
context.write(new ImmutableBytesWritable(), results);
results.clear();
}
} catch (ParseException e1) {
e1.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
其中StringUtils类来自org.apache.commons.lang包。

这里results是我们之前ArrayListW的对象,将每个put都放入到此对象。当results的集合大小达到2500的时候,就批量写一次。参考过源码context.write(null,results)也是可以的,第一个参数基本没什么用,我这里用new了个ImmutableBytesWritable对象也没什么特殊的意义。

这里还要注意最后还是要去实现下cleanup方法,不然会漏掉数据,代码如下:

public void cleanup(Context context) throws IOException,
InterruptedException {
if (results.size() > 0) {
context.write(new ImmutableBytesWritable(), results);
results.clear();
}
}


这样就可以了,运行的效果是时间较长,但是数据能够正常导入到HBase,比较适合较大的数据导入,一次性的任务。

第三类就是网上很多方式先生成HFile,然后在load进去。

不过在使用此类时,在事先做好了regin区域后,在运行的过程种会在MapOutputCopier.shuffleInMemory allocationOutOfMemory问题(3个reginserver,总共9TB硬盘)。

如果事先不做regin区域,并且hbase表是一个新建的表,那么就只有一个map。而且在HFile的执行过程中,还是有reduce的执行过程还是会进行排序,参考源码中的部分,

使用的是TreeMap这种简单方式进行排序。另外就是事先创建多个regin,使之可以批量多道导入。在实际的是过程中,对已有的数据做分布,是一个很麻烦的过程,而且经过多次尝试,还是有分布不均匀的情况(文件很大或者文件很小),后来采用中位数的方式,第一步生成HFile可以成功,但是在第二步load的时候,还是不能正常执行(会出现allocationOutOfMemory的异常现象,不知道是否与我设置的一个regin的大小有关,目前暂时没有找到原因)。总而言之,就是采用这种方式跟当前的业务场景、数据规模有关,并不保证一定能够成功。

第三种Hbase Client API方式

这种方式就是利用HBase的Client端的相关API方式,进行处理,跟MR的方式差不多,区别就是一个可以自动进行并行处理,一个是单进程的方式。

第四种sqoop的方式,比较适合数据源来自关系型的数据库导入到HBase里,但是有个问题就是目前的Sqoop不支持自定义的时间戳,这样在有些业务场景中就不太适合了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: