在Kettle里使用时间戳实现变化数据捕获(CDC)
2016-12-26 14:03
579 查看
1. 建立测试表,插入数据。
![](https://oscdn.geek-share.com/Uploads/Images/Content/202006/05/970ee1594b4115d8c792fa0e571171e1)
![](https://oscdn.geek-share.com/Uploads/Images/Content/202006/05/fc2a0743d6970e5e024ec9239e0756fc)
![](https://oscdn.geek-share.com/Uploads/Images/Content/202006/05/d58db7004eb1cc54da92cd62d31dcb9e)
说明:
把current_load时间设置成作业的开始时间。通过“获取系统信息”完成这一功能,在这个步骤里创建一个“系统日期(变)”类型的字段,字段名是sysdate。然后创建一个“插入/更新”步骤,把“获取系统信息”步骤和“插入/更新”步骤连接起来。在“插入/更新”步骤的“更新字段”部分里,用流里的字段“sysdate”去更新表里的字段“current_load”。另外还要设置“用来查询的关键字”部分,把表的“current_load”的条件设置为“is not null”即可。
4. 创建查询变化数据的转换
![](https://oscdn.geek-share.com/Uploads/Images/Content/202006/05/963f74bda20a4d3a1aaa1b61d63b5501)
![](https://oscdn.geek-share.com/Uploads/Images/Content/202006/05/294654e2c7ae0a349582c0648622f045)
![](https://oscdn.geek-share.com/Uploads/Images/Content/202006/05/a41d9603992ec55571241efd4bc3e6cd)
![](https://oscdn.geek-share.com/Uploads/Images/Content/202006/05/978dcd18da5cbfa22f1c932248f0f92e)
说明:
从t_color表里抽取数据的查询语句使用开始日期和结束日期,左边闭区间,右边开区间。查询条件类似下面的语句:
(create_date >= last_load and create_date < current_load) or (last_update >= last_load and last_update < current_load)
这里需要两个表输入步骤,一个用来从cdc_time表中抽取时间,另一个从t_color表中抽取需要的数据。另外再看查询条件,可以发现last_load和current_load分别出现两次。就是说在第一个表输入步骤中,这些时间值需要被抽取出来两次。
select
last_load last1,
current_load cur1,
last_load last2,
current_load cur2
from cdc_time;
在t_color表输入步骤里,选中“替换 sql 语句里的变量”,在“从步骤插入数据”下拉列表里选中上个表输入步骤。在select语句里写入下面的查询条件:
where (create_date >= ? and create_date < ?) or (last_update >= ? and last_update < ?)
前一个步骤传来的参数将替换上面语句里的问号,第一个问号的值是last1,第二个问号的值是cur1,等等。
通过比较create_date和last_update的值是否相等,可以判断出是新增的还是更改的数据。
case when create_date = last_update then 'new' else 'changed' end as flagfield
把变更数据输出到文本文件里。
5. 创建更新参数表的转换
![](https://oscdn.geek-share.com/Uploads/Images/Content/202006/05/278d5fc57ae699bc04179e5ee24dba1f)
![](https://oscdn.geek-share.com/Uploads/Images/Content/202006/05/8a0dc158c5794deb51fd87b9708a6ade)
说明:
如果转换中没有发生任何错误,要把current_load字段里的值复制到last_load字段里。如果转换中发生了错误,时间戳需要保持不变。把current_load字段里的值复制到last_load字段里需要“执行sql语句”步骤,脚本如下:
update cdc_time set last_load = current_load;
cdc_time表里之所以要有两个字段,是因为在加载过程中,会有新的数据被插入或更新,为避免脏读或死锁的情况,最好给create和update时间戳设定一个上限条件,也就是这里的current_load字段。
6. 创建作业
![](https://oscdn.geek-share.com/Uploads/Images/Content/202006/05/c346edd975f65578c8a0c0dd713294ba)
7. 测试
-- 运行作业
-- 查看diff文件
![](https://oscdn.geek-share.com/Uploads/Images/Content/202006/05/7bc3136cabadcbe46976e3235935d764)
-- 查看cdc_time表
mysql> select * from cdc_time;
+---------------------+---------------------+
| last_load | current_load |
+---------------------+---------------------+
| 2014-12-16 11:10:05 | 2014-12-16 11:10:05 |
+---------------------+---------------------+
1 row in set (0.00 sec)
-- 修改数据
-- 查看diff文件
![](https://oscdn.geek-share.com/Uploads/Images/Content/202006/05/2a138a9df7e3935e5ba95ff7e6d8f05e)
-- 查看cdc_time表
mysql> select * from cdc_time;
+---------------------+---------------------+
| last_load | current_load |
+---------------------+---------------------+
| 2014-12-16 11:16:02 | 2014-12-16 11:16:02 |
+---------------------+---------------------+
1 row in set (0.00 sec)
8. 总结 基于源数据的CDC要求源数据里有相关的属性列,ETL过程可以利用这些属性列,来判断出哪些数据是增量数据。最常见的属性列有以下两种:
时间戳:这种方法至少需要一个更新时间戳,但最好有两个时间戳:一个插入时间戳,记录数据行什么时候创建;一个更新时间戳,记录数据行什么时候最后一次更新。
序列:大多数数据库都有自增序列。如果数据库表用到了这种序列,就可以很容易识别出新插入的数据。
这两种方法都需要一个额外的数据库表来存储上一次更新时间或上一次抽取的最后一个序列号。在实践中,一般是在一个独立的模式下或在数据缓冲区里创建这个参数表,不能在数据仓库里创建,更不能在数据集市里创建。基于时间戳和自增序列的方法是CDC最简单的实现方式,所以也是最常用的方法。但是它的缺点也是很明显的,主要如下:
区分插入操作和更新操作:只有当源系统包含了插入时间戳和更新时间戳两个字段,才能区别插入和更新,否则无法区分。
删除记录的操作:不能捕获到删除操作,除非是逻辑删除,即记录没有真的删除,只是做了逻辑上的标志。
多次更新检测:如果在一次同步周期内,数据被更新了多次,只能同步最后一次更新操作,中间的更新操作都丢失了。
实时能力:时间戳和基于序列的数据抽取一般适用于批量操作,不适合于实时场景下的数据加载。
use test; create table t_color ( id int unsigned not null auto_increment primary key, color varchar(10), create_date datetime, last_update timestamp ) engine=myisam; insert into t_color (color,create_date) values('black',now()),('green',now()),('red',now()),('blue',now()); select * from t_color;2. 建立参数表存储最后一次的抽取时间。
use test; -- 创建时间戳表 create table cdc_time ( last_load datetime, current_load datetime) engine=myisam; -- 初始化数据 insert into cdc_time values ('1971-01-01 00:00:01','1971-01-01 00:00:01'); select * from cdc_time;3. 创建初始化时间戳转换
说明:
把current_load时间设置成作业的开始时间。通过“获取系统信息”完成这一功能,在这个步骤里创建一个“系统日期(变)”类型的字段,字段名是sysdate。然后创建一个“插入/更新”步骤,把“获取系统信息”步骤和“插入/更新”步骤连接起来。在“插入/更新”步骤的“更新字段”部分里,用流里的字段“sysdate”去更新表里的字段“current_load”。另外还要设置“用来查询的关键字”部分,把表的“current_load”的条件设置为“is not null”即可。
4. 创建查询变化数据的转换
说明:
从t_color表里抽取数据的查询语句使用开始日期和结束日期,左边闭区间,右边开区间。查询条件类似下面的语句:
(create_date >= last_load and create_date < current_load) or (last_update >= last_load and last_update < current_load)
这里需要两个表输入步骤,一个用来从cdc_time表中抽取时间,另一个从t_color表中抽取需要的数据。另外再看查询条件,可以发现last_load和current_load分别出现两次。就是说在第一个表输入步骤中,这些时间值需要被抽取出来两次。
select
last_load last1,
current_load cur1,
last_load last2,
current_load cur2
from cdc_time;
在t_color表输入步骤里,选中“替换 sql 语句里的变量”,在“从步骤插入数据”下拉列表里选中上个表输入步骤。在select语句里写入下面的查询条件:
where (create_date >= ? and create_date < ?) or (last_update >= ? and last_update < ?)
前一个步骤传来的参数将替换上面语句里的问号,第一个问号的值是last1,第二个问号的值是cur1,等等。
通过比较create_date和last_update的值是否相等,可以判断出是新增的还是更改的数据。
case when create_date = last_update then 'new' else 'changed' end as flagfield
把变更数据输出到文本文件里。
5. 创建更新参数表的转换
说明:
如果转换中没有发生任何错误,要把current_load字段里的值复制到last_load字段里。如果转换中发生了错误,时间戳需要保持不变。把current_load字段里的值复制到last_load字段里需要“执行sql语句”步骤,脚本如下:
update cdc_time set last_load = current_load;
cdc_time表里之所以要有两个字段,是因为在加载过程中,会有新的数据被插入或更新,为避免脏读或死锁的情况,最好给create和update时间戳设定一个上限条件,也就是这里的current_load字段。
6. 创建作业
7. 测试
-- 运行作业
-- 查看diff文件
-- 查看cdc_time表
mysql> select * from cdc_time;
+---------------------+---------------------+
| last_load | current_load |
+---------------------+---------------------+
| 2014-12-16 11:10:05 | 2014-12-16 11:10:05 |
+---------------------+---------------------+
1 row in set (0.00 sec)
-- 修改数据
delete from t_color where id=3; update t_color set color='Grey' where id=1; insert into t_color (color,create_date) values('Yellow',now()); commit;-- 运行作业
-- 查看diff文件
-- 查看cdc_time表
mysql> select * from cdc_time;
+---------------------+---------------------+
| last_load | current_load |
+---------------------+---------------------+
| 2014-12-16 11:16:02 | 2014-12-16 11:16:02 |
+---------------------+---------------------+
1 row in set (0.00 sec)
8. 总结 基于源数据的CDC要求源数据里有相关的属性列,ETL过程可以利用这些属性列,来判断出哪些数据是增量数据。最常见的属性列有以下两种:
时间戳:这种方法至少需要一个更新时间戳,但最好有两个时间戳:一个插入时间戳,记录数据行什么时候创建;一个更新时间戳,记录数据行什么时候最后一次更新。
序列:大多数数据库都有自增序列。如果数据库表用到了这种序列,就可以很容易识别出新插入的数据。
这两种方法都需要一个额外的数据库表来存储上一次更新时间或上一次抽取的最后一个序列号。在实践中,一般是在一个独立的模式下或在数据缓冲区里创建这个参数表,不能在数据仓库里创建,更不能在数据集市里创建。基于时间戳和自增序列的方法是CDC最简单的实现方式,所以也是最常用的方法。但是它的缺点也是很明显的,主要如下:
区分插入操作和更新操作:只有当源系统包含了插入时间戳和更新时间戳两个字段,才能区别插入和更新,否则无法区分。
删除记录的操作:不能捕获到删除操作,除非是逻辑删除,即记录没有真的删除,只是做了逻辑上的标志。
多次更新检测:如果在一次同步周期内,数据被更新了多次,只能同步最后一次更新操作,中间的更新操作都丢失了。
实时能力:时间戳和基于序列的数据抽取一般适用于批量操作,不适合于实时场景下的数据加载。
相关文章推荐
- [angular]服务之3$injector
- 非常简单的小爬虫。
- 信鸽推送.NET SDK 开源
- es6模块加载 nodejs(common.js)模块加载 amd模块加载的区别整理
- ssh localhost :connect refused
- 获取网络连接状态
- 软件开发流程包括测试环境和生产环境
- fir.im Weekly - iOS / Android 动态化更新方案盘点
- 关于Banner设计的促销氛围
- leetcode oj java Find Minimum in Rotated Sorted Array
- hadoop2.7.3集群搭建------>真正的分布式环境
- 点评五款用于 Linux 编程的内存调试器
- JdbcTemplate and TransactionTemplate
- oracle 向表中插入BLOB类型数据
- C语言函数如何访问http
- RequestDemo01
- 使用外部配置文件
- 苹果手机按钮样式去除
- Android开发之调用摄像头拍照
- MVC框架的封装(六)视图类