
HBase bulk import in practice: using the bulkloader

2015-11-19 17:04
When importing large amounts of data into HBase, writing through the client API generates heavy network IO and consumes the cluster's compute resources. HBase's official bulkloader handles this scenario well.

The bulkloader can load data that has already been written in HFile format directly into HBase. The process consists of two steps:

1. Prepare the HFile-format files on HDFS:

The sample scenario in this code reads data from another HBase table and writes it out in HFile format.
The HFiles can be produced by an MR job whose output key and value types must be <ImmutableBytesWritable, KeyValue> or <ImmutableBytesWritable, Put>.
For the final output, when the value type is KeyValue the corresponding sorter reducer is KeyValueSortReducer; when it is Put, it is PutSortReducer.

The MR example sets the output format via job.setOutputFormatClass(...); HFileOutputFormat is only suited to organizing a single column family into HFiles per job (the sample code below uses its successor, HFileOutputFormat2).
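
For comparison, such a job is more commonly wired up with HFileOutputFormat2.configureIncrementalLoad, which installs the matching sorter reducer (KeyValueSortReducer for KeyValue values) and a TotalOrderPartitioner built from the target table's region boundaries, so the HFiles come out sorted and partitioned per region. Below is a minimal sketch of that approach, assuming the HBase 0.98/1.x-era API; the class name HFilePrepareJob and the source table name "source_table" are illustrative, and the mapper is the XxxMapper shown further below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HFilePrepareJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "prepare-hfiles");
        job.setJarByClass(HFilePrepareJob.class);

        // Read the source table with a TableMapper emitting <ImmutableBytesWritable, KeyValue>
        Scan scan = new Scan();
        scan.setMaxVersions(1);
        TableMapReduceUtil.initTableMapperJob("source_table", scan,
                XxxMapper.class, ImmutableBytesWritable.class, KeyValue.class, job);

        // configureIncrementalLoad installs KeyValueSortReducer and a TotalOrderPartitioner
        // matching the target table's current region boundaries
        HTable targetTable = new HTable(conf, "tablename");
        HFileOutputFormat2.configureIncrementalLoad(job, targetTable);
        targetTable.close();

        FileOutputFormat.setOutputPath(job, new Path("/user/yingzz/hbase/output2"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The article's own, more generic job wrapper is shown next.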

@Override
public MapreduceResult runMapreduce(Properties properties) {
    try {
        properties.setProperty("mapreduce.hdfs.output", "/user/yingzz/hbase/output2");
        properties.setProperty(ConstantsDef.HBASE_SOURCE_ZK_QUORUM, "127.0.0.1:2181");
        properties.setProperty(ConstantsDef.HBASE_SOURCE_NAMESERVER_ADDR, "default");
        properties.setProperty(ConstantsDef.HBASE_SOURCE_TABLE, "tablename");
        Scan scan = new Scan();
        scan.setMaxVersions(1);
        // the mapper emits <ImmutableBytesWritable, KeyValue>, so the job's
        // map-output and output key/value classes must match (see XxxMapper below)
        Job job = this.createHbaseInputJob(scan, HFileOutputFormat2.class,
                ImmutableBytesWritable.class, KeyValue.class,
                ImmutableBytesWritable.class, KeyValue.class, XxxMapper.class,
                null, null, properties);
        boolean success = job.waitForCompletion(true);
        if (success) {
            return new MapreduceResult(MapreduceResult.EJOB_RUN_SUCCESSED);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return new MapreduceResult(MapreduceResult.EJOB_RUN_FAILED);
}
public class XxxMapper extends TableMapper<ImmutableBytesWritable, KeyValue> {

    NullWritable nullWritable = NullWritable.get();

    @Override
    protected void setup(Context context) {
    }

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // the input key of a TableMapper is already the row key of the source row
        ImmutableBytesWritable rowkey = new ImmutableBytesWritable(value.getRow());
        KeyValue[] kvs = value.raw();
        for (KeyValue kv : kvs) {
            context.write(rowkey, kv);
        }
    }
}



public Job createHbaseInputJob(
        Scan scan,
        Class<? extends OutputFormat> outputFormatClazz,
        Class<?> mapperOutKeyClazz,
        Class<?> mapperOutValueClazz,
        Class<?> outPutKeyClazz,
        Class<?> outPutValueClazz,
        Class<? extends TableMapper> mapperClazz,
        Class<? extends Reducer> reducerClazz,
        Class<? extends Reducer> combinerClazz,
        Properties properties) throws Exception {

    String inputTable = properties.getProperty(ConstantsDef.HBASE_SOURCE_TABLE);

    String rowIndex = properties.getProperty(ConstantsDef.HBASE_SOURCE_TABLE_START_ROW);
    if (rowIndex != null && !rowIndex.isEmpty()) {
        scan.setStartRow(Bytes.toBytes(rowIndex));
    }

    rowIndex = properties.getProperty(ConstantsDef.HBASE_SOURCE_TABLE_STOP_ROW);
    if (rowIndex != null && !rowIndex.isEmpty()) {
        scan.setStopRow(Bytes.toBytes(rowIndex));
    }

    final String outPutPath = properties.getProperty(ConstantsDef.MR_HDFS_OUTPUT);
    HDFSUtils.rmdir(outPutPath);

    final String inputPathText = properties.getProperty(ConstantsDef.MR_HDFS_INPUT, "");
    String[] inputArray = inputPathText.split(",");
    List<String> inputPaths = new LinkedList<String>();
    for (String input : inputArray) {
        inputPaths.add(input);
    }

    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", properties.getProperty(ConstantsDef.HBASE_SOURCE_ZK_QUORUM));
    conf.set("hbase.nameserver.address", properties.getProperty(ConstantsDef.HBASE_SOURCE_NAMESERVER_ADDR));
    // copy properties with the custom prefix into the job configuration
    for (Object keyObj : properties.keySet()) {
        if (keyObj instanceof String) {
            String key = (String) keyObj;
            if (key.startsWith(ConstantsDef.MR_JOB_CUSTOM_CONF_PREFIX)) {
                String val = properties.getProperty(key);
                if (val != null) {
                    conf.set(key, val);
                }
            }
        }
    }
    boolean sequenceFileOut = (SequenceFileOutputFormat.class == outputFormatClazz);

    String resource = properties.getProperty(ConstantsDef.MR_JOB_RESOURCE);
    if (resource != null && !resource.isEmpty()) {
        conf.addResource(resource);
    }

    String taskTimeOutMaxVal = properties.getProperty("mapreduce.task.timeout");
    if (taskTimeOutMaxVal != null) {
        conf.setLong("mapreduce.task.timeout", Long.parseLong(taskTimeOutMaxVal));
    }

    String dfsSocketTimeOutMaxVal = properties.getProperty("dfs.socket.timeout");
    if (dfsSocketTimeOutMaxVal != null) {
        conf.setLong("dfs.socket.timeout", Long.parseLong(dfsSocketTimeOutMaxVal));
    }

    String dfsSocketWriteTimeOutMaxVal = properties.getProperty("dfs.datanode.socket.write.timeout");
    if (dfsSocketWriteTimeOutMaxVal != null) {
        conf.setLong("dfs.datanode.socket.write.timeout", Long.parseLong(dfsSocketWriteTimeOutMaxVal));
    }

    String numMapTasks = properties.getProperty("mapreduce.tasktracker.map.tasks.maximum");
    if (numMapTasks != null) {
        // cap on the number of concurrent map tasks
        conf.setInt("mapreduce.tasktracker.map.tasks.maximum", Integer.parseInt(numMapTasks));
    }

    // shuffle configuration
    String shuffleBufferSize = properties.getProperty("mapreduce.job.shuffle.merge.percent");
    if (shuffleBufferSize != null) {
        conf.setFloat("mapreduce.job.shuffle.merge.percent", Float.parseFloat(shuffleBufferSize));
    }

    Job job = Job.getInstance(conf, this.getClass().getName());

    job.setJarByClass(this.getClass());

    TableMapReduceUtil.initTableMapperJob(inputTable,
            scan,
            mapperClazz,
            mapperOutKeyClazz,
            mapperOutValueClazz,
            job);

    if (outPutPath != null) {
        Path output = new Path(outPutPath);
        if (sequenceFileOut) {
            SequenceFileOutputFormat.setOutputPath(job, output);
        } else {
            FileOutputFormat.setOutputPath(job, output);
        }
    }

    if (combinerClazz != null) {
        job.setCombinerClass(combinerClazz);
    }

    if (reducerClazz != null) {
        job.setReducerClass(reducerClazz);
    }

    if (outputFormatClazz != null) {
        job.setOutputFormatClass(outputFormatClazz);
    }

    job.setOutputKeyClass(outPutKeyClazz);
    job.setOutputValueClass(outPutValueClazz);

    String numReduceTasks = properties.getProperty(ConstantsDef.MR_JOB_REDUCER_NUM);
    if (numReduceTasks != null) {
        job.setNumReduceTasks(Integer.parseInt(numReduceTasks));
    }

    // no reducer configured: run a map-only job
    if (reducerClazz == null) {
        job.setNumReduceTasks(0);
    }

    // map output compression; set it on the job's configuration, because
    // Job.getInstance(conf) takes a copy of conf and later changes to conf
    // would not reach the job
    String mapCompress = properties.getProperty("mapreduce.map.output.compression.codec");
    if (mapCompress != null && !mapCompress.isEmpty()) {
        Class<?> codec = Class.forName(mapCompress);
        if (codec != null) {
            job.getConfiguration().setBoolean("mapreduce.compress.map.output", true);
            job.getConfiguration().setClass("mapreduce.map.output.compression.codec", codec, CompressionCodec.class);
        }
    }

    String reduceCompress = properties.getProperty("mapreduce.reduce.output.compression.codec");
    if (reduceCompress != null && !reduceCompress.isEmpty()) {
        Class<? extends CompressionCodec> codec = (Class<? extends CompressionCodec>) Class.forName(reduceCompress);
        if (sequenceFileOut) {
            SequenceFileOutputFormat.setCompressOutput(job, true);
            SequenceFileOutputFormat.setOutputCompressorClass(job, codec);
            SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
        } else {
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, codec);
        }
    }

    // job priority
    String jobPriority = properties.getProperty("mapreduce.job.priority");
    if (jobPriority != null) {
        if ("VERY_HIGH".equals(jobPriority)) {
            job.setPriority(JobPriority.VERY_HIGH);
        } else if ("HIGH".equals(jobPriority)) {
            job.setPriority(JobPriority.HIGH);
        } else if ("LOW".equals(jobPriority)) {
            job.setPriority(JobPriority.LOW);
        } else if ("VERY_LOW".equals(jobPriority)) {
            job.setPriority(JobPriority.VERY_LOW);
        } else {
            job.setPriority(JobPriority.NORMAL);
        }
    }
    return job;
}


2. Load the HFile-format files into the target HBase table:

There are two ways to submit the bulkloader task.

Submit the task remotely from local code (note: the target HBase table must be created manually in advance):
public static void main(String[] args) throws Throwable {
    Configuration conf = HBaseConfiguration.create();
    // be sure to raise this limit when there are many HFiles
    conf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 5000);

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    // first argument: the output directory of the HFile-writing job from step 1;
    // second argument: the target table
    // loader.doBulkLoad(new Path("hdfs://127.0.0.1:9000/output1/"), new HTable(conf, "t3"));
    loader.doBulkLoad(new Path("hdfs://127.0.0.1:8020/user/yingzz/hbase/output"), new HTable(conf, "tablename"));
    // finally call System.exit to terminate
}


Submit the task on the server via the command line:

bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dhbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily=40 <hdfs://storefileoutput> <tablename>

The latter approach is recommended: it saves network IO and is faster than the former.

ps:

Ideally, the HDFS holding the HFiles and the HDFS backing the target HBase cluster should be the same filesystem; the bulkloader is then extremely fast and the whole process is short, because the HFiles can essentially be moved into place rather than copied.

If the target table is not created in advance with its regions pre-split by rowkey, all of the imported data ends up in a single region and splits must then be triggered manually, which is very time-consuming. If the table's regions are split before the import, the loader groups the HFiles by region boundary before loading them, which is much faster than relying on post-load split & compaction.
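
To illustrate pre-splitting, here is a minimal sketch using the HBaseAdmin API of that era; the column family "cf" and the split keys are placeholders that would need to match the rowkey distribution of the data being imported:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class CreatePreSplitTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("tablename"));
        desc.addFamily(new HColumnDescriptor("cf"));

        // split keys chosen to match the rowkey distribution of the data being imported
        byte[][] splitKeys = new byte[][] {
                Bytes.toBytes("1000"),
                Bytes.toBytes("2000"),
                Bytes.toBytes("3000")
        };
        // creates the table with one region per split-key interval,
        // so the bulkloader can assign HFiles directly to regions
        admin.createTable(desc, splitKeys);
        admin.close();
    }
}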

References:
http://hbase.apache.org/xref/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html
http://blog.csdn.net/opensure/article/details/47054861
http://uestzengting.iteye.com/blog/1290557