Canal+Otter - Canal篇(1)
2017-12-13 00:00
351 查看
Canal是阿里开源产品之一,是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,Canal主要支持了MySQL的binlog解析。
为何要解析binlog: binlog中含有许多我们需要的信息,基于这些信息,我们可以实现很多功能:
异构数据库同步
数据库事件触发实现分布式事务
数据检效与监控
等等
MySQL主从同步原理:
master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
slave将master的binary log events拷贝到它的中继日志(relay log);
slave重做中继日志中的事件,将改变反映它自己的数据。
Canal模拟binlog用的传输协议,把自己伪装成slave,抓取日志:
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
mysql master收到dump请求,开始推送binary log给slave(也就是canal)
canal解析binary log对象(原始为byte流)
修复mysql协议读取 #127 [BUG]
mysql 5.6版本 datetime值为null时 sqltype解析异常 #130 [BUG]
值由Null变为空字符串时,isUpdated属性为false #135 [BUG]
多表rename ddl解析出现NPE #122 #128 #137 [BUG]
这几个bug比较重要,所以最好用最新版的canal。之后的otter最新版的默认内置canal版本为1.0.20,最好在这里自己编译下并替换。
配置测试数据库,开启binlog:
添加Canal用户:
在编译好的目录下的target中找到canal.deployer-1.0.21-SNAPSHOT.tar.gz,解压
配置conf/example/instance.properties:
订阅起始点可自定义,查看当前binlog状态:
一般的,binlog通过文件名和position就可以定位到,timestamp一般可以不用填。
配置conf/canal.properties:
剩下的配置我在Canal源代码分析中会细讲,敬请期待。
配置好,启动:
查看日志,启动成功。
之后利用客户端程序测试:
测试结果:
可以看出,基于ROW格式的binlog解析,我们可以解析出是何种语句,以及每条记录是怎么更新的。
为何要解析binlog: binlog中含有许多我们需要的信息,基于这些信息,我们可以实现很多功能:
异构数据库同步
数据库事件触发实现分布式事务
数据检效与监控
等等
基本原理:
MySQL主从同步原理:
master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
slave将master的binary log events拷贝到它的中继日志(relay log);
slave重做中继日志中的事件,将改变反映它自己的数据。
Canal模拟binlog用的传输协议,把自己伪装成slave,抓取日志:
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
mysql master收到dump请求,开始推送binary log给slave(也就是canal)
canal解析binary log对象(原始为byte流)
快速使用:
目前最新的版本是Canal1.0.21,在这个版本修复了几个bug修复mysql协议读取 #127 [BUG]
mysql 5.6版本 datetime值为null时 sqltype解析异常 #130 [BUG]
值由Null变为空字符串时,isUpdated属性为false #135 [BUG]
多表rename ddl解析出现NPE #122 #128 #137 [BUG]
这几个bug比较重要,所以最好用最新版的canal。之后的otter最新版的默认内置canal版本为1.0.20,最好在这里自己编译下并替换。
git clone https://github.com/alibaba/canal.git mvn clean install -Dmaven.test.skip -Denv=release
配置测试数据库,开启binlog:
log-bin=mysql-bin #添加这一行就ok binlog-format=ROW #选择row模式,虽然Canal支持各种模式,但是想用otter,必须用ROW模式 server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
添加Canal用户:
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
在编译好的目录下的target中找到canal.deployer-1.0.21-SNAPSHOT.tar.gz,解压
mkdir /usr/local/canal tar zxvf canal.deployer-1.0.21-SNAPSHOT.tar.gz -C /usr/local/canal
配置conf/example/instance.properties:
################################################# ## mysql serverId ## 这个id不能和目标源数据库的id一样 canal.instance.mysql.slaveId = 1234 # 数据库地址,binlog订阅开始点 canal.instance.master.address = 10.2 7ff0 02.4.39:3308 canal.instance.master.journal.name = mysql-binlog.000005 canal.instance.master.position = 126596922 canal.instance.master.timestamp = # 配置备用源数据库 #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = # username/password canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName = canal.instance.connectionCharset = UTF-8 # 订阅哪些表的binlog,支持正则表达式 canal.instance.filter.regex = .*\\..* # 过滤掉的表的正则表达式 canal.instance.filter.black.regex = #################################################
订阅起始点可自定义,查看当前binlog状态:
show master status;
一般的,binlog通过文件名和position就可以定位到,timestamp一般可以不用填。
配置conf/canal.properties:
################################################# ######### common argument ############# ################################################# canal.id= 1000001 canal.ip= 10.202.44.205 canal.port= 20999 # canal通过zk做负载均衡 canal.zkServers= 127.0.0.1:2181 # flush data to zk canal.zookeeper.flush.period = 1000 # flush meta cursor/parse position to file canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 ## memory store RingBuffer size, should be Math.pow(2,n) canal.instance.memory.buffer.size = 16384 ## memory store RingBuffer used memory unit size , default 1kb canal.instance.memory.buffer.memunit = 1024 ## meory store gets mode used MEMSIZE or ITEMSIZE canal.instance.memory.batch.mode = MEMSIZE ## detecing config canal.instance.detecting.enable = false #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() canal.instance.detecting.sql = select 1 canal.instance.detecting.interval.time = 3 canal.instance.detecting.retry.threshold = 3 canal.instance.detecting.heartbeatHaEnable = false # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery canal.instance.transaction.size = 1024 # mysql fallback connected to new master should fallback times canal.instance.fallbackIntervalInSeconds = 60 # network config canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30 # binlog filter config canal.instance.filter.query.dcl = false canal.instance.filter.query.dml = false canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false # binlog format/image check canal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB # binlog ddl isolation canal.instance.get.ddl.isolation = false ################################################# ######### destinations ############# ################################################# canal.destinations= example # conf root dir canal.conf.dir = ../conf # auto scan instance dir add/remove and start/stop instance canal.auto.scan = true canal.auto.scan.interval = 5 canal.instance.global.mode = spring canal.instance.global.lazy = false #canal.instance.global.manager.address = 127.0.0.1:1099 #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml canal.instance.global.spring.xml = classpath:spring/file-instance.xml #canal.instance.global.spring.xml = classpath:spring/default-instance.xml
剩下的配置我在Canal源代码分析中会细讲,敬请期待。
配置好,启动:
./bin/startup.sh
查看日志,启动成功。
之后利用客户端程序测试:
import java.net.InetSocketAddress; import java.util.List; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; /** * Created by 862911 on 2016/3/8. */ public class CanalClientUtil { public static void main(String args[]) { // 创建链接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.202.44.205", 20999), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); int totalEmtryCount = 120; while (emptyCount < totalEmtryCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }
测试结果:
================> binlog[mysql-binlog.000005:126600892] , name[express2,exp_data_waybill] , eventType : INSERT waybill_no : 444502894553 update=true delivery_tel : jHR4SS2qWgxxAntO1y1HVA== update=true delivery_mobile : sKBkAB0Z3LGVCbVfS1YOZQ== update=true delivery_contact : FdVG8RGFYwtWL9MU6QoMxg== update=true delivery_addr : CW4QuCFWXlDZdTQCGgJLPe+LiC3hKPV7ykvdB7qx7dE= update=true delivery_company : - update=true delivery_addr_lat : 0.0 update=true delivery_addr_lng : 0.0 update=true consignee_tel : kcObfcMJkQ+uAh2RtAZ6cQ== update=true consignee_mobile : b0mwnXp6/YKX/MKXX6S8CQ== update=true consignee_contact : b0mwnXp6/YKX/MKXX6S8CQ== update=true consignee_addr : XESDfeSycHu4VHDE/ns1QksFDEmfVhkUgGWZ/+ea+tpU4Dq+d1/Rez4RGvRdALOS update=true consignee_company : sf-express update=true source_zone_code : update=true dest_zone_code : 010 update=true meterage_weight_qty : 20.0 update=true real_weight_qty : 1.0 update=true quantity : 1.0 update=true consignee_emp_code : 000212 update=true consigned_tm : 2016-03-04 17:25:42 update=true deliver_emp_code : update=true subscriber_name : update=true signin_tm : 0000-00-00 00:00:00 update=true cargo_type_code : C201 update=true limit_type_code : T4 update=true distance_type_code : R10102 update=true transport_type_code : TR2 update=true express_type_code : B1 update=true bill_long : update=true bill_width : update=true bill_high : update=true volume : 120000.0 update=true last_modified_tm : 2016-03-08 11:15:24 update=true is_child_waybill : N update=true is_deleted : 0 update=true created_time : 2016-03-08 11:15:27 update=true inputer_emp_code : BSP update=true modified_time : 2016-03-08 11:15:27 update=true ================> binlog[mysql-binlog.000005:126601605] , name[express2,exp_data_waybill] , eventType : UPDATE -------> before waybill_no : 906501983434 update=false delivery_tel : 9BxLJQjsg8u0y5T4Prf0Hg== update=false delivery_mobile : 9BxLJQjsg8u0y5T4Prf0Hg== update=false delivery_contact : +nSsYUguIjG7al33EaPDzA== update=false delivery_addr : AfdjSlmWTbKQgeqVaQgDvw== update=false delivery_company : 啊啊啊啊啊啊 update=false delivery_addr_lat : 0.0 update=false delivery_addr_lng : 0.0 update=false consignee_tel : 9BxLJQjsg8u0y5T4Prf0Hg== update=false consignee_mobile : 9BxLJQjsg8u0y5T4Prf0Hg== update=false consignee_contact : 9BxLJQjsg8u0y5T4Prf0Hg== update=false consignee_addr : qoPgg0MX3wMoT1g9JpRQFA== update=false consignee_company : soreufgd update=false source_zone_code : update=false dest_zone_code : 010A update=false meterage_weight_qty : 100.0 update=false real_weight_qty : 100.0 update=false quantity : 1.0 update=false consignee_emp_code : 002776 update=false consigned_tm : 2016-03-01 10:45:00 update=false deliver_emp_code : update=false subscriber_name : update=false signin_tm : 0000-00-00 00:00:00 update=false cargo_type_code : C201 update=false limit_type_code : T4 update=false distance_type_code : R10102 update=false transport_type_code : TR2 update=false express_type_code : B1 update=false bill_long : update=false bill_width : update=false bill_high : update=false volume : update=false last_modified_tm : 2016-03-01 14:22:09 update=false is_child_waybill : N update=false is_deleted : 0 update=false created_time : 2016-03-04 19:59:20 update=false inputer_emp_code : 000000 update=false modified_time : 2016-03-04 19:59:20 update=false -------> after waybill_no : 906501983434 update=false delivery_tel : 9BxLJQjsg8u0y5T4Prf0Hg== update=false delivery_mobile : 9BxLJQjsg8u0y5T4Prf0Hg== update=false delivery_contact : +nSsYUguIjG7al33EaPDzA== update=false delivery_addr : AfdjSlmWTbKQgeqVaQgDvw== update=false delivery_company : 啊啊啊啊啊啊 update=false delivery_addr_lat : 0.0 update=false delivery_addr_lng : 0.0 update=false consignee_tel : 9BxLJQjsg8u0y5T4Prf0Hg== update=false consignee_mobile : 9BxLJQjsg8u0y5T4Prf0Hg== update=false consignee_contact : 9BxLJQjsg8u0y5T4Prf0Hg== update=false consignee_addr : qoPgg0MX3wMoT1g9JpRQFA== update=false consignee_company : soreufgd update=false source_zone_code : update=false dest_zone_code : 010A update=false meterage_weight_qty : 100.0 update=false real_weight_qty : 100.0 update=false quantity : 1.0 update=false consignee_emp_code : 002776 update=false consigned_tm : 2016-03-01 10:45:00 update=false deliver_emp_code : update=false subscriber_name : update=false signin_tm : 0000-00-00 00:00:00 update=false cargo_type_code : C201 update=false limit_type_code : T4 update=false distance_type_code : R10102 update=false transport_type_code : TR2 update=false express_type_code : B1 update=false bill_long : update=false bill_width : update=false bill_high : update=false volume : update=false last_modified_tm : 2016-03-08 11:18:25 update=true is_child_waybill : N update=false is_deleted : 0 update=false created_time : 2016-03-04 19:59:20 update=false inputer_emp_code : 000000 update=false modified_time : 2016-03-08 11:18:27 update=true
可以看出,基于ROW格式的binlog解析,我们可以解析出是何种语句,以及每条记录是怎么更新的。
相关文章推荐
- Canal+Otter - Canal篇(1)
- Canal+Otter - Canal篇(1)
- canal和otter的关系?
- Canal+Otter - 前日篇(2)
- Canal+Otter - 前日篇(2)
- otter学习(5)— canal和otter的关系?
- Canal+Otter - 前日篇(1)
- Canal+Otter - 前日篇(2)
- Canal & Otter 的一些注意事项和最佳实践
- 【源码】canal和otter的高可靠性分析
- java.lang.AbstractMethodError: com.alibaba.otter.canal.common.zookeeper.ZooKeeperx.writeDat aReturnS
- 【源码】canal和otter的高可靠性分析
- 2、mysql数据库双机同步开源软件otter的安装 二
- otter(三)--同步过程小解
- 测试canal的数据堆积能力,如果canal可以堆积数据,那么就不需要消息队列来堆积。
- 谈谈对Canal(增量数据订阅与消费)的理解
- Canal-Server的HA功能验证
- canal简介
- centos7 下安装canal,并实现将mysql数据同步到redis
- 缓存一致性和跨服务器查询的数据异构解决方案canal