
Spark Components: Spark Streaming Study Notes 3 -- Using It Together with Spark SQL (wordCount)

2016-04-26 17:26
More code is available at: https://github.com/xubo245/SparkLearning

1. Obtain a singleton SQLContext through a helper object, so that every batch of the stream reuses the same instance

val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._


object SQLContextSingleton {

  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
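
To show what the lazy singleton buys us, here is a minimal plain-Scala sketch that needs no Spark on the classpath. ExpensiveContext, LazySingletonSketch and creationCount are hypothetical names standing in for SQLContext and its holder, and the synchronized guard is an addition that the original object does not have. It only illustrates that repeated calls return the same instance.

```scala
object LazySingletonSketch {
  // Hypothetical stand-in for a costly object such as SQLContext.
  final class ExpensiveContext(val name: String)

  private var instance: ExpensiveContext = _
  private var creations = 0

  // Create the instance on first use and return the same one afterwards.
  // `synchronized` is an assumption added here to guard concurrent first calls.
  def getInstance(name: String): ExpensiveContext = synchronized {
    if (instance == null) {
      instance = new ExpensiveContext(name)
      creations += 1
    }
    instance
  }

  // Number of times the constructor actually ran.
  def creationCount: Int = synchronized(creations)
}
```

Calling getInstance from every batch therefore costs one construction in total, and arguments passed on later calls are ignored because the first instance is reused.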


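Then, for each RDD, convert the received data to a DataFrame, register it as a temporary table named words, run a query with sqlContext.sql, and print the result with show():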

val wordsDataFrame = rdd.map(w => Record(w)).toDF()

// Register as table
wordsDataFrame.registerTempTable("words")

// Do word count on table using SQL and print it
val wordCountsDataFrame =
  sqlContext.sql("select word, count(*) as total from words group by word")
println(s"========= $time =========")
wordCountsDataFrame.show()
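
For readers who want to see what the query computes without starting Spark, the grouping can be sketched with ordinary Scala collections. This is only an illustration of the semantics, not how Spark executes the query. WordCountSketch and countWords are hypothetical names, and the sample batch is made up.

```scala
object WordCountSketch {
  // Same shape as the Record case class used for the DataFrame.
  final case class Record(word: String)

  // Plain-collections equivalent of:
  //   select word, count(*) as total from words group by word
  def countWords(records: Seq[Record]): Map[String, Int] =
    records.groupBy(_.word).map { case (word, rows) => word -> rows.size }

  def main(args: Array[String]): Unit = {
    // Made-up batch of words, one Record per word.
    val batch = Seq("asd", "a", "a", "a", "h", "dsf").map(Record(_))
    // Map iteration order is unspecified, so rows may print in any order.
    countWords(batch).foreach { case (word, total) => println(s"$word\t$total") }
  }
}
```

As in the SQL version, each batch is counted on its own; nothing is carried over from one batch to the next.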


2. Running:

In one terminal, start a Netcat server:

nc -lk 9999
In another terminal, run the example:

hadoop@Mcnode6:~/cloud/spark-1.5.2$ ./bin/run-example streaming.SqlNetworkWordCount localhost 9999


The job prints a large number of log records.

Input typed into the Netcat terminal:

hadoop@Mcnode6:~$ nc -lk 9999
a
bbbb
a
a
b
b
b
b
b

b
b
spq

hello
a
a
a
a
a
aaaaa
a
a
a
a
a

a
a

h
dsf

asd
a
sd


Output for one batch:

16/04/26 17:24:10 INFO scheduler.DAGScheduler: Job 18 finished: foreachRDD at SqlNetworkWordCount.scala:63, took 1.118770 s
+----+-----+
|word|total|
+----+-----+
| asd|    1|
|   a|    3|
|   h|    1|
| dsf|    1|
|    |    3|
+----+-----+
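
The row with an empty word most likely comes from the blank lines in the input: splitting an empty string on a space still returns one empty token. The sketch below shows that behaviour in plain Scala, together with one possible way to drop such tokens. BlankLineSketch, tokenize and tokenizeNonEmpty are hypothetical names, and the filter is a suggestion, not part of the original example.

```scala
object BlankLineSketch {
  // Same tokenization as the example: split each line on a single space.
  def tokenize(line: String): Seq[String] = line.split(" ").toSeq

  // Hypothetical variant that discards empty tokens before counting.
  def tokenizeNonEmpty(line: String): Seq[String] =
    tokenize(line).filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    println(tokenize("").size)         // a blank line still yields one empty token
    println(tokenizeNonEmpty("").size) // the filtered variant yields none
    println(tokenizeNonEmpty("a b"))
  }
}
```

In the streaming job, the analogous change would be to append .filter(_.nonEmpty) to the words DStream before the foreachRDD call.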


3. Source code:

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
package org.apache.spark.Streaming.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.util.IntParam
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel

/**
* Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
* network in 2-second batches.
*
* Usage: SqlNetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ bin/run-example org.apache.spark.examples.streaming.SqlNetworkWordCount localhost 9999`
*/

object SqlNetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: SqlNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 2 second batch size
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note: the storage level used here has no replication, which is only suitable
    // for running locally. Replication is necessary in a distributed scenario
    // for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    words.foreachRDD((rdd: RDD[String], time: Time) => {
      // Get the singleton instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Register as table
      wordsDataFrame.registerTempTable("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        sqlContext.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class Record(word: String)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {

  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
// scalastyle:on println