spark streaming插入hbase
2016-03-05 11:12
429 查看
import java.sql.{DriverManager, ResultSet}
import org.apache.spark._
import org.apache.spark.streaming._
import scala.util.Random
import scala.util.control.NonFatal
import org.apache.hadoop.hbase.{HTableDescriptor, HColumnDescriptor, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}

/**
 * Spark Streaming job that reads '|'-delimited lines from a local socket
 * (localhost:9999) and writes field pairs into an HBase table.
 *
 * Each input line is split on '|'; field 0 is used as the column qualifier
 * and field 1 as the cell value (see [[InsertHbase]]).
 */
object Pi {
  // MySQL connection settings — used only by the commented-out JDBC sample
  // at the bottom of main().
  val user = "root"
  val password = "root"
  val host = "10.8.8.123"
  val database = "db_1"
  val port = 3306
  val conn_str = "jdbc:mysql://" + host + ":" + port + "/" + database

  // HBase target: table `achi`, column family `a`, default qualifier `name`.
  val tablename = "achi"
  val cf = "a"
  val qulified = "name"

  /**
   * Creates the HBase table if it does not already exist.
   *
   * @param conn      open HBase connection (not closed here; owned by caller)
   * @param userTable table to check/create
   */
  def CreatTableIfNotFind(conn: Connection, userTable: TableName): Unit = {
    // Obtain an Admin from the Connection (replaces the old HBaseAdmin).
    val admin = conn.getAdmin
    try {
      if (admin.tableExists(userTable)) {
        println("Table exists!")
        //admin.disableTable(userTable)
        //admin.deleteTable(userTable)
        //exit()
      } else {
        val tableDesc = new HTableDescriptor(userTable)
        tableDesc.addFamily(new HColumnDescriptor(cf.getBytes))
        admin.createTable(tableDesc)
        println("Create table success!")
      }
    } finally {
      // FIX: the Admin handle was leaked in the original version.
      admin.close()
    }
  }

  /**
   * Writes a single cell to HBase.
   *
   * NOTE: the row key is hard-coded to "id001", so every call overwrites the
   * same row — kept as-is to preserve the original behavior/interface.
   *
   * @param table    open HBase table handle (owned by caller)
   * @param cf       column family name
   * @param qulified column qualifier
   * @param value    cell value
   */
  def InsertHbase(table: Table, cf: String, qulified: String, value: String): Unit = {
    val p = new Put("id001".getBytes())
    p.addColumn(cf.getBytes, qulified.getBytes, value.getBytes)
    table.put(p)
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Streaming").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))

    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.map(_.split('|'))
    words.print()

    words.foreachRDD { rdd =>
      rdd.foreachPartition { pa =>
        // One HBase connection per partition: HBase connections are not
        // serializable, so they must be created on the executor.
        val hbaseConf = HBaseConfiguration.create()
        val conn = ConnectionFactory.createConnection(hbaseConf)
        val userTable = TableName.valueOf(tablename)
        val table = conn.getTable(userTable)
        try {
          pa.foreach { w =>
            try {
              // FIX: `beg` is never reassigned — val instead of var.
              val beg = System.currentTimeMillis()
              println(w(0) + w(1))
              InsertHbase(table, cf, w(0), w(1))
              println("***************************************************************")
              println(" 耗时: " + (System.currentTimeMillis() - beg) + "ms")
              println("***************************************************************")
            } catch {
              // FIX: catch only non-fatal errors (malformed records, HBase I/O)
              // so VM errors / interrupts still propagate.
              case NonFatal(_) => println("raw error!")
            }
          }
        } finally {
          // FIX: close handles even if the partition loop throws, so the
          // HBase connection is never leaked.
          table.close()
          conn.close()
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()

    /*
    Class.forName("com.mysql.jdbc.Driver").newInstance();
    val conn1 = DriverManager.getConnection(conn_str,user,password)
    try {
      val statement = conn1.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
      val rs = statement.executeQuery("select * from achi limit 10")
      while (rs.next) {
        println(rs.getString(1))
      }
    } catch {
      case _ : Exception => println("===>")
    } finally {
      conn1.close
    }
    */
  }
}
// sbt build definition: Spark 1.5.2 streaming job with HBase 1.1.3 client
// and the MySQL JDBC driver, built against Scala 2.10.
name := "untitled"

version := "1.0"

scalaVersion := "2.10.6"

// %% appends the Scala binary version to Spark artifact names;
// HBase and the MySQL driver are plain Java artifacts (%).
libraryDependencies ++= Seq(
  "mysql"             %  "mysql-connector-java" % "5.1.38",
  "org.apache.spark"  %% "spark-core"           % "1.5.2",
  "org.apache.spark"  %% "spark-streaming"      % "1.5.2",
  "org.apache.hbase"  %  "hbase-client"         % "1.1.3",
  "org.apache.hbase"  %  "hbase-common"         % "1.1.3",
  "org.apache.hbase"  %  "hbase-server"         % "1.1.3"
)

// Extra resolver mirror for faster artifact downloads.
resolvers += "OS China" at "http://maven.oschina.net/content/groups/public/"
相关文章推荐
- Dubbo阅读笔记——并发控制和连接控制
- TCP的三次握手和四次挥手
- POJ百炼-2757-最长上升子序列
- U-boot分析与移植(2)----U-boot stage1分析
- kaldi-cuda matrix
- jquery extend 函数详解
- 查看hbase中的中文
- 随机生成30道四则运算
- html,body最顶层元素.
- 查看hbase中的中文
- golang1.5 tar.gz 打包目录,代码分享
- Alcatraz使用之vvDocumenter,代码注释工具
- redis安装
- dubbo 问题汇总
- Odoo 8.0深入浅出开发教程(十) 附录
- U-boot分析与移植(1)----bootloader分析
- 深入了解Mvc路由系统
- 进制转化问题详细分析
- C++拷贝构造函数
- leetcode238 Product of Array Except Self