Integrating Spark SQL with Spark Streaming
2016-07-13 18:48
Using Spark SQL inside a Spark Streaming job to compute statistics over Apache server access logs.
Startup commands:
First: nc -lk 9999
Then: bin/spark-submit --master local[4] --class youling.studio.streaming.ApacheAccessLogProcess /Users/rolin/IdeaProjects/spark-test/target/sparktest-1.0-SNAPSHOT.jar
(note that --master local[4] must come before the jar path; placed after it, spark-submit passes it to the application as a program argument instead.)
package youling.studio.streaming

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Duration}

/**
 * The LogAnalyzerStreamingSQL is similar to LogAnalyzerStreaming, except
 * it computes stats using Spark SQL.
 *
 * To feed the new lines of some logfile into a socket for streaming,
 * run this command:
 *   % tail -f [YOUR_LOG_FILE] | nc -lk 9999
 *
 * If you don't have a live log file that is being written to,
 * you can add test lines using this command:
 *   % cat ../../data/apache.access.log >> [YOUR_LOG_FILE]
 *
 * Example command to run:
 *   % spark-submit
 *     --class "com.databricks.apps.logs.chapter1.LogAnalyzerStreaming"
 *     --master local[4]
 *     target/scala-2.10/spark-logs-analyzer_2.10-1.0.jar
 */
object ApacheAccessLogProcess {
  val WINDOW_LENGTH = new Duration(20 * 1000)
  val SLIDE_INTERVAL = new Duration(10 * 1000)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming SQL in Scala")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD

    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)

    // Make sure something is already listening on localhost:9999. If nothing is,
    // the "No access com.databricks.app.logs received in this time interval"
    // branch below still runs every interval, and the driver log reports a
    // Connection Refused error.
    val logLinesDStream = streamingContext.socketTextStream("localhost", 9999)

    // Parse each raw line into a DStream[ApacheAccessLog]
    val accessLogsDStream = logLinesDStream.map(ApacheAccessLog.parseLogLine).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.count() == 0) {
        println("No access com.databricks.app.logs received in this time interval")
      } else {
        accessLogs.registerTempTable("TBL_ACCESS_LOG")

        // Calculate statistics based on the content size.
        val contentSizeStats = sqlContext
          .sql("SELECT SUM(contentSize), COUNT(*), MIN(contentSize), MAX(contentSize) FROM TBL_ACCESS_LOG")
          .first()
        println("Content Size Avg: %s, Min: %s, Max: %s".format(
          contentSizeStats.getLong(0) / contentSizeStats.getLong(1),
          contentSizeStats(2),
          contentSizeStats(3)))

        // Compute Response Code to Count.
        val responseCodeToCount = sqlContext
          .sql("SELECT responseCode, COUNT(*) FROM TBL_ACCESS_LOG GROUP BY responseCode")
          .map(row => (row.getInt(0), row.getLong(1)))
          .take(1000)
        println(s"""Response code counts: ${responseCodeToCount.mkString("[", ",", "]")}""")

        // Any IPAddress that has accessed the server more than 10 times.
        val ipAddresses = sqlContext
          .sql("SELECT ipAddress, COUNT(*) AS total FROM TBL_ACCESS_LOG GROUP BY ipAddress HAVING total > 10")
          .map(row => row.getString(0))
          .take(100)
        println(s"""IPAddresses > 10 times: ${ipAddresses.mkString("[", ",", "]")}""")

        // The ten most requested endpoints.
        val topEndpoints = sqlContext
          .sql("SELECT endpoint, COUNT(*) AS total FROM TBL_ACCESS_LOG GROUP BY endpoint ORDER BY total DESC LIMIT 10")
          .map(row => (row.getString(0), row.getLong(1)))
          .collect()
        println(s"""Top Endpoints: ${topEndpoints.mkString("[", ",", "]")}""")
      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
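To see what the first two SQL queries compute without running Spark at all, here is a minimal Spark-free sketch over plain Scala collections. The `LogStatsSketch` name and the slimmed-down `(responseCode, contentSize)` records are illustrative stand-ins for the full `TBL_ACCESS_LOG` rows, not part of the original code:

```scala
object LogStatsSketch {
  // Each record: (responseCode, contentSize) — a stand-in for a TBL_ACCESS_LOG row.
  type Record = (Int, Long)

  // Mirrors: SELECT SUM(contentSize), COUNT(*), MIN(contentSize), MAX(contentSize)
  def contentSizeStats(logs: Seq[Record]): (Long, Long, Long, Long) = {
    val sizes = logs.map(_._2)
    (sizes.sum, sizes.size.toLong, sizes.min, sizes.max)
  }

  // Mirrors: SELECT responseCode, COUNT(*) FROM TBL_ACCESS_LOG GROUP BY responseCode
  def responseCodeCounts(logs: Seq[Record]): Map[Int, Long] =
    logs.groupBy(_._1).map { case (code, rows) => code -> rows.size.toLong }

  def main(args: Array[String]): Unit = {
    val logs = Seq((200, 2933L), (302, 933L), (200, 1933L))
    val (sum, count, min, max) = contentSizeStats(logs)
    // Same average/min/max the streaming job prints per window
    println(s"Content Size Avg: ${sum / count}, Min: $min, Max: $max")
    println(s"Response code counts: ${responseCodeCounts(logs)}")
  }
}
```

The streaming job runs exactly these aggregations, but per 20-second window over whatever rows arrived on the socket.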
Log sample:
218.19.140.242 – - [10/Dec/2010:09:31:17 +0800] "GET /query/trendxml/district/todayreturn/month/2009-12-14/2010-12-09/haizhu_tianhe.xml HTTP/1.1" 200 2933 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.2.8) Gecko/20100722 Firefox/3.6.8 (.NET CLR 3.5.30729)"
218.19.140.24 – - [10/Dec/2010:09:31:17 +0800] "GET /query/trendxml/district/todayreturn/month/2009-12-14/2010-12-09/haizhu_tianhe.xml HTTP/1.1" 302 933 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.2.8) Gecko/20100722 Firefox/3.6.8 (.NET CLR 3.5.30729)"
218.19.140.242 – - [10/Dec/2010:09:31:17 +0800] "GET /query/trendxml/district/todayreturn/month/2009-12-14/2010-12-09/haizhu_tianhe.xml HTTP/1.1" 200 1933 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.2.8) Gecko/20100722 Firefox/3.6.8 (.NET CLR 3.5.30729)"
218.19.140.242 – - [10/Dec/2010:09:31:17 +0800] "GET /query/trendxml/district/todayreturn/month/2009-12-14/2010-12-09/haizhu_tianhe.xml HTTP/1.1" 200 1933 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.2.8) Gecko/20100722 Firefox/3.6.8 (.NET CLR 3.5.30729)"
218.19.140.242 – - [10/Dec/2010:09:31:17 +0800] "GET /query/trendxml/district/todayreturn/month/2009-12-14/2010-12-09/haizhu_tianhe.xml HTTP/1.1" 200 1933 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.2.8) Gecko/20100722 Firefox/3.6.8 (.NET CLR 3.5.30729)"
218.19.140.242 – - [10/Dec/2010:09:31:17 +0800] "GET /query/trendxml/district/todayreturn/month/2009-12-14/2010-12-09/haizhu_tianhe.xml HTTP/1.1" 302 1933 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.2.8) Gecko/20100722 Firefox/3.6.8 (.NET CLR 3.5.30729)"
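The job above calls `ApacheAccessLog.parseLogLine`, whose implementation the post does not show. Below is a hypothetical sketch of what it might look like for log lines like the sample above. The field names `ipAddress`, `responseCode`, `contentSize`, and `endpoint` come from the SQL queries in the code; the remaining fields and the regex are assumptions based on the Apache Combined Log Format. A case class is used because `import sqlContext.createSchemaRDD` needs a Product type to infer the schema:

```scala
import scala.util.matching.Regex

// Hypothetical reconstruction of the ApacheAccessLog helper used by the job.
// ipAddress, responseCode, contentSize, endpoint are required by the SQL queries;
// clientId, userId, dateTime, method, protocol are assumed from the log format.
case class ApacheAccessLog(ipAddress: String, clientId: String, userId: String,
                           dateTime: String, method: String, endpoint: String,
                           protocol: String, responseCode: Int, contentSize: Long)

object ApacheAccessLog {
  // ip ident user [timestamp] "method endpoint protocol" status size ...
  val Pattern: Regex =
    """^(\S+) (\S+) (\S+) \[([^\]]+)\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r

  def parseLogLine(log: String): ApacheAccessLog =
    Pattern.findFirstMatchIn(log) match {
      case Some(m) =>
        ApacheAccessLog(m.group(1), m.group(2), m.group(3), m.group(4),
          m.group(5), m.group(6), m.group(7), m.group(8).toInt, m.group(9).toLong)
      case None =>
        throw new IllegalArgumentException(s"Cannot parse log line: $log")
    }
}
```

Throwing on an unparseable line keeps the sketch short; in a real job a malformed line would kill the batch, so returning an `Option[ApacheAccessLog]` and using `flatMap` on the DStream would be safer.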