hbase批量导入之bulkloader使用实战
2015-11-19 17:04
501 查看
HBase大量导入数据时,使用client方式将会导致大量网络IO以及损耗集群计算资源,Hbase官方的bulkloader可以很好解决这个场景。
bulkloader支持将写成HFile格式的数据直接放入HBase,这个过程分为:
1.准备HFile格式的文件于HDFS中:
本样例代码场景为读取另一个HBase表中的数据,写为HFile格式。
其中写入方式可以通过MR任务完成,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。
最终输出部分,Value类型是KeyValue 或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer。
MR例子中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat只适合一次对单列族组织成HFile文件。
2.将HFile格式的文件置于新的HBase中:
提供两种方式提交bulkloader任务。
本地代码远程提交任务(注意HBase表一定要手动先建好):
服务器上通过命令提交任务:
bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dhbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily=40 <hdfs://storefileoutput> <tablename>
建议通过后者方式,该方式速度较前者节约网络IO,且速度快过前者方法。
ps:
HFile所在的hdfs与被导入表所在的hbase最好基于一套hdfs,这样bulkloader速度极快过程也短;
如果不预先建好被导入表并按rowkey分好region,导入过后所有数据都会在一个region中,必须手动触发split,非常消耗时间。而如果在导入数据前分好table的region,程序则会在导入前对HFile进行分组导入,效率较split&compaction快很多。
参考文献 http://hbase.apache.org/xref/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html
http://blog.csdn.net/opensure/article/details/47054861 http://uestzengting.iteye.com/blog/1290557
bulkloader支持将写成HFile格式的数据直接放入HBase,这个过程分为:
1.准备HFile格式的文件于HDFS中:
本样例代码场景为读取另一个HBase表中的数据,写为HFile格式。
其中写入方式可以通过MR任务完成,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。
最终输出部分,Value类型是KeyValue 或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer。
MR例子中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat只适合一次对单列族组织成HFile文件。
@Override public MapreduceResult runMapreduce(Properties properties) { try { properties.setProperty("mapreduce.hdfs.output", "/user/yingzz/hbase/output2"); properties.setProperty(ConstantsDef.HBASE_SOURCE_ZK_QUORUM, "127.0.0.1:2181"); properties.setProperty(ConstantsDef.HBASE_SOURCE_NAMESERVER_ADDR, "default"); properties.setProperty(ConstantsDef.HBASE_SOURCE_TABLE, "tablename"); Scan scan = new Scan(); scan.setMaxVersions(1); Job job = this.createHbaseInputJob(scan, HFileOutputFormat2.class, NullWritable.class, Text.class, NullWritable.class, Text.class, XxxMapper.class, null, null, properties); boolean success = job.waitForCompletion(true); if(success){ return new MapreduceResult(MapreduceResult.EJOB_RUN_SUCCESSED); } } catch (Exception e) { e.printStackTrace(); } return new MapreduceResult(MapreduceResult.EJOB_RUN_FAILED); } }
public class XxxMapper extends TableMapper<ImmutableBytesWritable, KeyValue>{ NullWritable nullWritable = NullWritable.get(); @Override protected void setup(Context context){ } public void map(ImmutableBytesWritable key, Result value,Context context) throws IOException, InterruptedException{ ImmutableBytesWritable rowkey = new ImmutableBytesWritable( value.toString().split(",")[0].getBytes()); KeyValue[] kvs = value.raw(); for(KeyValue kv : kvs){ context.write(rowkey, kv); } }
public Job createHbaseInputJob( Scan scan, Class<? extends OutputFormat> outputFormatClazz, Class<?> mapperOutKeyClazz, Class<?> mapperOutValueClazz, Class<?> outPutKeyClazz, Class<?> outPutValueClazz, Class<? extends TableMapper> mapperClazz, Class<? extends Reducer> reducerClazz, Class<? extends Reducer> combinerClazz, Properties properties) throws Exception{ String inputTable = properties.getProperty(ConstantsDef.HBASE_SOURCE_TABLE); String rowIndex = properties.getProperty(ConstantsDef.HBASE_SOURCE_TABLE_START_ROW); if(rowIndex != null && !rowIndex.isEmpty()){ scan.setStartRow(Bytes.toBytes(rowIndex)); } rowIndex = properties.getProperty(ConstantsDef.HBASE_SOURCE_TABLE_STOP_ROW); if(rowIndex != null && !rowIndex.isEmpty()){ scan.setStopRow(Bytes.toBytes(rowIndex)); } final String outPutPath = properties.getProperty(ConstantsDef.MR_HDFS_OUTPUT); HDFSUtils.rmdir(outPutPath); final String inputPathText = properties.getProperty(ConstantsDef.MR_HDFS_INPUT,""); String[] inputArray = inputPathText.split(","); List<String> inputPaths = new LinkedList<String>(); for(String input : inputArray){ inputPaths.add(input); } Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", properties.getProperty(ConstantsDef.HBASE_SOURCE_ZK_QUORUM)); conf.set("hbase.nameserver.address",properties.getProperty(ConstantsDef.HBASE_SOURCE_NAMESERVER_ADDR)); //将特定前缀的配置读取设置到conf中 for(Object keyObj : properties.keySet()){ if(keyObj instanceof String){ String key = (String)keyObj; if(key.startsWith(ConstantsDef.MR_JOB_CUSTOM_CONF_PREFIX)){ String val = properties.getProperty(key); if(val != null){ conf.set(key, val); } } } } boolean sequenceFileOut = (SequenceFileOutputFormat.class == outputFormatClazz); String resource = properties.getProperty(ConstantsDef.MR_JOB_RESOURCE); if(resource != null && !resource.isEmpty()){ conf.addResource(resource); } String taskTimeOutMaxVal = properties.getProperty("mapreduce.task.timeout"); if(taskTimeOutMaxVal != null){ conf.setLong("mapreduce.task.timeout", Long.parseLong(taskTimeOutMaxVal)); } String dfsSocketTimeOutMaxVal = properties.getProperty("dfs.socket.timeout"); if(dfsSocketTimeOutMaxVal != null){ conf.setLong("dfs.socket.timeout", Long.parseLong(dfsSocketTimeOutMaxVal)); } String dfsSocketWriteTimeOutMaxVal = properties.getProperty("dfs.datanode.socket.write.timeout"); if(dfsSocketWriteTimeOutMaxVal != null){ conf.setLong("dfs.datanode.socket.write.timeout", Long.parseLong(dfsSocketWriteTimeOutMaxVal)); } String numMapTasks = properties.getProperty("mapreduce.tasktracker.map.tasks.maximum"); if(numMapTasks != null){ //map的个数限制 conf.setInt("mapreduce.tasktracker.map.tasks.maximum", Integer.parseInt(numMapTasks)); } //shuffle配置 String shuffleBufferSize = properties.getProperty("mapreduce.job.shuffle.merge.percent"); if(shuffleBufferSize != null){ conf.setFloat("mapreduce.job.shuffle.merge.percent", Float.parseFloat(shuffleBufferSize)); } Job job = Job.getInstance(conf, this.getClass().getName()); job.setJarByClass(this.getClass()); TableMapReduceUtil.initTableMapperJob(inputTable, scan, mapperClazz, mapperOutKeyClazz, mapperOutValueClazz, job); if(outPutPath != null){ Path output = new Path(outPutPath); if(sequenceFileOut){ SequenceFileOutputFormat.setOutputPath(job, output); }else{ FileOutputFormat.setOutputPath(job,output ); } } if(combinerClazz != null){ job.setCombinerClass(combinerClazz); } if(reducerClazz != null){ job.setReducerClass(reducerClazz); } if(outputFormatClazz != null){ job.setOutputFormatClass(outputFormatClazz); } job.setOutputKeyClass(outPutKeyClazz); job.setOutputValueClass(outPutValueClazz); String numReduceTasks = properties.getProperty(ConstantsDef.MR_JOB_REDUCER_NUM); if(numReduceTasks != null){ job.setNumReduceTasks(Integer.parseInt(numReduceTasks)); } //没有reducer的时候直接设置为0个 if(reducerClazz == null){ job.setNumReduceTasks(0); } //map压缩配置 String mapCompress = properties.getProperty("mapr 4000 educe.map.output.compression.codec"); if(mapCompress != null && !mapCompress.isEmpty()){ Class<?> codec = Class.forName(mapCompress); if(codec != null){ conf.setBoolean("mapreduce.compress.map.output", true); conf.setClass("mapreduce.map.output.compression.codec", codec, CompressionCodec.class); } } String reduceCompress = properties.getProperty("mapreduce.reduce.output.compression.codec"); if(reduceCompress != null && !reduceCompress.isEmpty()){ Class<? extends CompressionCodec> codec = (Class<? extends CompressionCodec>)Class.forName(reduceCompress); if(sequenceFileOut){ SequenceFileOutputFormat.setCompressOutput(job, true); SequenceFileOutputFormat.setOutputCompressorClass(job, codec); SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); }else{ FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, codec); } } //jobPriority String jobPriority = properties.getProperty("mapreduce.job.priority"); if(jobPriority != null){ if("VERY_HIGH".equals(jobPriority)){ job.setPriority(JobPriority.VERY_HIGH); }else if("HIGH".equals(jobPriority)){ job.setPriority(JobPriority.HIGH); }else if("LOW".equals(jobPriority)){ job.setPriority(JobPriority.LOW); }else if("VERY_LOW".equals(jobPriority)){ job.setPriority(JobPriority.VERY_LOW); }else{ job.setPriority(JobPriority.NORMAL); } } return job; }
2.将HFile格式的文件置于新的HBase中:
提供两种方式提交bulkloader任务。
本地代码远程提交任务(注意HBase表一定要手动先建好):
public static void main(String[] args) throws Throwable { Configuration conf = HBaseConfiguration.create(); conf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily",5000); //HFile过多时一定要设置该项 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); //第一个参数为第二个Job的输出目录即保存HFile的目录,第二个参数为目标表 // loader.doBulkLoad(new Path("hdfs://127.0.0.1:9000/output1/"), new HTable(conf, "t3")); loader.doBulkLoad(new Path("hdfs://127.0.0.1:8020/user/yingzz/hbase/output"), new HTable(conf, "tablename")); //最后调用System.exit进行退出 }
服务器上通过命令提交任务:
bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dhbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily=40 <hdfs://storefileoutput> <tablename>
建议通过后者方式,该方式速度较前者节约网络IO,且速度快过前者方法。
ps:
HFile所在的hdfs与被导入表所在的hbase最好基于一套hdfs,这样bulkloader速度极快过程也短;
如果不预先建好被导入表并按rowkey分好region,导入过后所有数据都会在一个region中,必须手动触发split,非常消耗时间。而如果在导入数据前分好table的region,程序则会在导入前对HFile进行分组导入,效率较split&compaction快很多。
参考文献 http://hbase.apache.org/xref/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html
http://blog.csdn.net/opensure/article/details/47054861 http://uestzengting.iteye.com/blog/1290557
相关文章推荐
- 我是运营,我没有假期
- Facebook's New Real-time Messaging System: HBase to Store 135+ Billion Messages a Month
- Hadoop生态上几个技术的关系与区别:hive、pig、hbase 关系与区别
- DB2数据库的安装
- C#实现把指定数据写入串口
- “传奇”图象数据存储方式
- 修复mysql数据库
- 浅析SQL数据操作语句
- SQLServer 数据导入导出的几种方法小结
- MySQL数据备份之mysqldump的使用详解
- C#实现窗体间传递数据实例
- 给你的数据库文件减肥
- Oracle数据更改后出错的解决方法
- Oracle数据库数据丢失恢复的几种方法总结
- C#将Sql数据保存到Excel文件中的方法
- MFC实现在文件尾追加数据的方法
- 把excel表格里的数据导入sql数据库的两种方法
- 用文本作数据处理
- 桌面中心(一)创建数据库
- 桌面中心(四)数据显示