Spark 读写 HBase 的两种方式(RDD、DataFrame)
2018-09-09 22:28
1316 查看
使用 saveAsHadoopDataset 写入数据
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName} import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result} import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableInputFormat //import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapred.JobConf //import org.apache.hadoop.mapreduce.Job import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.SparkSession /** * Created by blockchain on 18-9-9 下午3:45 in Beijing. */ object SparkHBaseRDD { def main(args: Array[String]) { // 屏蔽不必要的日志显示在终端上 Logger.getLogger("org.apache.spark").setLevel(Level.WARN) val spark = SparkSession.builder().appName("SparkHBaseRDD").getOrCreate() val sc = spark.sparkContext val tablename = "SparkHBase" val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum","localhost") //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置 hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //设置zookeeper连接端口,默认2181 hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tablename) // 初始化job,TableOutputFormat 是 org.apache.hadoop.hbase.mapred 包下的 val jobConf = new JobConf(hbaseConf) jobConf.setOutputFormat(classOf[TableOutputFormat]) val indataRDD = sc.makeRDD(Array("2,jack,16", "1,Lucy,15", "5,mike,17", "3,Lily,14")) val rdd = indataRDD.map(_.split(',')).map{ arr=> /*一个Put对象就是一行记录,在构造方法中指定主键 * 所有插入的数据 须用 org.apache.hadoop.hbase.util.Bytes.toBytes 转换 * Put.addColumn 方法接收三个参数:列族,列名,数据*/ val put = new Put(Bytes.toBytes(arr(0))) put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("name"),Bytes.toBytes(arr(1))) put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("age"),Bytes.toBytes(arr(2))) (new ImmutableBytesWritable, put) } rdd.saveAsHadoopDataset(jobConf) spark.stop() } }
在 HBase shell 中 查看写入的数据
hbase(main):005:0* scan 'SparkHBase' ROW COLUMN+CELL 1 column=cf1:age, timestamp=1536494344379, value=15 1 column=cf1:name, timestamp=1536494344379, value=Lucy 2 column=cf1:age, timestamp=1536494344380, value=16 2 column=cf1:name, timestamp=1536494344380, value=jack 3 column=cf1:age, timestamp=1536494344379, value=14 3 column=cf1:name, timestamp=1536494344379, value=Lily 5 column=cf1:age, timestamp=1536494344380, value=17 5 column=cf1:name, timestamp=1536494344380, value=mike 4 row(s) in 0.0940 seconds hbase(main):006:0>
如上所示,写入成功。
使用 newAPIHadoopRDD 读取数据
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName} import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result} import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableInputFormat //import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapred.JobConf //import org.apache.hadoop.mapreduce.Job import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.SparkSession /** * Created by blockchain on 18-9-9 下午3:45 in Beijing. */ object SparkHBaseRDD { def main(args: Array[String]) { // 屏蔽不必要的日志显示在终端上 Logger.getLogger("org.apache.spark").setLevel(Level.WARN) val spark = SparkSession.builder().appName("SparkHBaseRDD").getOrCreate() val sc = spark.sparkContext val tablename = "SparkHBase" val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum","localhost") //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置 hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //设置zookeeper连接端口,默认2181 hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename) // 如果表不存在,则创建表 val admin = new HBaseAdmin(hbaseConf) if (!admin.isTableAvailable(tablename)) { val tableDesc = new HTableDescriptor(TableName.valueOf(tablename)) admin.createTable(tableDesc) } //读取数据并转化成rdd TableInputFormat 是 org.apache.hadoop.hbase.mapreduce 包下的 val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) hBaseRDD.foreach{ case (_ ,result) => //获取行键 val key = Bytes.toString(result.getRow) //通过列族和列名获取列 val name = Bytes.toString(result.getValue("cf1".getBytes,"name".getBytes)) val age = Bytes.toString(result.getValue("cf1".getBytes,"age".getBytes)) println("Row key:"+key+"\tcf1.Name:"+name+"\tcf1.Age:"+age) } admin.close() spark.stop() } }
输出如下
Row key:1 cf1.Name:Lucy cf1.Age:15 Row key:2 cf1.Name:jack cf1.Age:16 Row key:3 cf1.Name:Lily cf1.Age:14 Row key:5 cf1.Name:mike cf1.Age:17
Spark DataFrame 通过 Phoenix 读写 HBase
友情提示:JDBC方式 访问 PhoenixApache Spark Plugin
部署Maven:https://blog.csdn.net/yitengtongweishi/article/details/81946562
需要添加的依赖如下:
<dependency> <groupId>org.apache.phoenix</groupId> <artifactId>phoenix-core</artifactId> <version>${phoenix.version}</version> </dependency> <dependency> <groupId>org.apache.phoenix</groupId> <artifactId>phoenix-spark</artifactId> <version>${phoenix.version}</version> </dependency>
下面老规矩,直接上代码。
import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.{SaveMode, SparkSession} /** * Created by blockchain on 18-9-9 下午8:33 in Beijing. */ object SparkHBaseDataFrame { def main(args: Array[String]) { // 屏蔽不必要的日志显示在终端上 Logger.getLogger("org.apache.spark").setLevel(Level.WARN) val spark = SparkSession.builder().appName("SparkHBaseDataFrame").getOrCreate() val url = s"jdbc:phoenix:localhost:2181" val dbtable = "PHOENIXTEST" //spark 读取 phoenix 返回 DataFrame 的 第一种方式 val rdf = spark.read .format("jdbc") .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver") .option("url", url) .option("dbtable", dbtable) .load() rdf.printSchema() //spark 读取 phoenix 返回 DataFrame 的 第二种方式 val df = spark.read .format("org.apache.phoenix.spark") .options(Map("table" -> dbtable, "zkUrl" -> url)) .load() df.printSchema() //spark DataFrame 写入 phoenix,需要先建好表 df.write .format("org.apache.phoenix.spark") .mode(SaveMode.Overwrite) .options(Map("table" -> "PHOENIXTESTCOPY", "zkUrl" -> url)) .save() spark.stop() } }
在 Phoenix 中查看写入的数据
0: jdbc:phoenix:localhost:2181> SELECT * FROM PHOENIXTEST ; +-----+----------+ | PK | COL1 | +-----+----------+ | 1 | Hello | | 2 | World | | 3 | HBase | | 4 | Phoenix | +-----+----------+ 4 rows selected (0.049 seconds) 0: jdbc:phoenix:localhost:2181> 0: jdbc:phoenix:localhost:2181> SELECT * FROM PHOENIXTESTCOPY ; +-----+----------+ | PK | COL1 | +-----+----------+ | 1 | Hello | | 2 | World | | 3 | HBase | | 4 | Phoenix | +-----+----------+ 4 rows selected (0.03 seconds) 0: jdbc:phoenix:localhost:2181>
如上所示,写入成功。
原文链接:转载请注明出处,谢谢!
本文参考链接:
Spark与HBase的整合
Spark DataFrame写入HBase的常用方式
spark将数据写入hbase以及从hbase读取数据
Use Spark to read and write HBase data
Apache Spark - Apache HBase Connector
Apache Spark Comes to Apache HBase with HBase-Spark Module
Spark-on-HBase: DataFrame based HBase connector
Spark 下操作 HBase(1.0.0 新 API)
Spark整合HBase(自定义HBase DataSource)
spark通过Phoenix读取hbase数据
相关文章推荐
- Spark将RDD转换成DataFrame的两种方式
- Spark将RDD转换成DataFrame的两种方式
- sparkrdd转dataframe的两种方式
- spark rdd转dataframe的两种方式
- Spark中RDD转换成DataFrame的两种方式(分别用Java和scala实现)
- Spark中RDD转换成DataFrame的两种方式(分别用Java和scala实现)
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- RDD转换为DataFrame的两种方式及spark sql的简单实例
- Spark读写Hbase的二种方式对比
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- spark基础之RDD和DataFrame的转换方式
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- Java接入Spark之创建RDD的两种方式和操作RDD
- Java接入Spark之创建RDD的两种方式和操作RDD
- RDD转换DataFrame的两种方式的比较
- hbase-spark全新的spark读写hbase的方式
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame