各种方式将数据导入到HBase体验
2013-03-20 10:44
302 查看
目前随着HBase的上线,需要将各种零散的数据源陆续导入到HBase中。
根据自己的使用下来的感受,总结导入方式有以下几种:
第一:命令行导入
第二:编写MR程序导入
第三:采用javaAPI的方式导入
第四:使用Sqoop方式导入
第一种很简单。不适合从已存在的数据中导入,就是兼容性不好。
第二种细分为三类:总体来说,数据都是放在HDFS上面。
第一类使用TableOutputFormat的方式,网上有很多例子,这里不多介绍,这里主要是说下,使用这种方式每次都是写一个记录到HBase方式里,我们使用下来发现非常慢。
后来为了能够进行批量写入。所以就有了下面的想法,就是修改代码,改成批量的方式。
第二类修改TableOutputFormat为Batch(需要自己修改代码)
第一:将TableOutputFormat的内部类TableRecordWriter里的方法进行小小的修改。如下:
在第二、三行,增加了一个List<Put>的处理,利用到批量处理的方式。
这时需要再增加一个List。如下:
其中两个方法write和readFields可以不实现,因为不会用到。
下面给出部分map代码如下:
这里results是我们之前ArrayListW的对象,将每个put都放入到此对象。当results的集合大小达到2500的时候,就批量写一次。参考过源码context.write(null,results)也是可以的,第一个参数基本没什么用,我这里用new了个ImmutableBytesWritable对象也没什么特殊的意义。
这里还要注意最后还是要去实现下cleanup方法,不然会漏掉数据,代码如下:
这样就可以了,运行的效果是时间较长,但是数据能够正常导入到HBase,比较适合较大的数据导入,一次性的任务。
第三类就是网上很多方式先生成HFile,然后在load进去。
不过在使用此类时,在事先做好了regin区域后,在运行的过程种会在MapOutputCopier.shuffleInMemory allocationOutOfMemory问题(3个reginserver,总共9TB硬盘)。
如果事先不做regin区域,并且hbase表是一个新建的表,那么就只有一个map。而且在HFile的执行过程中,还是有reduce的执行过程还是会进行排序,参考源码中的部分,
使用的是TreeMap这种简单方式进行排序。另外就是事先创建多个regin,使之可以批量多道导入。在实际的是过程中,对已有的数据做分布,是一个很麻烦的过程,而且经过多次尝试,还是有分布不均匀的情况(文件很大或者文件很小),后来采用中位数的方式,第一步生成HFile可以成功,但是在第二步load的时候,还是不能正常执行(会出现allocationOutOfMemory的异常现象,不知道是否与我设置的一个regin的大小有关,目前暂时没有找到原因)。总而言之,就是采用这种方式跟当前的业务场景、数据规模有关,并不保证一定能够成功。
第三种Hbase Client API方式
这种方式就是利用HBase的Client端的相关API方式,进行处理,跟MR的方式差不多,区别就是一个可以自动进行并行处理,一个是单进程的方式。
第四种sqoop的方式,比较适合数据源来自关系型的数据库导入到HBase里,但是有个问题就是目前的Sqoop不支持自定义的时间戳,这样在有些业务场景中就不太适合了。
根据自己的使用下来的感受,总结导入方式有以下几种:
第一:命令行导入
第二:编写MR程序导入
第三:采用javaAPI的方式导入
第四:使用Sqoop方式导入
第一种很简单。不适合从已存在的数据中导入,就是兼容性不好。
第二种细分为三类:总体来说,数据都是放在HDFS上面。
第一类使用TableOutputFormat的方式,网上有很多例子,这里不多介绍,这里主要是说下,使用这种方式每次都是写一个记录到HBase方式里,我们使用下来发现非常慢。
后来为了能够进行批量写入。所以就有了下面的想法,就是修改代码,改成批量的方式。
第二类修改TableOutputFormat为Batch(需要自己修改代码)
第一:将TableOutputFormat的内部类TableRecordWriter里的方法进行小小的修改。如下:
public void write(KEY key, Writable value) throws IOException { if (value instanceof List) this.table.put((List<Put>) value); else if (value instanceof Put) this.table.put(new Put((Put) value)); else if (value instanceof Delete) this.table.delete(new Delete((Delete) value)); else throw new IOException("Pass a Delete or a Put"); }
在第二、三行,增加了一个List<Put>的处理,利用到批量处理的方式。
这时需要再增加一个List。如下:
static class ArrayListW<E> extends ArrayList<E> implements Writable { @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub } }
其中两个方法write和readFields可以不实现,因为不会用到。
下面给出部分map代码如下:
public void map(LongWritable key, Text line, Context context) throws IOException { String data = line.toString().trim(); if (StringUtils.isBlank(data)) { return; } String[] datas = data.split("\001"); if (columns.length == datas.length && (datas[1].equals("TRADE_FINISHED") || datas[1].equals("TRADE_CLOSED") || datas[1] .equals("TRADE_CLOSED_BY_TAOBAO"))) { byte[] row = Bytes.toBytes(datas[0]); String modified = datas[3]; Put put = new Put(row); put.setWriteToWAL(false); long timestamp; try { timestamp = format.parse(modified).getTime(); for (int i = 0; i < datas.length; i++) { if (StringUtils.isNotBlank(datas[i]) && !"NULL".equals(datas[i]) && !"null".equals(datas[i])) { put.add(family, Bytes.toBytes(columns[i]), timestamp, Bytes.toBytes(datas[i])); } } results.add(put); if (results.size() >= 2500) { context.write(new ImmutableBytesWritable(), results); results.clear(); } } catch (ParseException e1) { e1.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }其中StringUtils类来自org.apache.commons.lang包。
这里results是我们之前ArrayListW的对象,将每个put都放入到此对象。当results的集合大小达到2500的时候,就批量写一次。参考过源码context.write(null,results)也是可以的,第一个参数基本没什么用,我这里用new了个ImmutableBytesWritable对象也没什么特殊的意义。
这里还要注意最后还是要去实现下cleanup方法,不然会漏掉数据,代码如下:
public void cleanup(Context context) throws IOException, InterruptedException { if (results.size() > 0) { context.write(new ImmutableBytesWritable(), results); results.clear(); } }
这样就可以了,运行的效果是时间较长,但是数据能够正常导入到HBase,比较适合较大的数据导入,一次性的任务。
第三类就是网上很多方式先生成HFile,然后在load进去。
不过在使用此类时,在事先做好了regin区域后,在运行的过程种会在MapOutputCopier.shuffleInMemory allocationOutOfMemory问题(3个reginserver,总共9TB硬盘)。
如果事先不做regin区域,并且hbase表是一个新建的表,那么就只有一个map。而且在HFile的执行过程中,还是有reduce的执行过程还是会进行排序,参考源码中的部分,
使用的是TreeMap这种简单方式进行排序。另外就是事先创建多个regin,使之可以批量多道导入。在实际的是过程中,对已有的数据做分布,是一个很麻烦的过程,而且经过多次尝试,还是有分布不均匀的情况(文件很大或者文件很小),后来采用中位数的方式,第一步生成HFile可以成功,但是在第二步load的时候,还是不能正常执行(会出现allocationOutOfMemory的异常现象,不知道是否与我设置的一个regin的大小有关,目前暂时没有找到原因)。总而言之,就是采用这种方式跟当前的业务场景、数据规模有关,并不保证一定能够成功。
第三种Hbase Client API方式
这种方式就是利用HBase的Client端的相关API方式,进行处理,跟MR的方式差不多,区别就是一个可以自动进行并行处理,一个是单进程的方式。
第四种sqoop的方式,比较适合数据源来自关系型的数据库导入到HBase里,但是有个问题就是目前的Sqoop不支持自定义的时间戳,这样在有些业务场景中就不太适合了。
相关文章推荐
- 转载-SQL Server各种导入导出数据方式的比较
- HBase数据导入方式
- HBase的几种导入数据的方式
- hive over hbase方式将文本库数据导入hbase
- 数据导入HBase最常用的三种方式及实践分析
- HBase总结(十九)数据导入方式
- HBase总结之数据导入方式
- hive over hbase方式将微博用户数据导入hbase
- HBase数据导入之completebulkload方式
- 数据导入HBase最常用的三种方式及实践分析
- HBase导入大数据三大方式之(二)——importtsv +completebulkload 方式
- HBase总结(十九)数据导入方式
- HBase实战(1):数据导入方式
- HBase的数据导入方式
- 数据导入HBase最常用的三种方式及实践分析
- HBase 数据导入功能实现方式解释
- HBase 实战(1)--HBase的数据导入方式
- 数据导入HBase最常用的三种方式及实践分析
- 数据导入HBase最常用的三种方式及实践分析
- HBase导入大数据三大方式