SODBASE实时大数据基础(一):实时同步Mysql数据库到Kafka
2016-04-09 23:13
495 查看
在实际大数据工作中,常常有实时监测数据库变化或实时同步数据到大数据存储,解决大数据实时分析的需求。同时,增量同步数据库数据相比全量查询也减少了网络带宽消耗。本文以Mysql的bin-log到Kafka为例,使用Canal Server,通过SODBASE引擎不用写程序就可以设定数据同步规则。
一、搭建Canal Server
1.1. 打开bin-log
(1)编辑mysql配置文件
添加
(2)重启mysql
1.2. 下载 canal.deployer-1.0.21.tar.gz
解压
1.3. 配置修改
应用参数:
说明:
canal.instance.connectionCharset 代表数据库的编码方式对应到java中的编码类型,比如UTF-8,GBK , ISO-8859-1
4. 准备启动
sh bin/startup.sh
5. 查看日志
2013-02-05 22:45:27.967 [main]INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canalserver.
2013-02-05 22:45:28.113 [main]INFO com.alibaba.otter.canal.deployer.CanalController- ## start the canal server[10.1.29.120:11111]
2013-02-0522:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server isrunning now ......
具体instance的日志:
2013-02-05 22:50:45.636 [main]INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loadingproperties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main]INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loadingproperties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main]INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - startCannalInstance for 1-example
2013-02-0522:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring- start successful....
二、建立同步规则模型
2.1. 编辑同步规则
在SODBASE Studio中新建一个模型canaltokafka(此模型也可以下载canaltokafka.sod,在Studio中导入)
配置Input
![](https://img-blog.csdn.net/20160408101750227?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
加一个Filter,作为示例,过滤test数据库的t2表
![](https://img-blog.csdn.net/20160409222507531?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
配置Output输出
![](https://img-blog.csdn.net/20160409230046000?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
T1.* 表示表字段*的新值,例如T1.a 表示表字段a的新值
T1.b_* 表示表字段*修改前的值,例如T1.b_a 表示表字段a的原值
T1.eventtype表示数据操作类型,U为更新,I为插入,D为删除
2.2 运行模型
方法一:
在SODBASE Studio中运行此模型(本示例是在Linux XWindow中运行的)
方法二:
规则模型导出为soddata文件,部署到SODBASE Server
部署方法参考 SODBASE CEP学习进阶篇(七)续:SODBASE CEP与Spark streaming集成-低延迟规则管理
另外,soddata文件可以和XML文件相互转化,即读者可以通过编辑XML来修改同步规则。
三、测试
3.1 安装Kafka,建立1个topic (testbinlog)
找一台linux机器,从官方网站下载Kafka,解压,启动
3.2 mysql修改数据
3.3 Kafka输出
使用kafka-console-consumer.sh消费
可以看到数据库变化都传到Kafka了。此后可以进行数据实时分析或接入大数据存储如HBase等,实现业务数据的实时同步和使用。
![](https://img-blog.csdn.net/20160409230753797?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
SODBASE 实时大树据软件用于轻松、高效实施数据监测、监控类、实时交易类项目
![](http://static.blog.csdn.net/xheditor/xheditor_emot/default/smile.gif)
。EPL语法见SODSQL写法与示例。图形化建模请使用SODBASE
Studio。嵌入式方式编程参见运行第一个EPL例子。缓存扩展参见与分布式缓存集成。
一、搭建Canal Server
1.1. 打开bin-log
(1)编辑mysql配置文件
vi /etc/my.cnf
添加
log-bin=mysql-bin #添加这一行就ok binlog-format=ROW #选择row模式 server_id=1#配置mysql replaction需要定义,不能和canal的slaveId重复运行mysql客户端
mysql -uroot -p
mysql>show binlog events; +------------------+-----+-------------+-----------+-------------+---------------------------------------+ | Log_name | Pos |Event_type | Server_id | End_log_pos |Info | +------------------+-----+-------------+-----------+-------------+---------------------------------------+ | mysql-bin.000001 | 4 |Format_desc | 1 | 106 | Server ver: 5.1.51-log,Binlog ver: 4 | +------------------+-----+-------------+-----------+-------------+---------------------------------------+ 1 row in set (0.00 sec)
(2)重启mysql
[root@localhost user]# service mysqld stop Stoppingmysqld: [ OK ] [root@localhost user]# service mysqld start Startingmysqld: [ OK ]
1.2. 下载 canal.deployer-1.0.21.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz
解压
tar zxvfcanal.deployer-$version.tar.gz
1.3. 配置修改
应用参数:
vi conf/example/instance.properties
################################################# ## mysql serverId canal.instance.mysql.slaveId = 1234 # position info,需要改成自己的数据库信息 canal.instance.master.address =127.0.0.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name= #canal.instance.standby.position = #canal.instance.standby.timestamp = # username/password,需要改成自己的数据库信息 canal.instance.dbUsername =canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName = canal.instance.connectionCharset =UTF-8 # table regex canal.instance.filter.regex =.*\\..* #################################################
说明:
canal.instance.connectionCharset 代表数据库的编码方式对应到java中的编码类型,比如UTF-8,GBK , ISO-8859-1
4. 准备启动
sh bin/startup.sh
5. 查看日志
vi logs/canal/canal.log
2013-02-05 22:45:27.967 [main]INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canalserver.
2013-02-05 22:45:28.113 [main]INFO com.alibaba.otter.canal.deployer.CanalController- ## start the canal server[10.1.29.120:11111]
2013-02-0522:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server isrunning now ......
具体instance的日志:
$ vi logs/example/example.log
2013-02-05 22:50:45.636 [main]INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loadingproperties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main]INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loadingproperties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main]INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - startCannalInstance for 1-example
2013-02-0522:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring- start successful....
二、建立同步规则模型
2.1. 编辑同步规则
在SODBASE Studio中新建一个模型canaltokafka(此模型也可以下载canaltokafka.sod,在Studio中导入)
配置Input
加一个Filter,作为示例,过滤test数据库的t2表
配置Output输出
T1.* 表示表字段*的新值,例如T1.a 表示表字段a的新值
T1.b_* 表示表字段*修改前的值,例如T1.b_a 表示表字段a的原值
T1.eventtype表示数据操作类型,U为更新,I为插入,D为删除
2.2 运行模型
方法一:
在SODBASE Studio中运行此模型(本示例是在Linux XWindow中运行的)
方法二:
规则模型导出为soddata文件,部署到SODBASE Server
部署方法参考 SODBASE CEP学习进阶篇(七)续:SODBASE CEP与Spark streaming集成-低延迟规则管理
另外,soddata文件可以和XML文件相互转化,即读者可以通过编辑XML来修改同步规则。
三、测试
3.1 安装Kafka,建立1个topic (testbinlog)
找一台linux机器,从官方网站下载Kafka,解压,启动
bin/zookeeper-server-start.sh config/zookeeper.properties & bin/kafka-server-start.sh config/server.properties & bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testbinlog & bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testbinlog
3.2 mysql修改数据
mysql> use test mysql> create table t2(a int,b int); Query OK, 0 rows affected (0.06 sec) mysql> insert into t2 values(1,2); Query OK, 1 row affected (0.00 sec) mysql> insert into t2 values(1,3); Query OK, 1 row affected (0.00 sec) mysql> update t2 set a = 0 where b = 2; Query OK, 1 row affected (0.08 sec) Rows matched: 1 Changed: 1 Warnings: 0 mysql> delete from t2 where a = 0; Query OK, 1 row affected (0.07 sec)
3.3 Kafka输出
使用kafka-console-consumer.sh消费
可以看到数据库变化都传到Kafka了。此后可以进行数据实时分析或接入大数据存储如HBase等,实现业务数据的实时同步和使用。
SODBASE 实时大树据软件用于轻松、高效实施数据监测、监控类、实时交易类项目
![](http://static.blog.csdn.net/xheditor/xheditor_emot/default/smile.gif)
。EPL语法见SODSQL写法与示例。图形化建模请使用SODBASE
Studio。嵌入式方式编程参见运行第一个EPL例子。缓存扩展参见与分布式缓存集成。
相关文章推荐
- Rails中scope和类方法的区别
- 【转】Condition的await-signal流程详解
- LeetCode *** 219. Contains Duplicate II
- InvalidateRect只是增加重绘区域,在下次WM_PAINT的时候才生效
- InvalidateRect只是增加重绘区域,在下次WM_PAINT的时候才生效
- 阿里云人工智能小Ai是比深度学习更高阶的算法
- http://blog.csdn.net/neiloid/article/details/7037093#
- HDOJ 2057 A + B Again
- kaili aircrack-ng remark
- pair<int, int> set<pair<int, int>>
- gem sources -a https://ruby.taobao.org/ 提示:Error fetching https://ruby.taobao.org/ SSL_connect returned=1 errno=0 state=SSLv3 read server certificate B: ce rtificate verify failed
- 2008 APAC local onsites Problem C. Millionaire
- aix 关闭报警黄灯
- 关于[[NSBundle mainBundle] infoDictionary]里面的那些东西
- NFS SERVER 引起aix系统无法启动
- leetcode196-Delete Duplicate Emails(删除重复并且id较大的数据)
- 史上最全开源大数据工具汇总
- hdu1021 Fibonacci Again
- AIX双网卡绑定
- Conversion to Dalvik format failed with error 1