您的位置:首页 > 数据库

SODBASE CEP学习(十六):CEP与数据库交互

2016-11-19 22:50 573 查看
一些时候出于项目需求或复用,需要将CEP和数据库结合起来用。SODBASE CEP可以很好地支持这类型需求。本文将介绍CEP与数据库交互的两种常用方式。

1. 示例操作

(1)为示例操作简单,下载Oracle Express Edition (11g) Windows版,安装过程中会提示为sys和system用户设置密码,设置为123456

安装完毕后从开始菜单运行“Oracle Database 11g Express Edition”->"运行 SQL 命令行"

我们就用Oracle自带的示例表空间users,自带的用户HR
SQL>connect system/123456
SQL>ALTER USER "HR" identified by "123456"
SQL>ALTER USER HR ACCOUNT UNLOCK;
SQL>disconnect
SQL> connect HR/123456
已连接。
SQL> select * from regions;
REGION_ID REGION_NAME
---------- --------------------------------------------------
1 Europe
2 Americas
3 Asia
4 Middle East and Africa


(2)下载最新版的SODBASE Studio(建议2.0.38版本以上版本),解压

方式一:

下载CEP模型oracleExample.sod,运行SODBASE
Studio,导入oracleExample模型

在画板上右键->查看代码查看此模型的EPL语句
SELECT * FROM T1:randomEventStream
PATTERN T1
WHERE JAVA:f.DB:in(T1.value2,'select distinct region_id from regions','oracle.jdbc.OracleDriver','jdbc:oracle:thin:@localhost:1521:xe','HR','123456')   WITHIN 0


测试运行,结果如下所示,流的value2字段的值只有是在REGION_ID里面才会输出



方式二:

下载CEP模型oracleadaptor01.sod和oracleadaptor01_output.sod,运行SODBASE
Studio,导入这两个模型。依次运行oracleadaptor01_output和oracleadaptor01

oracleadaptor01是定时查询数据库,oracleadaptor01_output打印输出,运行结果如下所示



2.工作原理

2.1 方式一:函数调用方式

可以把数据库查询作为一个函数。比如说一个简单的需求,我们需要在数据流transacation_flow上查出黑名单用户的交易记录。当一笔交易数据请求过来时,我们需要检查交易账号是否在黑名单里,即向数据库发起查询 select account from black_list 然后在返回的结果里做判断。一般来讲这种操作会拉慢CEP引擎的速度,但是如果事件量不大 、tps要求不高的时候,这种方案也是可行的。

SELECT * FROM transacation_flow t
PATTERN t
WHERE JAVA:f.DB:in(t.from_account,'select distinct account from black_list','oracle.jdbc.OracleDriver','jdbc:oracle:thin:@localhost:1521:xe','user','password')

其中from_account是transacation_flow的字段,表示转出账号。

JAVA:f.DB:in(String x /*被查值*/, String sql, String jdbcDriver, String url, String user, String password) 是SODBASE系统实现的自定义函数。

一共6个参数,如果x在数据库上运行sql语句返回的数据集中,如果包含x,此函数返回true,否则返回false。

其它数据库示例

mysql:

SELECT * FROM transacation_flow t
PATTERN t
WHERE JAVA:f.DB:in(t.from_account,'select distinct account from black_list','com.mysql.jdbc.Driver','jdbc:mysql://localhost:3306/cep?useUnicode=true&characterEncoding=utf8','username','password')


postgresql:
SELECT * FROM transacation_flow t
PATTERN t
WHERE JAVA:f.DB:in(t.from_account,'select distinct account from black_list',''org.postgresql.Driver','jjdbc:postgresql://ip:port/mydb','username','password')


Microsoft Sql Server:

SELECT * FROM transacation_flow t
PATTERN t
WHERE JAVA:f.DB:in(t.from_account,'select distinct account from black_list',''com.microsoft.sqlserver.jdbc.SQLServerDriver','jdbc:sqlserver://localhost:1433;DatabaseName=mydb','username','password')


用户也参考源代码可以进行二次开发,开发出满足自己需求的函数来。

2.2 方式二:事件触发方式

流查询 SELECT * FROM T1:oracleadaptor01 的输出适配器中,触发数据库查询'select * from regions;'右键点击OUTPUT,查看配置



如果要使用输出事件的字段值作为参数,采用?{},如 'select '?{t_id}','from_account','to_account', from black_list where account = ?{from_account} limit 1'

这种方式可以将数据库交互纳入统一的事件建模中。

3. 源代码

自定义函数JAVA:f.DB:in在sodbase-cep-udfunction-db.jar中,源代码如下所示,读者也可以修改二次开发自定义函数,修改完毕后打jar包,放到lib目录下面即可
/**
*
*/
package f;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
*
*/
public class DB
{
Connection conn = null;

public boolean in(String x, String sql, String jdbcDriver, String url, String user, String password)
{
try
{
Connection conn = getConnection(jdbcDriver,  url,  user,  password);
conn.setAutoCommit(true);
Statement stmt = conn.createStatement();

ResultSet r = stmt.executeQuery(sql );
while(r.next())
{
Object o = r.getObject(1);
if(o!=null&&o.toString().equals(x))
return true;
}
r.close();
stmt.close();
} catch (SQLException e)
{
e.printStackTrace();
}
finally
{
if(conn!=null)
try
{
conn.close();
} catch (SQLException e)
{
e.printStackTrace();
}
conn=null;
}
return false;
}

public Connection getConnection(String jdbcDriver, String dburl,String dbusername,
String dbpassword)
{

String dbclass =jdbcDriver;
try {
Class.forName(dbclass);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}

try {
if(conn!=null&&!conn.isClosed())
return conn;
} catch (SQLException e1) {
e1.printStackTrace();
}

try {

conn =DriverManager.getConnection(dburl, dbusername,
dbpassword);

} catch (SQLException e) {
e.printStackTrace();
}
return conn;
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息