Spark Components: Spark Streaming Study Notes 3 -- Using It with Spark SQL (wordCount)
2016-04-26 17:26
For more code, see: https://github.com/xubo245/SparkLearning
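1. Obtain a singleton SQLContext through a helper object. Inside foreachRDD, fetch the shared instance and import its implicits (needed for toDF()):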
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._

object SQLContextSingleton {
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
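The helper creates the SQLContext lazily on first use and then reuses it in every later batch instead of building a new one each time. Its getInstance is not synchronized; that works here because the foreachRDD function runs on the driver and batches are normally processed one after another. Below is a minimal, Spark-free sketch of the same lazy-singleton pattern with synchronized added as a defensive variant. HeavyContext and ContextSingleton are hypothetical stand-ins for SQLContext and SQLContextSingleton, and the counter exists only to show that the constructor runs once.

```scala
// Hypothetical stand-in for an expensive object such as SQLContext.
class HeavyContext(val name: String) {
  HeavyContext.created += 1 // count constructions, for demonstration only
}

object HeavyContext {
  var created = 0
}

// Lazily instantiated singleton with the same shape as SQLContextSingleton,
// but getInstance is synchronized so concurrent callers cannot both create one.
object ContextSingleton {
  @transient private var instance: HeavyContext = _

  def getInstance(name: String): HeavyContext = synchronized {
    if (instance == null) {
      instance = new HeavyContext(name)
    }
    instance
  }
}

object SingletonDemo {
  def main(args: Array[String]): Unit = {
    val first = ContextSingleton.getInstance("batch-1")
    val second = ContextSingleton.getInstance("batch-2")
    println(first eq second)      // true: the same object is reused
    println(HeavyContext.created) // 1: constructed only once
    println(second.name)          // batch-1: later arguments are ignored
  }
}
```

As with the original helper, the argument passed on later calls is ignored once the instance exists.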
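Then, for each RDD, convert the received words to a DataFrame, register it as a temporary table named "words", run the query with sqlContext.sql, and print the result with show():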
val wordsDataFrame = rdd.map(w => Record(w)).toDF()
// Register as table
wordsDataFrame.registerTempTable("words")
// Do word count on table using SQL and print it
val wordCountsDataFrame =
  sqlContext.sql("select word, count(*) as total from words group by word")
println(s"========= $time =========")
wordCountsDataFrame.show()
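To make clear what the SQL query computes for a single batch, here is a sketch of the same "group by word, count(*)" logic using plain Scala collections instead of Spark. The object name BatchWordCount and the sample lines are hypothetical; in the streaming job the input would be the lines received from the socket during one 2-second batch.

```scala
// What "select word, count(*) as total from words group by word"
// computes for one batch, written with plain Scala collections.
object BatchWordCount {
  // Same tokenization as the streaming job: lines.flatMap(_.split(" "))
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .groupBy(identity) // one group per distinct word (GROUP BY word)
      .map { case (word, occurrences) => word -> occurrences.size } // count(*)

  def main(args: Array[String]): Unit = {
    val batch = Seq("a bbbb a", "hello a") // hypothetical batch contents
    countWords(batch).toSeq.sortBy(_._1).foreach { case (word, total) =>
      println(s"$word\t$total") // a 3, bbbb 1, hello 1
    }
  }
}
```

Spark runs the same grouping distributed across the RDD's partitions; the result for each batch is independent of earlier batches, because the query only sees the current batch's "words" table.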
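2. Running the example:
In one terminal, start a Netcat server:
nc -lk 9999
In another terminal, run the SqlNetworkWordCount example bundled with Spark: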
hadoop@Mcnode6:~/cloud/spark-1.5.2$ ./bin/run-example streaming.SqlNetworkWordCount localhost 9999
A large amount of log output is printed.
Input (typed into the nc terminal):
hadoop@Mcnode6:~$ nc -lk 9999
a bbbb a a b b b b b b b spq hello a a a a a aaaaa a a a a a a a h dsf asd a sd
Output (one batch):
16/04/26 17:24:10 INFO scheduler.DAGScheduler: Job 18 finished: foreachRDD at SqlNetworkWordCount.scala:63, took 1.118770 s
+----+-----+
|word|total|
+----+-----+
| asd|    1|
|   a|    3|
|   h|    1|
| dsf|    1|
|    |    3|
+----+-----+
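The last row has an empty word with a total of 3. A likely cause is the tokenization split(" "): splitting a string that contains two consecutive spaces yields an empty token, and an empty line splits into a single empty token. The original spacing of the input was lost, so this cause is an assumption. The sketch below (plain Scala, object name SplitDemo is hypothetical) shows the behavior and one possible fix: trim, split on runs of whitespace, and drop empty tokens.

```scala
object SplitDemo {
  // Tokenization used by the example: may emit empty strings.
  def naiveTokens(line: String): Seq[String] = line.split(" ").toSeq

  // One possible fix: split on runs of whitespace and drop empty tokens.
  def cleanTokens(line: String): Seq[String] =
    line.trim.split("\\s+").toSeq.filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    println(naiveTokens("a  b")) // contains an empty token between "a" and "b"
    println(naiveTokens(""))     // a single empty token
    println(cleanTokens("a  b")) // only "a" and "b"
    println(cleanTokens(""))     // no tokens
  }
}
```

In the streaming job, the same idea could be applied with lines.flatMap(_.trim.split("\\s+")).filter(_.nonEmpty) in place of lines.flatMap(_.split(" ")).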
3. Source code:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// scalastyle:off println
package org.apache.spark.Streaming.learning
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.util.IntParam
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
/**
* Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
* network every second.
*
* Usage: SqlNetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ bin/run-example org.apache.spark.examples.streaming.SqlNetworkWordCount localhost 9999`
*/
object SqlNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: SqlNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // Reduce log verbosity; StreamingExamples is not shown in this post
    // and is assumed to be defined in the same package
    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 2 second batch size
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that a storage level without replication is only suitable for running locally.
    // Replication is necessary in a distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    words.foreachRDD((rdd: RDD[String], time: Time) => {
      // Get the singleton instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Register as table
      wordsDataFrame.registerTempTable("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        sqlContext.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
/** Case class for converting RDD to DataFrame */
case class Record(word: String)
/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
// scalastyle:on println