spark-structstreaming-结果数据存入hbase
2018-01-05 17:54
891 查看
前言
本节描述通过spark-structstreaming将结果结果数据存入hbase正文
object testWriteResultToHbase{ def main(args:Array[String]){ val kafkaservers=args(0) val topic=args(1) val zookeeperservers=args(2) val tablename=args(3) val spark=SparkSession .builder .appName("testWriteResultToHbase") .master("local") .getOrCreate() import spark.implicits._ val lines = spark .readStream .format("kafka") .option("kafka.bootstrap.servers",kafkaservers) .option("subscribe", topic) .load() .selectExpr("cast(topic as String) ","cast(key as String)","CAST(value AS STRING)") .as[(String,String,String)] lines.createTempView("Originalkafka") import spark.sql val count=sql("select value from Originalkafka ") val query =count.writeStream .outputMode("append") .foreach(new ForeachWriter[Row]{ var connection: Connection= null def open(partitionId:Long,version:Long):Boolean={ true } def process(record:Row):Unit={ val conf = new HBaseConfiguration() conf.set("hbase.zookeeper.quorum",zookeeperservers) conf.set("hbase.zookeeper.property.clientPort", "2181") conf.set("zookeeper.znode.parent","/hbase-unsecure") val table = new HTable(conf, tablename) val theput= new Put(Bytes.toBytes(record.mkString)) theput.add(Bytes.toBytes("cf1"),Bytes.toBytes("age"),Bytes.toBytes("30")) table.put(theput) } def close(errorOrNull:Throwable):Unit={ } }) .queryName("test") .format("foreach") .start() query.awaitTermination() }
}
注意:写入hbase的时候,没有创建连接的客户端
ConnectionFactory.createConnection(conf)
所以就没有必要在
open(partitionId:Long,version:Long)和
close(errorOrNull:Throwable)打开连接和关闭连接。
但是本人认为还是有必要的。空了研究一下hbase 客户端源码
相关文章推荐
- 大数据IMF传奇行动绝密课程第120课:Spark Streaming性能优化:如何在End-to-End生产环境下安全高效地把结果数据存入HBase中
- mapreduce实现从hbase中统计数据,结果存入mysql中
- Spark streaming + Kafka 流式数据处理,结果存储至MongoDB、Solr、Neo4j(自用)
- 记录一下SparkStreaming中因为使用redis做数据验证而导致数据结果不对的问题
- Spark Streaming从Kafka自定义时间间隔内实时统计行数、TopN并将结果存到hbase中
- Spark-StructStreaming-计算结果写入到文本文件
- Spark程序使用groupByKey后数据存入HBase出现重复的现象
- Kafka和Spark Streaming Java版本集成并将数据实时写入HBase及代码
- spark Streaming实时写入数据到HBase
- Spark Streaming实时写入数据到HBase
- Kafka和Spark Streaming Java版本集成并将数据实时写入HBase
- SparkStreaming python 读取kafka数据将结果输出到单个指定本地文件
- Kafka+SparkStreaming解析Json数据并插入Hbase,包含部分业务逻辑
- SparkStreaming向Hbase中写数据
- sparkstreaming接受kafka数据实时存入hbse并集成rest服务
- Spark Streaming从Kafka自定义时间间隔内实时统计行数、TopN并将结果存到hbase中
- Spark Streaming之:Flume监控目录下文件内容变化,然后Spark Streaming实时监听Flume,然后从其上拉取数据,并计算出结果
- spark部署一台机器时sparkstreaming无结果数据打印的问题
- Spark Streaming和Kafka整合是如何保证数据零丢失
- spark streaming集成kafka接收数据的方式