SODBASE实时大数据基础(三):SODBASE与Spark streaming集成
2016-04-11 19:49
681 查看
基于内存RDD的Spark框架相比Hadoop MapReduce框架有许多独特的优点,在越来越多项目中得到应用。Spark计算框架包括其Streaming组件,是批处理(Lamda架构中Batch Layer)的思路。若要在使用Spark的同时,
1)不修改Spark streaming代码和重启应用,实现多场景流式计算、规则管理
2)实现低延时关联模式实时分析
可将SODBASE CEP和Spark结合来使用。这样,可以方便地使用SODBASE EPL管理规则,也实现低延时,实现许多完全滑动窗口(非批滑动窗口)规则和复杂规则监测。
规则为Google的股票报价5000毫秒内出现了3次
通过SODBASE CEP运行此模型
也可以在Spark环境中运行此程序,将localhost改为SODBASE CEP运行的服务器IP地址
即可以接收SODBASE CEP传过来的流数据了
注:完全滑动窗口(非批量滑动窗口),例如监测一个人5分钟内登录系统三次则触发事件,是指他任意三次登录在5分钟完成内即满足规则,如三次登录的时间为10:50分30秒、10:53分25秒、10:55分05秒。而像批量滑动窗口如10:50~10:55,10:50~:11:00,或1分钟滑动量的10:50~10:55,10:51~10:57,10:52~10:57分批滑动窗口,Spark streaming都难以监测速度层(Speed Layer)这样的规则事件,需要和SODBASE CEP配合使用。
实际规则管理示例和Kafka运用 参考:
SODBASE
与Spark streaming集成-规则管理
SODBASE 实时大数据软件用于轻松、高效实施数据监测、监控类、实时交易类项目
。EPL语法见SODSQL写法与示例。图形化建模请使用SODBASE
Studio。嵌入式方式编程参见运行第一个EPL例子。与Storm集成参见EPL与Storm集成。缓存扩展参见与分布式缓存集成。
1)不修改Spark streaming代码和重启应用,实现多场景流式计算、规则管理
2)实现低延时关联模式实时分析
可将SODBASE CEP和Spark结合来使用。这样,可以方便地使用SODBASE EPL管理规则,也实现低延时,实现许多完全滑动窗口(非批滑动窗口)规则和复杂规则监测。
示例操作步骤
本文通过实例介绍如何将SODBASE CEP的输出通过Spark streaming保存为HDFS文件。(1)使用SODBASE CEP的PubSub适配器输出数据
对应的适配器类为com.sodbase.outputadaptor.socket.pubsub.SocketPubSubStringOutputAdaptor,示例CEP模型如下规则为Google的股票报价5000毫秒内出现了3次
SELECT T1.name AS name,T1.price+T2.price+T3.price AS sumprice FROM T1:模拟股票,T2:模拟股票,T3:模拟股票 PATTERN T1;T2;T3 WHERE T1.name='Google' AND T2.name='Google'AND T3.name='Google' WITHIN 5000
<?xml version="1.0" encoding="UTF-8" standalone="yes"?> <GraphModelData> <CEPSoftwareVersion>2</CEPSoftwareVersion> <inputAdaptors> <inputAdaptorClassName>com.sodbase.inputadaptor.EventGeneratorInputAdaptor</inputAdaptorClassName> <adaptorParams>模拟股票</adaptorParams> <adaptorParams>1000</adaptorParams> <isExternal>false</isExternal> </inputAdaptors> <SODSQLs>CREATE QUERY socketouput SELECT T1.name AS name,T1.price+T2.price+T3.price AS sumprice FROM T1:模拟股票,T2:模拟股票,T3:模拟股票 PATTERN T1;T2;T3 WHERE T1.name='Google' AND T2.name='Google'AND T3.name='Google' WITHIN 5000 </SODSQLs> <outputAdaptors> <isOutputAsSelection>true</isOutputAsSelection> <outputAdaptorClassName>com.sodbase.outputadaptor.socket.pubsub.SocketPubSubStringOutputAdaptor</outputAdaptorClassName> <adaptorParams>19999</adaptorParams> <adaptorParams>-1</adaptorParams> <isExternal>false</isExternal> <queryName>socketouput</queryName> </outputAdaptors> <modelName>socketouput</modelName> <modelVersion>1.0</modelVersion> <modelDescription></modelDescription> </GraphModelData>
通过SODBASE CEP运行此模型
(2)Spark程序访问socket 19999端口,接收SODBASE CEP的数据
package example.streaming import org.apache.spark.streaming.StreamingContext import org.apache.spark.SparkConf import org.apache.spark.streaming.Seconds import org.apache.spark.storage.StorageLevel object Main { def main(args:Array[String]) { val sparkConf = new SparkConf().setAppName("Main") val ssc = new StreamingContext(sparkConf, Seconds(60)) val lines = ssc.socketTextStream("localhost", 19999, StorageLevel.MEMORY_AND_DISK_SER) //lines.print lines.saveAsTextFiles("streamfile", "txt") ssc.start() ssc.awaitTermination() } }
也可以在Spark环境中运行此程序,将localhost改为SODBASE CEP运行的服务器IP地址
val lines = ssc.socketTextStream("ip", 19999, StorageLevel.MEMORY_AND_DISK_SER)
即可以接收SODBASE CEP传过来的流数据了
注:完全滑动窗口(非批量滑动窗口),例如监测一个人5分钟内登录系统三次则触发事件,是指他任意三次登录在5分钟完成内即满足规则,如三次登录的时间为10:50分30秒、10:53分25秒、10:55分05秒。而像批量滑动窗口如10:50~10:55,10:50~:11:00,或1分钟滑动量的10:50~10:55,10:51~10:57,10:52~10:57分批滑动窗口,Spark streaming都难以监测速度层(Speed Layer)这样的规则事件,需要和SODBASE CEP配合使用。
实际规则管理示例和Kafka运用 参考:
SODBASE
与Spark streaming集成-规则管理
SODBASE 实时大数据软件用于轻松、高效实施数据监测、监控类、实时交易类项目
。EPL语法见SODSQL写法与示例。图形化建模请使用SODBASE
Studio。嵌入式方式编程参见运行第一个EPL例子。与Storm集成参见EPL与Storm集成。缓存扩展参见与分布式缓存集成。
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- SQL中的三值逻辑
- SQL Server 作业批量停止
- 结束SQL阻塞的进程
- 动态生成SQL Server视图作业
- Release Notes - Apache Storm - Version 0.9.2-incub
- SQL Server 语句操纵数据库
- Spark随谈——开发指南(译)
- SQL(结构化查询语句)
- oracle sql日期比较
- Spark,一种快速数据分析替代方案
- linux快速部署mysql服务器
- sql 存储过程分页
- 在WINXP系统上安装SQL Server企业版的方法
- 通过批处理调用SQL的方法(osql)
- SQL Server 存储过程的分页
- ASP程序与SQL存储过程结合使用详解