
A template for writing Kafka data to Oracle via Spark Streaming

2016-09-28 00:07
First, define a KafkaInfo object that holds the Kafka and Oracle connection settings:
import java.util.Properties

object KafkaInfo {

  val brokerList = "..." // Kafka broker list, e.g. host1:9092,host2:9092

  val topicTest = "test"
  val topic = ""

  val url = ""
  val prop = new Properties()
  prop.setProperty("user", "")
  prop.setProperty("password", "")

  // thin: connect through the Oracle thin driver. Oracle ships two JDBC drivers:
  // the pure-Java thin driver and the OCI driver, which goes through native libraries.
  // URL format: jdbc:oracle:thin:@host:port/service_name (8001 is the port here)
  val urlBiBeta = "jdbc:oracle:thin:@aaa:8001/BIDEV1"
  val propBiBeta = new Properties()
  propBiBeta.setProperty("user", "app")
  propBiBeta.setProperty("password", "app")

}
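The code assumes that the Spark Streaming Kafka 0.8 integration and the Oracle JDBC driver are on the classpath. A minimal build.sbt sketch; the versions are assumptions based on the Spark 1.x era this post was written in:

// build.sbt -- versions are assumptions, adjust to your cluster
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.2"
)

// The Oracle JDBC driver (e.g. ojdbc6.jar) is usually not on Maven Central;
// add it as an unmanaged jar in lib/ or pull it from an internal repository.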
Next, define a KafkaToOracle object. The overall logic is: Kafka log data is streamed into an Oracle log table in near real time. Because the direct-stream approach is used, offsets can be lost across restarts, so an offset table is maintained to persist offset information; since the offset of a given topic and partition keeps changing, it has to be stored in the database. On startup, if the offset table already contains rows, consumption resumes from those offsets; if there is no (topic, partition, offset) record, consumption starts from the beginning. After each batch of Kafka messages has been ETL'd into the Oracle log table, the existing rows of the offset table are updated.
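The post never shows DDL for the offset table; the sketch below infers its shape from the MERGE and SELECT statements used later. The column types and the primary key are assumptions, and since PARTITION and OFFSET are Oracle keywords, some versions or tools may require quoting them.

import java.sql.DriverManager

// One-off setup sketch for the offset bookkeeping table ("mdw" in the code below).
// Column names mirror the MERGE statement; types and the primary key are assumptions.
object CreateOffsetTable {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(KafkaInfo.urlBiBeta, KafkaInfo.propBiBeta)
    val ddl =
      """create table mdw (
        |  topic     varchar2(200) not null,
        |  partition number(10)    not null,
        |  offset    number(19)    not null,
        |  constraint mdw_pk primary key (topic, partition)
        |)""".stripMargin
    val stmt = conn.createStatement()
    stmt.execute(ddl)
    stmt.close()
    conn.close()
  }
}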
import java.math.BigDecimal
import java.sql.DriverManager

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.Json
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.parsing.json.JSON

/**
* Created by zengxiaosen on 16/9/14.
*/
object KafkaToOracle {

  private val offsetTable = "mdw"
  private val logTable = "o"

  def main(args: Array[String]): Unit = {

    val timeInterval = args(0).toLong
    val conf = new SparkConf().setAppName(this.getClass.toString)
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(timeInterval))

    val kafkaParams = Map("metadata.broker.list" -> KafkaInfo.brokerList)
    val fromOffsets = getTopicAndPartitionOffset

    var kafkaStream: InputDStream[(String, String)] = null
    if (fromOffsets.nonEmpty) {
      // Resume from the offsets stored in Oracle to avoid data loss
      kafkaStream = KafkaUtils.createDirectStream[
        String,
        String,
        StringDecoder,
        StringDecoder,
        (String, String)
      ](
        ssc,
        kafkaParams,
        fromOffsets, // Map[TopicAndPartition, Long]: TopicAndPartition is (topic, partition), Long is the offset
        (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()))
    } else {
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set(KafkaInfo.topicTest))
    }

    // Get table information
    val (schema, insertSql) = getTableInfo(logTable)

    kafkaStream.foreachRDD(rdd =>
      if (!rdd.isEmpty()) {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.mapPartitionsWithIndex((i, ite) => {
          if (ite.nonEmpty) {
            val topic = offsetRanges(i).topic
            val partition = offsetRanges(i).partition
            val offset = offsetRanges(i).untilOffset
            try {
              val conn = createConnection
              conn.setAutoCommit(false)
              val stmt = conn.prepareStatement(insertSql)
              for ((_, line) <- ite) {
                val rows = mutable.Map[String, String]()
                for (col <- schema.split(",")) {
                  rows(col) = ""
                }
                val values = parseJson(line, rows)
                for (idx <- values.indices) {
                  stmt.setString(idx + 1, values(idx))
                }
                stmt.addBatch()
              }
              stmt.executeBatch()

              // Update topicAndPartition -> offset
              val sql =
                s"""|merge into $offsetTable a
                    |using (select '$topic' as topic, $partition as partition, $offset as offset
                    |from dual) b
                    |on (a.topic = b.topic AND a.partition = b.partition)
                    |when matched then
                    |update set a.offset = b.offset
                    |when not matched then
                    |insert (topic, partition, offset) values (b.topic, b.partition, b.offset)""".stripMargin
              // The MERGE goes through a plain Statement: execute(String) must not be called on a PreparedStatement
              conn.createStatement().execute(sql)
              conn.commit()
              conn.close()
            } catch {
              case e: Exception =>
                e.printStackTrace()
            }
          }
          Iterator.empty
        }).foreach { (_: Nothing) => () } // force evaluation of the partitions
      })

    ssc.start()
    ssc.awaitTerminationOrTimeout((timeInterval * 1000 * 1.75).toLong)
  }

  /**
    * Parse a JSON string into a Map object, then return the map values.
    *
    * @param line Kafka value, which must be in JSON format,
    *             e.g. {"id": 30221920, "os": "Android", "net_info": {"net_type": "WIFI"}}
    * @param rows the keys are the columns of the Oracle table, the values are initialized to ""
    * @throws java.lang.Exception if JSON parsing fails
    * @return the map values,
    *         e.g. {"id": 30221920, "os": "Android", "net_info": {"net_type": "WIFI"}}
    *              ==> ArrayBuffer(3.022192E7, "Android", "WIFI")
    */
  def parseJson(line: String, rows: mutable.Map[String, String]): ArrayBuffer[String] = {
    JSON.parseFull(line) match {
      case Some(map: Map[String, Any]) =>
        parseFields(rows, map)
        var timestamp: String = null
        if (rows.contains("timestamp") && rows("timestamp").length > 10) {
          timestamp = new java.math.BigDecimal(rows("timestamp")).toPlainString
        } else if (rows.contains("user_timestamp") && rows("user_timestamp").length > 10) {
          timestamp = new java.math.BigDecimal(rows("user_timestamp")).toPlainString
        }
        if (timestamp != null) {
          rows("date_id") = timestamp.substring(0, 8)
          rows("hour") = timestamp.substring(8, 10)
        }
        val values = ArrayBuffer[String]()
        for (k <- rows.keys.toSeq.sorted) {
          values.append(rows.getOrElse(k, ""))
        }
        values
      case _ => throw new Exception("Parse json failed")
    }
  }

  /**
    * Parse the map object recursively.
    *
    * If a value is a Map and the key is "domain", convert the value to a JSON string, e.g.
    *   Map("id" -> 3.021192E7, "os" -> "Android", "domain" -> Map("name" -> "Robin")) ==>
    *   Map("id" -> 3.021192E7, "os" -> "Android", "domain" -> {"name": "Robin"})
    *
    * If a value is a Map and the key is not "domain", convert (preKey -> (currKey -> value)) to
    * (preKey_currKey -> value) recursively, e.g.
    *   Map("id" -> 3.022192E7, "os" -> "Android", "net_info" -> Map("net_type" -> "WIFI")) ==>
    *   Map("id" -> 3.022192E7, "os" -> "Android", "net_info_net_type" -> "WIFI")
    *
    * @param rows   parsed result
    * @param obj    map object to be parsed
    * @param preKey parent key
    * @throws java.lang.Exception if a field cannot be parsed
    */
  def parseFields(rows: mutable.Map[String, String],
                  obj: Map[String, Any],
                  preKey: String = null): Unit = {
    for (k <- obj.keys) {
      val key = k.toLowerCase()
      val value = obj.get(k)
      value match {
        case Some(map: Map[String, Any]) =>
          if (key == "domain") {
            val domainValue = Json.encode(map)
            rows(key) = domainValue
          } else {
            val pk = if (preKey == null) key else preKey + "_" + key
            parseFields(rows, map, pk)
          }
        case Some(v: Any) =>
          val pk = if (preKey == null) key else preKey + "_" + key
          rows(pk) = v.toString
        case _ => throw new Exception("parse failed")
      }
    }
  }

  /**
    * Read the column names of `table` from user_tab_columns and build the insert statement, e.g.
    *   [id, name, os, domain] ==> ("domain,id,name,os", "insert into o(domain,id,name,os) values(?,?,?,?)")
    *
    * @param table Oracle table
    * @return (comma-separated sorted column list, insert statement with ? placeholders)
    */
  private def getTableInfo(table: String) = {
    val conn = createConnection
    val stmt = conn.createStatement()
    val query =
      s"""|select column_name
          |from user_tab_columns
          |where lower(table_name) = lower('$table')
          |order by column_id""".stripMargin
    val rs = stmt.executeQuery(query)
    val columns = ArrayBuffer[String]()
    while (rs.next()) {
      columns += rs.getString(1).toLowerCase()
    }
    conn.close()
    val schema = columns.sorted.mkString(",")
    val valuesPlaceHolder = "?," * (columns.length - 1) + "?"
    val sql = s"insert into $logTable($schema) values($valuesPlaceHolder)"
    (schema, sql)
  }

  def getTopicAndPartitionOffset: Map[TopicAndPartition, Long] = {
    val conn = createConnection
    val stmt = conn.createStatement()
    val sql =
      s"""|select topic, partition, offset
          |from $offsetTable
          |where topic = '${KafkaInfo.topicTest}'""".stripMargin
    val resultSet = stmt.executeQuery(sql)
    var fromOffsets = Map[TopicAndPartition, Long]()
    while (resultSet.next()) {
      fromOffsets += (TopicAndPartition(resultSet.getString(1), resultSet.getInt(2)) -> resultSet.getLong(3))
    }
    conn.close()
    fromOffsets
  }

  private def createConnection = {
    // JdbcUtils.createConnectionFactory(KafkaInfo.urlBiBeta, KafkaInfo.propBiBeta)
    DriverManager.getConnection(KafkaInfo.urlBiBeta, KafkaInfo.propBiBeta)
  }
}
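To see what the flattening in parseJson/parseFields produces, here is a small standalone check. The column names in rows are made up for illustration, and note that JSON.parseFull turns numbers into Doubles, so the id comes back in scientific notation:

import scala.collection.mutable

// Quick sanity check of the JSON flattening described above (hypothetical columns).
object ParseJsonDemo {
  def main(args: Array[String]): Unit = {
    val line = """{"id": 30221920, "os": "Android", "net_info": {"net_type": "WIFI"}}"""
    // keys mimic lower-cased log-table columns; values start out empty
    val rows = mutable.Map("id" -> "", "os" -> "", "net_info_net_type" -> "")
    val values = KafkaToOracle.parseJson(line, rows)
    // values follow the sorted key order: id, net_info_net_type, os
    println(values) // ArrayBuffer(3.022192E7, WIFI, Android)
  }
}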