
How to use Scala on Spark to load data into HBase/MapR-DB -- normal load or bulk load.

2016-09-09


Reference: http://www.openkb.info/2015/01/how-to-use-scala-on-spark-to-load-data.html

This article shows sample code for loading data into HBase or MapR-DB (M7) using Scala on Spark.

I will introduce two approaches: a normal load using Put, and a bulk load using the bulk load API.

1. Normal Load using org.apache.hadoop.hbase.client.Put (for HBase and MapR-DB)

This approach uses a Put object to load data one record at a time, so it is not as efficient as bulk loading.

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable

val conf = HBaseConfiguration.create()
val tableName = "/t1"
conf.set(TableInputFormat.INPUT_TABLE, tableName)

// Put a single row "row999" with column cf:column_name into the table
val myTable = new HTable(conf, tableName)
val p = new Put("row999".getBytes())
p.add("cf".getBytes(), "column_name".getBytes(), "value999".getBytes())
myTable.put(p)
myTable.flushCommits()
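
The snippet above writes a single row from the driver. As a rough sketch of my own (not from the original article), the same Put-based approach can be applied to a Spark RDD by opening one HTable per partition; the row keys and values below are made up for illustration and reuse the imports above.

// Hypothetical sketch: write an RDD of (rowKey, value) pairs with Put,
// creating one HTable connection per partition.
val data = sc.parallelize(Seq(("row001", "value001"), ("row002", "value002")))
data.foreachPartition { partition =>
  val partConf = HBaseConfiguration.create()
  val partTable = new HTable(partConf, "/t1")
  partition.foreach { case (rowKey, value) =>
    val p = new Put(rowKey.getBytes())
    p.add("cf".getBytes(), "column_name".getBytes(), value.getBytes())
    partTable.put(p)
  }
  partTable.flushCommits()
  partTable.close()
}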


2. Bulk Load using HFiles (for HBase only).

This approach has two steps: first generate the HFiles, then use org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles to load them into HBase.

This only works for HBase tables, not for MapR-DB tables, because MapR-DB does not support bulk loading with HFiles.

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// Generate 10 sample data:
val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})

// Save HFiles on HDFS
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)

// Bulk load the HFiles into HBase
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/xxxx19"), table)


After that, the 10 rows are inserted:

hbase(main):020:0> scan 'hao'
ROW                                                 COLUMN+CELL
\x00\x00\x00\x01                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
\x00\x00\x00\x02                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
\x00\x00\x00\x03                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
\x00\x00\x00\x04                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
\x00\x00\x00\x05                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
\x00\x00\x00\x06                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
\x00\x00\x00\x07                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
\x00\x00\x00\x08                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
\x00\x00\x00\x09                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
\x00\x00\x00\x0A                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
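
One caveat worth noting (my addition, not from the original article): HFileOutputFormat requires the KeyValues to be written in ascending row-key order, otherwise the HFile writer fails. The sample data above (1 to 10) happens to produce sorted keys already; for arbitrary input you would typically sort the RDD by row key first, for example:

// Hypothetical sketch: sort the source data by row key before generating the KeyValues,
// since HFiles must be written in ascending key order.
val sortedRdd = num.sortBy(x => x).map(x => {
  val kv = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})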


3. Direct Bulk Load without HFiles (for HBase and MapR-DB).

This approach does not need to create HFiles on HDFS; it saves into the HBase/MapR-DB table directly.

There is only a minor difference compared with the example above.

Change this line:

rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)
To:

rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())


Here is a complete example:

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// Generate 10 sample data:
val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})

// Directly bulk load into the HBase/MapR-DB table.
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())


Note: In the examples above, I am using "saveAsNewAPIHadoopFile" to save the HFiles on HDFS.

You can also use "saveAsNewAPIHadoopDataset" to achieve the same goal; in that case the output directory is taken from the job configuration ("mapred.output.dir") instead of being passed as an argument.

For example, just change the code below from:

rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
To:

job.getConfiguration.set("mapred.output.dir", "/tmp/xxxx19")
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
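
Finally, as a rough verification sketch of my own (not from the original article), you can read the table back through TableInputFormat, which is already imported above, and count the rows that were loaded:

// Hypothetical verification sketch: read the table back and count the loaded rows.
import org.apache.hadoop.hbase.client.Result

conf.set(TableInputFormat.INPUT_TABLE, tableName)
val loadedRdd = sc.newAPIHadoopRDD(conf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])
println(loadedRdd.count())   // expect 10 with the sample data above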