A template for loading Kafka into Oracle via Spark Streaming
2016-09-28 00:07
First, define a KafkaInfo object:
import java.util.Properties

object KafkaInfo {
  val brokerList = "..." // broker list elided in the original post
  val topicTest = "test"
  val topic = ""
  val url = ""
  val prop = new Properties()
  prop.setProperty("user", "")
  prop.setProperty("password", "")

  // "thin" selects Oracle's pure-Java thin JDBC driver (the other mode, OCI, goes through JNI)
  // URL format: jdbc:oracle:thin:@host:port/service_name -- here host aaa, port 8001, service BIDEV1
  val urlBiBeta = "jdbc:oracle:thin:@aaa:8001/BIDEV1"
  val propBiBeta = new Properties()
  propBiBeta.setProperty("user", "app")
  propBiBeta.setProperty("password", "app")
}

Then define a KafkaToOracle object. The overall logic: Kafka log data is loaded into an Oracle log table in real time. Since the stream is consumed with the direct approach, a restart could otherwise lose data, so an offset table is maintained to persist offset state. The offset of a given topic and partition changes over time, so it has to be stored in the database. On startup, if the offset table already contains rows, consumption resumes from those offsets; if there is no (topic, partition, offset) row yet, consumption starts from the beginning. After each batch of Kafka messages has been ETL'd into the Oracle log table, the existing offset table is updated.
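The original post never shows the offset table's DDL. Below is a minimal sketch of what the mdw table could look like: the column names match the merge and select statements in KafkaToOracle, but the types and the primary key are assumptions of mine.

import java.sql.DriverManager

// One-off setup: create the offset table that KafkaToOracle reads and merges into.
// Column names (topic, partition, offset) come from the job's SQL; the types and
// the primary key are assumptions, not from the original post. If partition/offset
// clash with keywords on your Oracle version, quote the identifiers.
object CreateOffsetTable {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(KafkaInfo.urlBiBeta, KafkaInfo.propBiBeta)
    val ddl =
      """create table mdw (
        |  topic     varchar2(200) not null,
        |  partition number(10)    not null,
        |  offset    number(19)    not null,
        |  constraint mdw_pk primary key (topic, partition)
        |)""".stripMargin
    conn.createStatement().execute(ddl)
    conn.close()
  }
}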
import java.math.BigDecimal
import java.sql.DriverManager

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.Json
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.parsing.json.JSON

/**
 * Created by zengxiaosen on 16/9/14.
 */
object KafkaToOracle {

  private val offsetTable = "mdw"
  private val logTable = "o"

  def main(args: Array[String]): Unit = {
    val timeInterval = args(0).toLong // batch interval in seconds
    val conf = new SparkConf().setAppName(this.getClass.toString)
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(timeInterval))
    val kafkaParams = Map("metadata.broker.list" -> KafkaInfo.brokerList)
    val fromOffsets = getTopicAndPartitionOffset
    var kafkaStream: InputDStream[(String, String)] = null
    if (fromOffsets.nonEmpty) {
      // resume from the stored offsets to avoid data loss
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc,
        kafkaParams,
        fromOffsets, // Map[TopicAndPartition, Long]: TopicAndPartition is (topic, partition), Long is the offset
        (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()))
    } else {
      // no stored offsets yet: start from the default position
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set(KafkaInfo.topicTest))
    }

    // get table information
    val (schema, insertSql) = getTableInfo(logTable)

    kafkaStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.mapPartitionsWithIndex((i, ite) => {
          if (ite.nonEmpty) {
            val topic = offsetRanges(i).topic
            val partition = offsetRanges(i).partition
            val offset = offsetRanges(i).untilOffset
            try {
              val conn = createConnection
              conn.setAutoCommit(false)
              val stmt = conn.prepareStatement(insertSql)
              for ((_, line) <- ite) {
                val rows = mutable.Map[String, String]()
                for (col <- schema.split(","))
                  rows(col) = ""
                val values = parseJson(line, rows)
                for (j <- values.indices)
                  stmt.setString(j + 1, values(j))
                stmt.addBatch()
              }
              stmt.executeBatch()
              // update topicAndPartition -> offset
              val sql =
                s"""
                   |merge into $offsetTable a
                   |using (select '$topic' as topic, $partition as partition, $offset as offset
                   |from dual) b
                   |on (a.topic = b.topic AND a.partition = b.partition)
                   |when matched then
                   |update set a.offset = b.offset
                   |when not matched then
                   |insert (topic, partition, offset) values (b.topic, b.partition, b.offset)
                 """.stripMargin
              // a PreparedStatement cannot run a different SQL string, so use a fresh Statement
              conn.createStatement().execute(sql)
              conn.commit()
              conn.close()
            } catch {
              case e: Exception => e.printStackTrace()
            }
          }
          Iterator.empty
        }).foreach { (_: Nothing) => () } // force evaluation of the side-effecting partitions
      }
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout((timeInterval * 1000 * 1.75).toLong)
  }

  /*
   Parse a JSON string into a Map, then collect the map values.
   @param line   Kafka message value, which must be JSON, e.g.
                 {"id": 30221920, "os": "Android", "net_info": {"net_type": "WIFI"}}
   @param rows   keys are the columns of the Oracle table, values initialized to ""
   @throws java.lang.Exception if JSON parsing fails
   @return the values ordered by sorted key, e.g.
           {"id": 30221920, "os": "Android", "net_info": {"net_type": "WIFI"}}
             ==> ArrayBuffer(3.022192E7, WIFI, Android)
  */
  def parseJson(line: String, rows: mutable.Map[String, String]): ArrayBuffer[String] = {
    JSON.parseFull(line) match {
      case Some(map: Map[String, Any]) =>
        parseFields(rows, map)
        // split a yyyymmddHH...-style timestamp into date_id and hour, if present;
        // BigDecimal.toPlainString undoes the scientific notation JSON.parseFull produces for numbers
        var timestamp: String = null
        if (rows.contains("timestamp") && rows("timestamp").length > 10) {
          timestamp = new BigDecimal(rows("timestamp")).toPlainString
        } else if (rows.contains("user_timestamp") && rows("user_timestamp").length > 10) {
          timestamp = new BigDecimal(rows("user_timestamp")).toPlainString
        }
        if (timestamp != null) {
          rows("date_id") = timestamp.substring(0, 8)
          rows("hour") = timestamp.substring(8, 10)
        }
        val values = ArrayBuffer[String]()
        for (k <- rows.keys.toSeq.sorted)
          values.append(rows.getOrElse(k, ""))
        values
      case _ => throw new Exception("Parse json failed")
    }
  }

  /*
   Parse a map object recursively.
   If a value is a Map and its key is "domain", encode the value as a JSON string, e.g.
     Map("id" -> 3.021192E7, "os" -> "Android", "domain" -> Map("name" -> "Robin")) ==>
     Map("id" -> 3.021192E7, "os" -> "Android", "domain" -> """{"name": "Robin"}""")
   If a value is a Map and its key is not "domain", flatten (preKey -> (currKey -> value))
   into (preKey_currKey -> value) recursively, e.g.
     Map("id" -> 3.022192E7, "os" -> "Android", "net_info" -> Map("net_type" -> "WIFI")) ==>
     Map("id" -> 3.022192E7, "os" -> "Android", "net_info_net_type" -> "WIFI")
   @param rows   parsed result
   @param obj    map object to be parsed
   @param preKey parent key
   @throws java.lang.Exception if parsing a value fails
  */
  def parseFields(rows: mutable.Map[String, String], obj: Map[String, Any], preKey: String = null): Unit = {
    for (k <- obj.keys) {
      val key = k.toLowerCase()
      val value = obj.get(k)
      value match {
        case Some(map: Map[String, Any]) =>
          if (key == "domain") {
            rows(key) = Json.encode(map)
          } else {
            val pk = if (preKey == null) key else preKey + "_" + key
            parseFields(rows, map, pk)
          }
        case Some(v: Any) =>
          val pk = if (preKey == null) key else preKey + "_" + key
          rows(pk) = v.toString
        case _ => throw new Exception("parse failed")
      }
    }
  }

  /*
   Read the column names of {{table}} from user_tab_columns and build the
   parameterized insert statement for it, e.g.
     [id, name, os, domain] ==> ("domain,id,name,os", "insert into o(domain,id,name,os) values(?,?,?,?)")
   @param table Oracle table
   @return (comma-separated sorted column list, insert SQL)
  */
  private def getTableInfo(table: String) = {
    val conn = createConnection
    val stmt = conn.createStatement()
    val query =
      s"""
         |select column_name
         |from user_tab_columns
         |where lower(table_name) = lower('$table')
         |order by column_id
       """.stripMargin
    val rs = stmt.executeQuery(query)
    val columns = ArrayBuffer[String]()
    while (rs.next()) {
      columns += rs.getString(1).toLowerCase()
    }
    conn.close()
    val schema = columns.sorted.mkString(",")
    val valuesPlaceHolder = "?," * (columns.length - 1) + "?"
    val sql = s"insert into $logTable($schema) values($valuesPlaceHolder)"
    (schema, sql)
  }

  def getTopicAndPartitionOffset: Map[TopicAndPartition, Long] = {
    val conn = createConnection
    val stmt = conn.createStatement()
    val sql =
      s"""
         |select topic, partition, offset
         |from $offsetTable
         |where topic = '${KafkaInfo.topicTest}'
       """.stripMargin
    val resultSet = stmt.executeQuery(sql)
    var fromOffsets = Map[TopicAndPartition, Long]()
    while (resultSet.next()) {
      fromOffsets += (TopicAndPartition(resultSet.getString(1), resultSet.getInt(2)) -> resultSet.getLong(3))
    }
    conn.close()
    fromOffsets
  }

  private def createConnection = {
    // JdbcUtils.createConnectionFactory(KafkaInfo.urlBiBeta, KafkaInfo.propBiBeta)
    DriverManager.getConnection(KafkaInfo.urlBiBeta, KafkaInfo.propBiBeta)
  }
}
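As a quick local sanity check of the flattening logic, parseJson can be exercised without Kafka or Oracle. A minimal sketch, assuming the objects above are on the classpath; the pre-seeded column names are stand-ins for what getTableInfo would return:

import scala.collection.mutable

object ParseJsonDemo {
  def main(args: Array[String]): Unit = {
    // Seed the row map with (assumed) table columns, as the job does from getTableInfo.
    val rows = mutable.Map("id" -> "", "os" -> "", "net_info_net_type" -> "")
    val line = """{"id": 30221920, "os": "Android", "net_info": {"net_type": "WIFI"}}"""
    val values = KafkaToOracle.parseJson(line, rows)
    // Values come back ordered by sorted key (id, net_info_net_type, os);
    // JSON.parseFull reads numbers as Double, hence the scientific notation.
    println(values) // ArrayBuffer(3.022192E7, WIFI, Android)
  }
}

The streaming job itself takes a single argument, the batch interval in seconds, so it would typically be launched through spark-submit with that one value.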