How to use Scala on Spark to load data into HBase/MapRDB -- normal load or bulk load.
Reference: http://www.openkb.info/2015/01/how-to-use-scala-on-spark-to-load-data.html
This article shows sample code for loading data into HBase or MapRDB (M7) using Scala on Spark. I will introduce two ways: one is a normal load using Put, and the other uses the bulk load API.
1. Normal Load using org.apache.hadoop.hbase.client.Put (for HBase and MapRDB)
This way uses a Put object to load data row by row. It is not as efficient as bulk loading.

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable

val conf = HBaseConfiguration.create()
val tableName = "/t1"
conf.set(TableInputFormat.INPUT_TABLE, tableName)

val myTable = new HTable(conf, tableName)
val p = new Put("row999".getBytes())
p.add("cf".getBytes(), "column_name".getBytes(), "value999".getBytes())
myTable.put(p)
myTable.flushCommits()
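The snippet above writes a single row from the driver. Below is a minimal sketch (not from the original article) of applying the same Put-based approach to a whole RDD with foreachPartition, so that each partition opens its own table connection on the executor; it assumes the same table "/t1" and column family "cf" as above.

// A sketch only: per-partition HTable so the connection is never serialized
// from the driver to the executors.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}

val rows = sc.parallelize(1 to 100).map(i => (s"row$i", s"value$i"))
rows.foreachPartition { partition =>
  val conf = HBaseConfiguration.create()
  val table = new HTable(conf, "/t1")          // one HTable per partition
  partition.foreach { case (rowKey, value) =>
    val p = new Put(rowKey.getBytes())
    p.add("cf".getBytes(), "column_name".getBytes(), value.getBytes())
    table.put(p)
  }
  table.flushCommits()
  table.close()
}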
2. Bulk Load using HFiles (for HBase only)
This way has two steps: the first step is to generate HFiles, and then org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles is used to load those HFiles into HBase. This only works for HBase tables, not for MapRDB tables, because MapRDB does not support bulk loading using HFiles.
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// Generate 10 sample rows:
val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})

// Save HFiles on HDFS
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)

// Bulk load the HFiles into HBase
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/xxxx19"), table)
After that, 10 rows are inserted.
hbase(main):020:0> scan 'hao'
ROW                  COLUMN+CELL
 \x00\x00\x00\x01    column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x02    column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x03    column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x04    column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x05    column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x06    column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x07    column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x08    column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x09    column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x0A    column=cf:c1, timestamp=1425128075675, value=value_xxx
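As an optional check (not part of the original post), the table can also be read back from spark-shell through TableInputFormat, which is already imported above; the count should come back as 10.

// A small verification sketch, assuming the table "hao" populated above.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val readConf = HBaseConfiguration.create()
readConf.set(TableInputFormat.INPUT_TABLE, "hao")
val hbaseRDD = sc.newAPIHadoopRDD(readConf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])
println(hbaseRDD.count())   // expected to print 10 after the bulk load above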
3. Direct Bulk Load without HFiles (for HBase and MapRDB)
This way does not need to create HFiles on HDFS; it saves to the HBase table directly. There is only a minor difference compared to the example above:
Change from:

rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)

to:

rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
Here is a complete example:
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// Generate 10 sample rows:
val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})

// Directly bulk load into HBase/MapRDB tables.
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
Note: In the above example, I am using "saveAsNewAPIHadoopFile" to save HFiles on HDFS.
You can also use "saveAsNewAPIHadoopDataset" to achieve the same goal.
For example, just change the code below from:

rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())

to:

job.getConfiguration.set("mapred.output.dir", "/tmp/xxxx19")
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)