Spark 批量写数据入HBase
2015-06-02 15:26
288 查看
介绍
工作中常常会遇到这种情形,需要将hdfs中的大批量数据导入HBase。本文使用Spark+HBase的方式将RDD中的数据导入HBase中。没有使用官网提供的newAPIHadoopRDD接口的方式。使用本文的方式将数据导入HBase, 7000W条数据,花费时间大概20分钟左右,本文Spark可用核数量为20。本文使用spark版本为1.3.0,hbase版本为0.98.1
hbase表结构为:表名table,列族Family,列为qualifier。
代码如下:
val readFile = sc.textFile("/path/to/file").map(x => x.split(",")) val tableName = "table" readFile.foreachPartition{ x=> { val myConf = HBaseConfiguration.create() myConf.set("hbase.zookeeper.quorum", "web102,web101,web100") myConf.set("hbase.zookeeper.property.clientPort", "2181") myConf.set("hbase.defaults.for.version.skip", "true") val myTable = new HTable(myConf, TableName.valueOf(tableName)) myTable.setAutoFlush(false, false)//关键点1 myTable.setWriteBufferSize(3*1024*1024)//关键点2 x.foreach { y => { println(y(0) + ":::" + y(1)) val p = new Put(Bytes.toBytes(y(0))) p.add("Family".getBytes, "qualifier".getBytes, Bytes.toBytes(y(1))) myTable.put(p) } } myTable.flushCommits()//关键点3 } }
此程序是使用了RDD的foreachPartition函数,在此程序中有三个比较关键的地方。
关键点1_:将自动提交关闭,如果不关闭,每写一条数据都会进行提交,是导入数据较慢的做主要因素。
关键点2:设置缓存大小,当缓存大于设置值时,hbase会自动提交。此处可自己尝试大小,一般对大数据量,设置为5M即可,本文设置为3M。
关键点3:每一个分片结束后都进行flushCommits(),如果不执行,当hbase最后缓存小于上面设定值时,不会进行提交,导致数据丢失。
注:此外如果想提高Spark写数据如Hbase速度,可以增加Spark可用核数量。
相关文章推荐
- Spark随谈——开发指南(译)
- Facebook's New Real-time Messaging System: HBase to Store 135+ Billion Messages a Month
- Spark,一种快速数据分析替代方案
- Hadoop生态上几个技术的关系与区别:hive、pig、hbase 关系与区别
- 基于HBase Thrift接口的一些使用问题及相关注意事项的详解
- 如何解决struts2日期类型转换
- hbase shell基础和常用命令详解
- 手把手教你配置Hbase完全分布式环境
- 实战:在Java Web 项目中使用HBase
- HBase基本原理
- HBase中的基本概念
- Spark初探
- Spark Streaming初探
- 【原创】基于分布式存储的开源系统在实时数据库海量历史数据存储项目上的预研
- 搭建hadoop/spark集群环境
- HBase0.96.x开发使用(一)--安装
- 基于外部ZooKeeper的GlusterFS作为分布式文件系统的完全分布式HBase集群安装指南
- 基于solr实现hbase的二级索引