
A Summary of HBase Read/Write Operations in Practice

2018-03-05 16:52
This summary of HBase read/write operations collects code samples for reading and writing HBase data that I ran into during development; all of them have passed testing. The read-side code covers three parts: scans with filters, retrieval by rowkey range, and full-table scans. An example of constructing the StructType used below:

val strctTupe = new StructType(Array(StructField("user_id", StringType, false), StructField("record_date", StringType, false), StructField("MULTIPLE_PROFESSION_SCORE", StringType, false)))

import java.util

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Result, Scan}
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import scala.collection.mutable.ArrayBuffer

/**
 * Created by ** on 2017/10/31.
 * Pass in the columns to display and the filter conditions as needed; the result set
 * is registered directly as a temporary table.
 * The registered table name is: tbl_nm_tmp
 */
object GetHbaseTable {

  /**
   * Filter data by rowkey.
   * Tested: Scan.setStartRow sets the starting rowkey;
   * a rowkey prefix can be used for prefix matching,
   * but a fragment from the middle of the rowkey cannot be used to filter data.
   * @param sparkSesson
   * @param tbl_nm    table name
   * @param show_info columns to display
   * @param tuple     rowkey date filter (startTime, endTime); the range is [startTime, endTime)
   * Registered as temporary table: tbl_nm_tmp
   */
  def FilterRowkeyGetHtable(sparkSesson: SparkSession, tbl_nm: String,
                            show_info: Array[(String, String)],
                            tuple: Tuple2[String, String]): Unit = {
    val sparkContext = sparkSesson.sparkContext
    val sqlContext = sparkSesson.sqlContext
    val hbaseconf = HBaseConfiguration.create()
    hbaseconf.set(TableInputFormat.INPUT_TABLE, tbl_nm)
    val table = new HTable(hbaseconf, tbl_nm)
    val scan = new Scan()
    scan.setStartRow(Bytes.toBytes(tuple._1))
    scan.setStopRow(Bytes.toBytes(tuple._2))
    for (i <- show_info) {
      scan.addColumn(Bytes.toBytes(i._1), Bytes.toBytes(i._2.toLowerCase))
    }
    val ColumnValueScanner = table.getScanner(scan)
    /**
     * ------------------------- Key point -----------------------------
     * Convert the scan results into Rows that can be used to register the table.
     *
     * The handling of ColumnValueScanner here is not ideal and needs to be revised later.
     * The types in the StructType could be refined to concrete types and handled here with
     * pattern matching, but null pointers and dirty data would then have to be
     * pre-processed carefully (this is a pitfall).
     */
    val listRow = new util.ArrayList[Row]()
    var flag = true
    while (flag) {
      try {
        // each Result is one row of data
        val r: Result = ColumnValueScanner.next()
        val arr = ArrayBuffer[String]()
        // Normally the value types here should follow the StructType and the table's column types,
        // but converting to other types caused errors (partly null pointers, partly dirty data),
        // so everything is converted to String.
        // get the rowkey
        arr += Bytes.toString(r.getRow)
        for (col <- show_info) {
          arr += Bytes.toString(r.getValue(col._1.getBytes(), col._2.getBytes()))
        }
        val row = Row.fromSeq(arr.toSeq)
        listRow.add(row)
      } catch {
        case e: Exception => flag = false
      }
    }
    // build the RDD
    // val seqRDD = sc.makeRDD(arr)
    val schema = StructType({
      val list = new util.ArrayList[StructField]()
      list.add(StructField("rowkey", StringType, true))
      for (col <- show_info) {
        list.add(StructField(col._2, StringType, true))
      }
      list
    })
    // create the DataFrame
    val df = sqlContext.createDataFrame(listRow, schema)
    val tbl = tbl_nm.split(":")(1)
    df.createTempView(tbl + "_tmp")
    // println("----------------------------------" + sqlContext.sql(s"select * from $tbl" + "_tmp").count())
    ColumnValueScanner.close()
  }

  /**
   * Partial data retrieval with filters.
   * @param sparkSesson
   * @param tbl_nm      table name
   * @param show_info   columns to display (column family, column name)
   * @param filter_info array of filter conditions (column family, column name, filter value, operator)
   * Registered as temporary table: tbl_nm_tmp
   */
  def FilterGetRegHtable(sparkSesson: SparkSession, tbl_nm: String,
                         show_info: Array[(String, String)],
                         filter_info: Array[(String, String, String, String)]) = {
    // build SparkContext and SQLContext
    val sc = sparkSesson.sparkContext
    val sqlContext = sparkSesson.sqlContext
    // build the HBase configuration
    val hbaseConf = HBaseConfiguration.create()
    // the HBase table name
    val tableName = tbl_nm
    // set the table to query in HBase
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
    // build the table
    val table = new HTable(hbaseConf, tableName)
    val scan = new Scan()
    // specify the column family and the columns to display
    // add every column that is needed
    val length = show_info.length
    for (i <- show_info) {
      scan.addColumn(Bytes.toBytes(i._1), Bytes.toBytes(i._2))
    }
    /* scan.addColumn(Bytes.toBytes("basicinfo"), Bytes.toBytes("WAYBILL_NO"))
       scan.addColumn(Bytes.toBytes("basicinfo"), Bytes.toBytes("PENDING_TYPE")) */
    // set the rowkey range, start and stop
    // scan.setStartRow(Bytes.toBytes(""))
    // scan.setStopRow(Bytes.toBytes(""))
    val fil_len = filter_info.length
    val filter_arr = new util.ArrayList[Filter](fil_len)
    if (fil_len > 0) {
      for (i <- filter_info) {
        i._4 match {
          case "=" =>
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1), Bytes.toBytes(i._2),
              CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(i._3)))
            filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          case "<" =>
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1), Bytes.toBytes(i._2),
              CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes(i._3)))
            filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          case "<=" =>
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1), Bytes.toBytes(i._2),
              CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(i._3)))
            filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          case ">" =>
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1), Bytes.toBytes(i._2),
              CompareFilter.CompareOp.GREATER, new BinaryComparator(Bytes.toBytes(i._3)))
            filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          case ">=" =>
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1), Bytes.toBytes(i._2),
              CompareFilter.CompareOp.GREATER_OR_EQUAL, new BinaryComparator(Bytes.toBytes(i._3)))
            // filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          case "!=" =>
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1), Bytes.toBytes(i._2),
              CompareFilter.CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(i._3)))
            filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          case _ =>
        }
      }
    }
    /**
     * A FilterList allows several filters to be applied together.
     */
    val filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, filter_arr)
    scan.setFilter(filterList)
    // get the table scanner
    val ColumnValueScanner = table.getScanner(scan)
    /**
     * ------------------------- Key point -----------------------------
     * Convert the scan results into Rows that can be used to register the table.
     *
     * As above, the handling of ColumnValueScanner here is not ideal and needs revision.
     */
    val listRow = new util.ArrayList[Row]()
    var flag = true
    while (flag) {
      try {
        // each Result is one row of data
        val r: Result = ColumnValueScanner.next()
        val arr = ArrayBuffer[String]()
        // As above, everything is converted to String for the time being.
        arr += Bytes.toString(r.getRow)
        for (col <- show_info) {
          arr += Bytes.toString(r.getValue(col._1.getBytes(), col._2.getBytes()))
        }
        val row = Row.fromSeq(arr.toSeq)
        listRow.add(row)
      } catch {
        case e: Exception => flag = false
      }
    }
    // build the RDD
    // val seqRDD = sc.makeRDD(arr)
    val schema = StructType({
      val list = new util.ArrayList[StructField]()
      list.add(StructField("rowkey", StringType, true))
      for (col <- show_info) {
        list.add(StructField(col._2, StringType, true))
      }
      list
    })
    // create the DataFrame
    val df = sqlContext.createDataFrame(listRow, schema)
    df.createTempView(tbl_nm + "_tmp")
    ColumnValueScanner.close()
  }

  /**
   * Full-table scan data retrieval.
   * Note: all columns are treated as String here; if concrete types are needed later,
   * a pre-built StructType can be passed in as a replacement.
   * @param sparkSession
   * @param tabMN table name
   * @param cf    column family
   * @param cols  column names
   * Registered as temporary table: tbl_nm_tmp
   */
  def NoFilterGetRegHtable(sparkSession: SparkSession, tabMN: String, cf: String, cols: Array[String]): Unit = {
    val sc = sparkSession.sparkContext
    val sqlContext = sparkSession.sqlContext
    // HBase configuration
    val hbaseconf = HBaseConfiguration.create()
    // set the table name
    // hbaseconf.set(TableInputFormat.INPUT_TABLE, "db_oss_ywgl:ywhbase_sys_user_info")
    hbaseconf.set(TableInputFormat.INPUT_TABLE, tabMN)
    // fetch the result set
    val userRDD = sc.newAPIHadoopRDD(hbaseconf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    userRDD.cache()
    val schema = StructType({
      val list = new util.ArrayList[StructField]()
      list.add(StructField("rowkey", StringType, true))
      for (col <- cols) {
        list.add(StructField(col.toLowerCase, StringType, true))
      }
      list
    })
    val rowRDD = userRDD.map(r => {
      val arr = ArrayBuffer[String]()
      arr += Bytes.toString(r._2.getRow)
      for (col <- cols) {
        // use the cf parameter (the original hard-coded the literal "cf" here)
        arr += Bytes.toString(r._2.getValue(cf.getBytes(), col.getBytes()))
      }
      Row.fromSeq(arr.toSeq)
    })
    // build the temporary table name
    val tmp_tblNM = tabMN.split(":")(1)
    // println(tmp_tblNM + "-----------------")
    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.createTempView(tmp_tblNM + "_tmp")
    // sqlContext.sql("select * from ywhbase_sys_user_info_tmp").show(10)
  }
}

object test {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local[2]").appName("RowkeyScanTest").getOrCreate()
    val show_cols = Array(("cf", "MAJOR_NM"), ("cf", "difficulty_level"), ("cf", "create_time"), ("cf", "deal_result"))
    // GetHbaseTable.FilterRowkeyGetHtable(sparkSession, "db_oss_ywgl:ywhbase_adjust_res_bill", show_cols)
  }
}
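The while/try/catch loop above only stops when next() fails on an exhausted scanner, which is exactly what the in-code comments flag as a pitfall. As a minimal alternative sketch (the helper name scannerToRows and the null handling are my own, not from the original code), the ResultScanner can be drained through its iterator instead, so no exception is needed to end the loop:

// Sketch only: assumes the same imports as GetHbaseTable, plus these two.
import org.apache.hadoop.hbase.client.ResultScanner
import scala.collection.JavaConverters._

// Walk the scanner with its iterator; the exhausted case is handled by the
// iterator itself rather than by catching a NullPointerException.
def scannerToRows(scanner: ResultScanner, show_info: Array[(String, String)]): util.ArrayList[Row] = {
  val listRow = new util.ArrayList[Row]()
  for (r <- scanner.iterator().asScala) {
    val arr = ArrayBuffer[String]()
    arr += Bytes.toString(r.getRow)
    for (col <- show_info) {
      // guard against missing cells explicitly instead of relying on exceptions
      val v = r.getValue(Bytes.toBytes(col._1), Bytes.toBytes(col._2))
      arr += (if (v == null) null else Bytes.toString(v))
    }
    listRow.add(Row.fromSeq(arr.toSeq))
  }
  scanner.close()
  listRow
}

The same helper could replace the loop in both FilterRowkeyGetHtable and FilterGetRegHtable, since the two methods only differ in how the Scan is built.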
Two ways to write to HBase: saveAsNewAPIHadoopDataset, and controlling how much is written per flush
package com.gh.HbaseWideTable.Tool

import java.util

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{HTable, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

/**
 * Created by admin2 on 2017/10/31.
 * Write the given dataset into HBase with saveAsNewAPIHadoopDataset.
 */
object WriteHbaseTable {
  /**
   * Write the computed results into an HBase table.
   * @param df     DataFrame holding the results (the first column is the rowkey)
   * @param sc     SparkContext
   * @param tbl_NM table name
   * @param cf     column family
   * @param cols   array of column names
   */
  def runWriteHbaseTable(df: DataFrame, sc: SparkContext, tbl_NM: String, cf: String, cols: Array[String]): Unit = {
    // configure the output table name
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tbl_NM)
    // configure the job
    val job = new Job(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    val rdd = df.rdd.map(res => {
      val put = new Put(if (res.isNullAt(0)) null else Bytes.toBytes(res.getString(0))) // rowkey
      // column index; declared inside the closure so each task gets its own counter
      var i = 1
      val size = res.size
      while (i < size) {
        put.add(Bytes.toBytes(cf), Bytes.toBytes(cols(i - 1).toLowerCase),
          if (res.isNullAt(i)) null else Bytes.toBytes(res.getString(i)))
        i += 1
      }
      (new ImmutableBytesWritable, put)
    })
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}
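A hypothetical call to runWriteHbaseTable might look like the sketch below; the table name, column family, and column list are placeholders of my own. The DataFrame is assumed to carry the rowkey string as its first column, followed by the columns named in cols in the same order, which is what the Put construction above expects.

// Hypothetical usage sketch (namespace, table, and column names are placeholders).
import org.apache.spark.sql.SparkSession

object WriteHbaseTableDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("WriteHbaseTableDemo").getOrCreate()
    val cols = Array("MAJOR_NM", "CREATE_TIME")
    // column 0 is the rowkey, then the columns listed in cols, all as strings
    val resultDF = spark.sql("select rowkey, major_nm, create_time from some_registered_tmp_table")
    WriteHbaseTable.runWriteHbaseTable(resultDF, spark.sparkContext,
      "db_oss_ywgl:some_target_table", "cf", cols)
  }
}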
/**
 * Created by Administrator on 2017/11/24.
 * Controls how much data is written to HBase per flush.
 */
object WriteHbase {
  def writeToHbase(data: RDD[Put], name: String) = {
    val tableName = name

    data.foreachPartition { p =>
      // Create the configuration and the HTable inside the partition:
      // HTable is not serializable, so it cannot be built on the driver
      // and captured by the closure.
      val myConf = HBaseConfiguration.create()
      // myConf.set("hbase.zookeeper.quorum", "ddp-nnlte-001,ddp-nnlte-002")
      myConf.set("hbase.zookeeper.property.clientPort", "2181")
      myConf.set("hbase.defaults.for.version.skip", "true")
      val myTable = new HTable(myConf, TableName.valueOf(tableName))
      myTable.setAutoFlush(false, false)          // key point 1: turn off auto flush
      myTable.setWriteBufferSize(6 * 1024 * 1024) // key point 2: set the write buffer size

      var i: Long = 0
      val plist = new util.ArrayList[Put]()
      p.foreach { put =>
        put.setWriteToWAL(false)
        i = i + 1
        plist.add(put)

        // hand the buffered Puts to the table every 1000 rows
        if (i % 1000 == 0) {
          myTable.put(plist)
          plist.clear()
        }
        if (i % 100000 == 0)
          myTable.flushCommits()
        if (i % 100000 == 0)
          println(s"\n\n\nthis partition has written $i rows\n\n\n")
      }
      // write and flush whatever is left in the buffer
      myTable.put(plist)
      myTable.flushCommits() // key point 3

      println(name + "------->>>> total_num   " + i)
      myTable.close()
    }
  }
}
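For writeToHbase the caller supplies the RDD[Put] itself. A rough usage sketch follows, again with placeholder table and column names; the helper dataFrameToPuts is hypothetical and simply mirrors how runWriteHbaseTable builds its Puts. It reuses the imports from the WriteHbaseTable code above.

// Hypothetical sketch: build an RDD[Put] from a DataFrame whose first column is the
// rowkey, then hand it to the buffered writer above. Names are placeholders.
def dataFrameToPuts(df: DataFrame, cf: String, cols: Array[String]): RDD[Put] = {
  df.rdd.map { row =>
    val put = new Put(Bytes.toBytes(row.getString(0)))
    // skip null cells; columns follow the same order as cols
    for (idx <- 1 until row.size if !row.isNullAt(idx)) {
      put.add(Bytes.toBytes(cf), Bytes.toBytes(cols(idx - 1).toLowerCase), Bytes.toBytes(row.getString(idx)))
    }
    put
  }
}

// val putRDD = dataFrameToPuts(resultDF, "cf", Array("MAJOR_NM", "CREATE_TIME"))
// WriteHbase.writeToHbase(putRDD, "db_oss_ywgl:some_target_table")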
 

                                            
Tags: hbase