您的位置:首页 > 数据库

如何将spark streaming处理结果保存到关系型数据库中

2017-12-05 21:32 351 查看
spark streaming是一个分布式高可靠的准实时处理系统,其数据源可以flume、Hdfs、kafka等,其结果可以保存到关系型数据库,HDFS上。保存到HDFS上相对简单,一句话就可以搞定,但是要保存到关系数据库中,相对比较麻烦,既要链接数据库,又要知道数据字段。

我们首先写个wordcount程序测试一下,通过网络发数据到spark streaming

发数据程序如下

[java] view
plain copy

import java.io.{PrintWriter}  

  

import java.net.ServerSocket  

import scala.io.Source  

  

  

object SaleSimulation {  

    

 def index(length: Int) = {  

  

    import java.util.Random  

   

   val rdm = new Random  

  

    rdm.nextInt(length)  

  

  }  

  

    

def main(args: Array[String]) {  

     

 if (args.length != 3) {  

    

    System.err.println("Usage: <filename> <port> <millisecond>")  

   

     System.exit(1)  

    

  }  

  

    

  val filename = args(0)  

   

   val lines = Source.fromFile(filename).getLines.toList  

  

    val filerow = lines.length  

  

       val listener = new ServerSocket(args(1).toInt)  

      

while (true) {  

   

     val socket = listener.accept()  

  

      new Thread() {  

     

     override def run = {  

     

       println("Got client connected from: " + socket.getInetAddress)  

     

       val out = new PrintWriter(socket.getOutputStream(), true)  

    

        while (true) {  

      

        Thread.sleep(args(2).toLong)  

    

         val content = lines(index(filerow))  

      

         println(content)  

        

         out.write(content + '\n')  

    

          out.flush()  

      

      }  

     

       socket.close()  

     

     }  

      }.start()  

    

  }  

    

}  

  

}  

打成jar包后运行

[java] view
plain copy

java -cp spark_streaming_test.jar com.pinganfu.ss.SaleSimulation /spark/people.txt 9999 1000  

spark streaming程序如下:

[java] view
plain copy

import java.sql.{PreparedStatement, Connection, DriverManager}    

import java.util.concurrent.atomic.AtomicInteger    

import org.apache.spark.SparkConf    

import org.apache.spark.streaming.{Seconds, StreamingContext}    

import org.apache.spark.streaming._    

import org.apache.spark.streaming.StreamingContext._    

//No need to call Class.forName("com.mysql.jdbc.Driver") to register Driver?    

object SparkStreamingForPartition {    

  def main(args: Array[String]) {    

    val conf = new SparkConf().setAppName("NetCatWordCount")    

    conf.setMaster("local[3]")    

    val ssc = new StreamingContext(conf, Seconds(5))     

    val dstream = ssc.socketTextStream("hadoopMaster", 9999).flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)    

    dstream.foreachRDD(rdd => {    

      //embedded function    

      def func(records: Iterator[(String,Int)]) {    

//Connect the mysql  

        var conn: Connection = null    

        var stmt: PreparedStatement = null    

        try {    

          val url = "jdbc:mysql://hadoopMaster:3306/streaming";    

          val user = "root";    

          val password = "hadoop"    

          conn = DriverManager.getConnection(url, user, password)    

          records.foreach(word => {    

            val sql = "insert into wordcounts values (?,?)";    

            stmt = conn.prepareStatement(sql);    

            stmt.setString(1, word._1)    

            stmt.setInt(2, word._2)    

            stmt.executeUpdate();    

          })    

        } catch {    

          case e: Exception => e.printStackTrace()    

        } finally {    

          if (stmt != null) {    

            stmt.close()    

          }    

          if (conn != null) {    

            conn.close()    

          }    

        }    

      }     

      val repartitionedRDD = rdd.repartition(3)    

      repartitionedRDD.foreachPartition(func)    

    })    

    ssc.start()    

    ssc.awaitTermination()    

  }    

}    

运行结果



1. DStream.foreachRDD是一个Output Operation,DStream.foreachRDD是数据落地很常用的方法

2. 获取MySQL Connection的操作应该放在foreachRDD的参数(是一个RDD[T]=>Unit的函数类型),这样,当

foreachRDD方法在每个Worker上执行时,连接是在Worker上创建。如果Connection的获取放到dstream.foreachRDD之

前,那么Connection的获取动作将发生在Driver端,然后通过序列化的方式发送到各个Worker(Connection的序列化通常是无法正确序列化的)

3. Connection的获取在foreachRDD的参数中获取,同时还要在遍历RDD之前获取(调用RDD的foreach方法前获取),如果遍历中获取,那么RDD中的每个record都要打开关闭连接,这对于数据库连接资源将是极大的考验

4. 业务逻辑处理定义在func中,它是在foreachRDD的方法参数体中定义的,如果把func的定义放到外面,即Driver中,貌似也是可以的,Spark会对计算方法通过Broadcast进行广播到各个计算节点。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐