您的位置:首页 > 其它

spark1.4 操作hbase 基于rdd

2016-06-24 11:07 387 查看
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import java.util.Properties
import java.io.FileInputStream
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

/**
 * Spark job that scans an HBase table through `TableInputFormat` and prints
 * the latest value of column `cf:col` for every row that has one.
 *
 * Settings are read from a Java properties file (default
 * `hbaseConfig.properties`, or the path given as the first program argument).
 * The keys read are `hbase.spark.master`, `hbase.spark.job.name`,
 * `hbase.rootdir` and `hbase.table.name`.
 */
object readDataFromHbase {

  /**
   * Loads a properties file, always closing the underlying stream.
   *
   * @param fileName path of the properties file to read
   * @return the loaded properties
   */
  private def loadProperties(fileName: String): Properties = {
    val props = new Properties
    val inStream = new FileInputStream(fileName)
    try props.load(inStream)
    finally inStream.close()
    props
  }

  /**
   * Returns the value stored under `key`, failing fast with a descriptive
   * message instead of passing `null` on to Spark or HBase.
   *
   * @throws IllegalArgumentException if the key is absent
   */
  private def requiredProperty(props: Properties, key: String): String = {
    val value = props.getProperty(key)
    require(value != null, s"Missing required property '$key'")
    value
  }

  /**
   * Program entry point.
   *
   * @param args optional; `args(0)` overrides the properties file name
   */
  def main(args: Array[String]): Unit = {
    val propFileName = args.headOption.getOrElse("hbaseConfig.properties")
    val prop = loadProperties(propFileName)

    // Resolve every setting before starting Spark so that a bad config file
    // fails without leaving a context running.
    val sparkMaster = requiredProperty(prop, "hbase.spark.master")
    val sparkJobName = requiredProperty(prop, "hbase.spark.job.name")
    val hbaseRootDir = requiredProperty(prop, "hbase.rootdir")
    val tableName = requiredProperty(prop, "hbase.table.name")

    val sc = new SparkContext(sparkMaster, sparkJobName)
    try {
      // HBase connection / scan configuration
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set("hbase.rootdir", hbaseRootDir)
      hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)

      val hBaseRDD = sc.newAPIHadoopRDD(
        hbaseConf,
        classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result]
      )

      // Bytes.toBytes always encodes as UTF-8, unlike String.getBytes(),
      // which uses the platform default charset.
      val family = Bytes.toBytes("cf")
      val qualifier = Bytes.toBytes("col")

      val hBaseData = hBaseRDD.flatMap { case (_, result) =>
        // getColumnLatestCell returns null when the row lacks this column;
        // such rows are skipped instead of raising a NullPointerException.
        Option(result.getColumnLatestCell(family, qualifier)).map { cell =>
          // getValueArray is the cell's whole backing array; the value is
          // only the slice [valueOffset, valueOffset + valueLength).
          Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
        }.toList
      }

      hBaseData.foreach(println)
    } finally {
      sc.stop()
    }
  }
}
<!-- HBase -->
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase</artifactId><version>${hbase.version}</version></dependency>
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>${hbase.version}</version></dependency>
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>${hbase.version}</version></dependency>
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>${hbase.version}</version></dependency>
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-hadoop2-compat</artifactId><version>${hbase.version}</version></dependency>
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-hadoop-compat</artifactId><version>${hbase.version}</version></dependency>
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-protocol</artifactId><version>${hbase.version}</version></dependency>

                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: