MySQL Binlog以及Canal简单使用

在ETL过程中,如何获取增量数据是完成ETL过程的主要解决问题,前面文章ETL增量数据的捕获中,使用日志对比的方法是一种比较好的获取增量数据流的方式,那篇文章里主要介绍了SQL Server的Change Data Capture这么个功能来完成增量数据的获取任务,这篇文章介绍一下对于MySQL来说,如何通过读取binlog来获取增量数据。

MySQL的日志

MySQL 的日志包括错误日志(ErrorLog),更新日志(Update Log),二进制日志(Binlog),查询日志(Query Log),慢查询日志(Slow Query Log)等。在默认情况下,系统仅仅打开错误日志,关闭了其他所有日志。

开启Binlog

MySQL 默认没有开启 Binlog,下面要开启Binlog。mac上使用brew安装的mysql,默认安装后的目录是/usr/local/Cellar,在这个目录中没有MySQL的配置文件my.cnf,在系统中查看my.cnf的路径

1
mysql --help --verbose | grep my.cnf

可以看到MySQL是按照这个顺序来读取配置文件的,下面在目录/etc/下新建文件my.cnf`,并添加如下来开启Binlog

1
2
3
4
5
[mysqld]
#log_bin
log-bin = mysql-bin #开启binlog
binlog-format = ROW #选择row模式
server_id = 1 #配置mysql replication需要定义,不能和canal的slaveId重复

重启MySQL服务

1
mysql.server restart

查看Binlog是否开启

1
show variables like "%log_bin%";

简单查看Binlog内容

首先创建一个表test,然后向表中插入两条数据

1
2
3
create table test(id int, name varchar(20)) engine=innodb charset=utf8;
insert into test(id, name) values(1, "aaa");
insert into test(id, name) values(2, "bbb");

显示当前使用的Binlog及所处位置,然后查看这个二进制文件内容

1
2
show master status;
show binlog events in "mysql-bin.000002";

使用MySQL内置的mysqlbinlog工具查看和操作Binlog

1
$ mysqlbinlog -v mysql-bin.000002

可以看到刚才创建表和插入数据的操作都被记录下来了。关于Binlog更详细内容,参考MySQL官方文档The Binary Log

使用Canal监听MySQL增量数据

Canal是阿里开源的MySQL数据库Binlog的增量订阅&消费组件

MySQL的Binlog有两个重要用途:主从复制和数据恢复,关于主从复制的内容,参考MySQL主备复制原理、实现及异常处理

这里其实利用了MySQL支持主从复制的原理,Master产生Binlog记录增删改语句,通过将Binlog发送到Slave节点从而完成Binlog的解析。下图是原理图

canal工作原理

Canal是CS结构

安装配置Canal服务端

下载canal releases包,解压之后,编辑conf/example文件夹下的instance.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#################################################
## mysql serverId
canal.instance.mysql.slaveId=1234
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=

# table meta tsdb info
canal.instance.tsdb.enable=true
canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal


#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.defaultDatabaseName=tutorials
canal.instance.connectionCharset=UTF-8
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
#################################################

address设置为mysql的连接地址,defaultDatabaseName设置为自己要监听的库名,这里是tutorial

在MySQL命令行,创建一个新用户,作为slave,这个用户对应配置文件里的dbUsername

1
2
3
4
CREATE USER canal IDENTIFIED BY 'canal!QAZ2wsx';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

启动

1
sh bin/startup.sh

启动后可以在logs目录下查看日志

编写Canal客户端程序

新建一个java maven项目,pom.xml里添加依赖

1
2
3
4
5
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.25</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
* @author lvshuo
* @create 2018/5/17
* @since 1.0.0
*/
public class MainApp {

public static void main(String[] args) throws Exception {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), "example", "", "");

int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries ());
}

connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}

System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}

private static void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry
.EntryType
.TRANSACTIONEND) {
continue;
}

CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}

CanalEntry.EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));

for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}

private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}

}

模拟MySQL的变化

创建一张新表,插入一条数据,然后更新这条数据,最后删除这条数据,查看一下canal客户端是不是抓取到了增量数据:

1
2
3
create table test_canal(id int, name varchar(20)) engine=innodb charset=utf8;
insert into test_canal(id, name) values(1, "aaa");
update test_canal set name="hahaha" where id=1;

可以看到,创建了一个表,然后插入一条新数据,然后更新了这条数据,更新前后的值都可以看到。

坚持原创技术分享,您的支持将鼓励我继续创作!