SODBASE CEP学习(十六):CEP与数据库交互
2016-11-19 22:50
573 查看
一些时候出于项目需求或复用,需要将CEP和数据库结合起来用。SODBASE CEP可以很好地支持这类型需求。本文将介绍CEP与数据库交互的两种常用方式。
安装完毕后从开始菜单运行“Oracle Database 11g Express Edition”->"运行 SQL 命令行"
我们就用Oracle自带的示例表空间users,自带的用户HR
(2)下载最新版的SODBASE Studio(建议2.0.38版本以上版本),解压。
方式一:
下载CEP模型oracleExample.sod,运行SODBASE
Studio,导入oracleExample模型
在画板上右键->查看代码查看此模型的EPL语句
测试运行,结果如下所示,流的value2字段的值只有是在REGION_ID里面才会输出
方式二:
下载CEP模型oracleadaptor01.sod和oracleadaptor01_output.sod,运行SODBASE
Studio,导入这两个模型。依次运行oracleadaptor01_output和oracleadaptor01
oracleadaptor01是定时查询数据库,oracleadaptor01_output打印输出,运行结果如下所示
其中from_account是transacation_flow的字段,表示转出账号。
JAVA:f.DB:in(String x /*被查值*/, String sql, String jdbcDriver, String url, String user, String password) 是SODBASE系统实现的自定义函数。
一共6个参数,如果x在数据库上运行sql语句返回的数据集中,如果包含x,此函数返回true,否则返回false。
其它数据库示例
mysql:
postgresql:
Microsoft Sql Server:
用户也参考源代码可以进行二次开发,开发出满足自己需求的函数来。
如果要使用输出事件的字段值作为参数,采用?{},如 'select '?{t_id}','from_account','to_account', from black_list where account = ?{from_account} limit 1'
这种方式可以将数据库交互纳入统一的事件建模中。
1. 示例操作
(1)为示例操作简单,下载Oracle Express Edition (11g) Windows版,安装过程中会提示为sys和system用户设置密码,设置为123456安装完毕后从开始菜单运行“Oracle Database 11g Express Edition”->"运行 SQL 命令行"
我们就用Oracle自带的示例表空间users,自带的用户HR
SQL>connect system/123456 SQL>ALTER USER "HR" identified by "123456" SQL>ALTER USER HR ACCOUNT UNLOCK; SQL>disconnect SQL> connect HR/123456 已连接。 SQL> select * from regions; REGION_ID REGION_NAME ---------- -------------------------------------------------- 1 Europe 2 Americas 3 Asia 4 Middle East and Africa
(2)下载最新版的SODBASE Studio(建议2.0.38版本以上版本),解压。
方式一:
下载CEP模型oracleExample.sod,运行SODBASE
Studio,导入oracleExample模型
在画板上右键->查看代码查看此模型的EPL语句
SELECT * FROM T1:randomEventStream PATTERN T1 WHERE JAVA:f.DB:in(T1.value2,'select distinct region_id from regions','oracle.jdbc.OracleDriver','jdbc:oracle:thin:@localhost:1521:xe','HR','123456') WITHIN 0
测试运行,结果如下所示,流的value2字段的值只有是在REGION_ID里面才会输出
方式二:
下载CEP模型oracleadaptor01.sod和oracleadaptor01_output.sod,运行SODBASE
Studio,导入这两个模型。依次运行oracleadaptor01_output和oracleadaptor01
oracleadaptor01是定时查询数据库,oracleadaptor01_output打印输出,运行结果如下所示
2.工作原理
2.1 方式一:函数调用方式
可以把数据库查询作为一个函数。比如说一个简单的需求,我们需要在数据流transacation_flow上查出黑名单用户的交易记录。当一笔交易数据请求过来时,我们需要检查交易账号是否在黑名单里,即向数据库发起查询 select account from black_list 然后在返回的结果里做判断。一般来讲这种操作会拉慢CEP引擎的速度,但是如果事件量不大 、tps要求不高的时候,这种方案也是可行的。SELECT * FROM transacation_flow t PATTERN t WHERE JAVA:f.DB:in(t.from_account,'select distinct account from black_list','oracle.jdbc.OracleDriver','jdbc:oracle:thin:@localhost:1521:xe','user','password')
其中from_account是transacation_flow的字段,表示转出账号。
JAVA:f.DB:in(String x /*被查值*/, String sql, String jdbcDriver, String url, String user, String password) 是SODBASE系统实现的自定义函数。
一共6个参数,如果x在数据库上运行sql语句返回的数据集中,如果包含x,此函数返回true,否则返回false。
其它数据库示例
mysql:
SELECT * FROM transacation_flow t PATTERN t WHERE JAVA:f.DB:in(t.from_account,'select distinct account from black_list','com.mysql.jdbc.Driver','jdbc:mysql://localhost:3306/cep?useUnicode=true&characterEncoding=utf8','username','password')
postgresql:
SELECT * FROM transacation_flow t PATTERN t WHERE JAVA:f.DB:in(t.from_account,'select distinct account from black_list',''org.postgresql.Driver','jjdbc:postgresql://ip:port/mydb','username','password')
Microsoft Sql Server:
SELECT * FROM transacation_flow t PATTERN t WHERE JAVA:f.DB:in(t.from_account,'select distinct account from black_list',''com.microsoft.sqlserver.jdbc.SQLServerDriver','jdbc:sqlserver://localhost:1433;DatabaseName=mydb','username','password')
用户也参考源代码可以进行二次开发,开发出满足自己需求的函数来。
2.2 方式二:事件触发方式
流查询 SELECT * FROM T1:oracleadaptor01 的输出适配器中,触发数据库查询'select * from regions;'右键点击OUTPUT,查看配置如果要使用输出事件的字段值作为参数,采用?{},如 'select '?{t_id}','from_account','to_account', from black_list where account = ?{from_account} limit 1'
这种方式可以将数据库交互纳入统一的事件建模中。
3. 源代码
自定义函数JAVA:f.DB:in在sodbase-cep-udfunction-db.jar中,源代码如下所示,读者也可以修改二次开发自定义函数,修改完毕后打jar包,放到lib目录下面即可/** * */ package f; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; /** * */ public class DB { Connection conn = null; public boolean in(String x, String sql, String jdbcDriver, String url, String user, String password) { try { Connection conn = getConnection(jdbcDriver, url, user, password); conn.setAutoCommit(true); Statement stmt = conn.createStatement(); ResultSet r = stmt.executeQuery(sql ); while(r.next()) { Object o = r.getObject(1); if(o!=null&&o.toString().equals(x)) return true; } r.close(); stmt.close(); } catch (SQLException e) { e.printStackTrace(); } finally { if(conn!=null) try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } conn=null; } return false; } public Connection getConnection(String jdbcDriver, String dburl,String dbusername, String dbpassword) { String dbclass =jdbcDriver; try { Class.forName(dbclass); } catch (ClassNotFoundException e) { e.printStackTrace(); } try { if(conn!=null&&!conn.isClosed()) return conn; } catch (SQLException e1) { e1.printStackTrace(); } try { conn =DriverManager.getConnection(dburl, dbusername, dbpassword); } catch (SQLException e) { e.printStackTrace(); } return conn; } }
相关文章推荐
- SODBASE CEP学习进阶篇(二)续:日志采集之外的APM探针方法
- SODBASE CEP学习进阶篇(七)续:SODBASE CEP与Spark streaming集成-低延迟规则管理
- SODBASE CEP学习(六):流式计算中的存储和ETL
- SODBASE CEP学习(十二):规则模板、阈值和用户界面
- SODBASE CEP学习(十一):分布式集群
- SODBASE CEP学习进阶篇(五):与分布式缓存集成
- SODBASE CEP学习(十一):分布式集群-数据自动分发
- SODBASE CEP学习(五):流式计算中的类SQL语言EPL
- SODBASE CEP学习进阶篇(二):日志采集
- SODBASE CEP学习(四)续:类SQL语言EPL与Storm或jStorm集成-使用分布式缓存
- SODBASE CEP学习(十五):常见场景EPL示例
- SODBASE CEP学习(十三):EPL常用函数
- SODBASE CEP学习(七):Fail retry机制、去重只执行一次机制、事件乱序处理机制
- SODBASE CEP学习(二):运行第一个EPL例子
- SODBASE CEP学习进阶篇(七):SODBASE CEP与Spark streaming集成
- SODBASE CEP学习进阶篇(二)续:日志采集-Flume Syslog采集
- SODBASE CEP学习进阶篇(二)续:日志采集-Logstash、Kafka和CEP集成
- SODBASE CEP学习(九):SODBASE View 实时图表显示和移动端消息
- SODBASE CEP学习进阶篇(三):自己写输入输出适配器
- SODBASE CEP学习(四):类SQL语言EPL与Storm或jStorm集成