
Spark Structured Streaming: writing result data to HBase

2018-01-05 17:54

Preface

This section describes how to store the result data of a Spark Structured Streaming query into HBase.

Main text

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

object testWriteResultToHbase {
  def main(args: Array[String]): Unit = {

    // Positional arguments: Kafka brokers, topic, ZooKeeper quorum, HBase table name
    val kafkaservers = args(0)
    val topic = args(1)
    val zookeeperservers = args(2)
    val tablename = args(3)

    val spark = SparkSession
      .builder
      .appName("testWriteResultToHbase")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    // Read the Kafka topic as a streaming source, keeping topic/key/value as strings
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaservers)
      .option("subscribe", topic)
      .load()
      .selectExpr("CAST(topic AS STRING)", "CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String, String)]

    lines.createTempView("Originalkafka")
    import spark.sql
    val count = sql("select value from Originalkafka")

    // Write each result row to HBase through a ForeachWriter sink
    val query = count.writeStream
      .outputMode("append")
      .foreach(new ForeachWriter[Row] {

        // Never initialized here: no ConnectionFactory.createConnection is used (see the note below)
        var connection: Connection = null

        override def open(partitionId: Long, version: Long): Boolean = {
          true
        }

        override def process(record: Row): Unit = {
          val conf = HBaseConfiguration.create()
          conf.set("hbase.zookeeper.quorum", zookeeperservers)
          conf.set("hbase.zookeeper.property.clientPort", "2181")
          conf.set("zookeeper.znode.parent", "/hbase-unsecure")
          // HTable manages its own internal connection and is built once per record here
          val table = new HTable(conf, tablename)
          val theput = new Put(Bytes.toBytes(record.mkString))
          theput.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes("30"))
          table.put(theput)
          table.close()
        }

        override def close(errorOrNull: Throwable): Unit = {
        }
      })
      .queryName("test")
      .format("foreach")
      .start()

    query.awaitTermination()
  }
}
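The writer above assumes the target table already exists in HBase with a column family named cf1. A minimal sketch of creating it up front with the HBase Admin API (the object name and argument handling here are placeholders of mine, not part of the original post):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateResultTable {
  def main(args: Array[String]): Unit = {
    val zookeeperservers = args(0)   // same ZooKeeper quorum passed to the streaming job
    val tablename = args(1)          // same table name passed to the streaming job

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zookeeperservers)
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")

    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin
    val table = TableName.valueOf(tablename)
    if (!admin.tableExists(table)) {
      val descriptor = new HTableDescriptor(table)
      descriptor.addFamily(new HColumnDescriptor("cf1"))  // column family used by the Put above
      admin.createTable(descriptor)
    }
    admin.close()
    connection.close()
  }
}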

Note: when writing to HBase, the code never creates a client connection with
ConnectionFactory.createConnection(conf)

so there is nothing to open or close in
open(partitionId: Long, version: Long)
close(errorOrNull: Throwable)

That said, I still think managing the connection there is worthwhile. When I have time, I will dig into the HBase client source code.
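For reference, a minimal sketch of what that could look like: open one connection per partition in open() via ConnectionFactory.createConnection and release it in close(), instead of building an HTable for every record. The class name HbaseForeachWriter is my own, and this is only one possible shape, not the original implementation:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.{ForeachWriter, Row}

class HbaseForeachWriter(zookeeperservers: String, tablename: String)
  extends ForeachWriter[Row] {

  var connection: Connection = _
  var table: Table = _

  // Called once per partition/epoch: set up the HBase connection and table handle
  override def open(partitionId: Long, version: Long): Boolean = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zookeeperservers)
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    connection = ConnectionFactory.createConnection(conf)
    table = connection.getTable(TableName.valueOf(tablename))
    true
  }

  // Called once per row: reuse the table handle opened above
  override def process(record: Row): Unit = {
    val put = new Put(Bytes.toBytes(record.mkString))
    put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes("30"))
    table.put(put)
  }

  // Called once per partition/epoch: release the handles
  override def close(errorOrNull: Throwable): Unit = {
    if (table != null) table.close()
    if (connection != null) connection.close()
  }
}

With a writer like this, the streaming query would call .foreach(new HbaseForeachWriter(zookeeperservers, tablename)) instead of the anonymous ForeachWriter above, and the per-record HTable construction goes away.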
Tags: spark